Why is the spark.implicits._ import not helping with encoder derivation inside a method?


So, importing an implicit member from an instance works as expected,

object Test extends App {
  class Bag {
    implicit val ssss: String = "omg"
  }

  def call(): Unit = {
    val bag = new Bag

    import bag._

    val s = implicitly[String]

    println(s)
  }

  call()
}

But if I try doing the same with spark.implicits._,

object Test extends App {
  val spark: SparkSession = ...

  def call(): Unit = {
    import spark.implicits._

    case class Person(id: Long, name: String)

    // I can summon an existing encoder
    // val enc = implicitly[Encoder[Long]]

    // but encoder derivation is failing for some reason
    // val encP = implicitly[Encoder[Person]]

    val df: Dataset[Person] =
      spark.range(10).map(i => Person(i, i.toString))

    df.show()
  }
}

It fails to derive the Encoder[Person],

Unable to find encoder for type Person. An implicit Encoder[Person] is needed to store Person instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
        .map(i => Person(i, i.toString)

But it works if I create the Dataset outside the method,

object Test extends App {
  val spark: SparkSession = ...

  import spark.implicits._

  case class Person(id: Long, name: String)

  val df: Dataset[Person] =
    spark.range(10).map(i => Person(i, i.toString))

  df.show()
}

Tested with Scala versions 2.13.10 and 2.12.17 and Spark version 3.3.1.

CodePudding user response:

The local case class is the reason for the observed behaviour. A local class has a so-called free type (more about that here). You may try experimenting with adding a TypeTag for Person in the local scope to see if it helps.

CodePudding user response:

As you already found out yourself, the local Person doesn't have a TypeTag, but it does have a WeakTypeTag. Let's try to define an Encoder for such a class.

A naive approach of constructing a TypeTag manually doesn't work (see the related questions):

How to create a TypeTag manually?

In scala 2.12, why none of the TypeTag created in runtime is serializable?

Scala Spark Encoders.product[X] (where X is a case class) keeps giving me "No TypeTag available for X" error

Spark: DF.as[Type] fails to compile

implicit def ttag[A: WeakTypeTag]: TypeTag[A] = {
  val ttag = null // hiding implicit by name
  val wttagImpl = weakTypeTag[A].asInstanceOf[WeakTypeTag[A] {val mirror: Mirror; val tpec: TypeCreator}]
  TypeTag[A](wttagImpl.mirror, wttagImpl.tpec)
}

java.lang.NoClassDefFoundError: no Java class corresponding to Person found

https://gist.github.com/DmytroMitin/41b7439d2e504e37f29b02e3500d24b1

A similar result occurs for

def typeToTypeTag[T](
                      tpe: Type,
                      mirror: api.Mirror[universe.type]
                    ): TypeTag[T] = {
  TypeTag(mirror, new TypeCreator {
    def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) = {
      assert(m eq mirror, s"TypeTag[$tpe] defined in $mirror cannot be migrated to $m.")
      tpe.asInstanceOf[U#Type]
    }
  })
}

implicit def ttag[T: WeakTypeTag]: TypeTag[T] = {
  val ttag = null
  typeToTypeTag(weakTypeOf[T], mirror)
}

java.lang.NoClassDefFoundError: no Java class corresponding to Person found

https://gist.github.com/DmytroMitin/c7a24abf1ff1011a1c87aa9d161d6395

// tb is a scala.tools.reflect.ToolBox (runtime compiler)
implicit val personTtag: TypeTag[Person] = {
  val personTtag = null
  tb.eval(q"org.apache.spark.sql.catalyst.ScalaReflection.universe.typeTag[${weakTypeOf[Person]}]")
    .asInstanceOf[TypeTag[Person]]
}

scala.tools.reflect.ToolBoxError: reflective toolbox failed due to unresolved free type variables

https://gist.github.com/DmytroMitin/6e35c0332f845fcd227d35ec49d4122f

This is how Encoder[T] is defined in Spark for a T that has a TypeTag:

implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]

object Encoders {
  def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()
}

object ExpressionEncoder {
  def apply[T : TypeTag](): ExpressionEncoder[T] = {
    val mirror = ScalaReflection.mirror
    val tpe = typeTag[T].in(mirror).tpe

    val cls = mirror.runtimeClass(tpe)
    val serializer = ScalaReflection.serializerForType(tpe)
    val deserializer = ScalaReflection.deserializerForType(tpe)

    new ExpressionEncoder[T](
      serializer,
      deserializer,
      ClassTag[T](cls)
    )
  }
}
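The derivation chain above can be modeled without Spark. The following is a toy sketch (Tag, describe, and the class names are hypothetical, not Spark's API) showing how, like newProductEncoder, derivation only fires when a tag is available in implicit scope:

```scala
// Toy model of the derivation chain: Encoder[T] is derived only when
// an implicit Tag[T] is in scope, mirroring the `T <: Product : TypeTag`
// context bound on newProductEncoder.
trait Tag[T]
trait Encoder[T] { def describe: String }

object Encoder {
  // analogue of: implicit def newProductEncoder[T <: Product : TypeTag]
  implicit def derive[T <: Product](implicit tag: Tag[T]): Encoder[T] =
    new Encoder[T] { def describe: String = "derived" }
}

object TagDemo {
  case class Person(id: Long, name: String)

  // With a Tag in scope, derivation succeeds:
  implicit val personTag: Tag[Person] = new Tag[Person] {}
  val enc: Encoder[Person] = implicitly[Encoder[Person]]

  // Without personTag, implicitly[Encoder[Person]] would not compile --
  // the same kind of summoning failure the question runs into, where
  // the missing ingredient is the TypeTag of the local case class.
}
```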

Let's try to modify it for a T that has only a WeakTypeTag or a ClassTag.

implicit def apply[T: WeakTypeTag /*: ClassTag*/]: Encoder[T] = {
  // mirror is ScalaReflection.mirror, as in ExpressionEncoder.apply above
  val tpe = weakTypeTag[T].in(mirror).tpe

  val cls = mirror.runtimeClass(tpe)
  val serializer = ScalaReflection.serializerForType(tpe)
  val deserializer = ScalaReflection.deserializerForType(tpe)

  new ExpressionEncoder[T](
    serializer,
    deserializer,
    ClassTag[T](cls)
  )
}

java.lang.NoClassDefFoundError: no Java class corresponding to Person found

https://gist.github.com/DmytroMitin/b58848fa6575b6fab0e9b8285095cc60

// (*)
implicit def apply[T/*: WeakTypeTag*/ : ClassTag]: Encoder[T] = {
  val tpe = mirror.classSymbol(classTag[T].runtimeClass).toType
  val serializer = ScalaReflection.serializerForType(tpe)
  val deserializer = ScalaReflection.deserializerForType(tpe)

  new ExpressionEncoder[T](
    serializer,
    deserializer,
    classTag[T]
  )
}

org.apache.spark.SparkException: Task not serializable

Caused by: java.io.NotSerializableException: Main

https://gist.github.com/DmytroMitin/0c86933f96e136d44fff555295ce01dd

So finally, let's make Main extend Serializable, and everything works:

+---+----+
| id|name|
+---+----+
|  0|   0|
|  1|   1|
|  2|   2|
|  3|   3|
|  4|   4|
|  5|   5|
|  6|   6|
|  7|   7|
|  8|   8|
|  9|   9|
+---+----+

https://gist.github.com/DmytroMitin/0e9b0bd2ed6237a4a1e1c40d620a9d88

So (*) is a correct Encoder for a local case class.
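Why Serializable was needed can be reproduced outside Spark. The map lambda captures the enclosing Main instance (the local Person is an inner class of it), and Spark's "Task not serializable" is Java serialization of that closure failing. A minimal Spark-free sketch of the same mechanism (class names are hypothetical):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// A Scala closure that reads a field captures `this` of its owner, so
// serializing the closure also serializes the owner. That fails unless
// the owner is Serializable -- the same reason Main had to extend it.
class PlainOwner                      { val tag = "x"; val f: () => String = () => tag }
class SerializableOwner extends Serializable { val tag = "x"; val f: () => String = () => tag }

object SerDemo {
  def serializes(obj: AnyRef): Boolean =
    try { new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj); true }
    catch { case _: NotSerializableException => false }

  val plainOk = serializes((new PlainOwner).f)        // false: captured owner not Serializable
  val fixedOk = serializes((new SerializableOwner).f) // true
}
```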
