So, importing an implicit member from a created instance works as expected:

object Test extends App {
  class Bag {
    implicit val ssss: String = "omg"
  }

  def call(): Unit = {
    val bag = new Bag
    import bag._
    val s = implicitly[String]
    println(s)
  }

  call()
}
But if I try doing the same with spark.implicits._:

object Test extends App {
  val spark: SparkSession = ...

  def call(): Unit = {
    import spark.implicits._

    case class Person(id: Long, name: String)

    // I can summon an existing encoder
    // val enc = implicitly[Encoder[Long]]
    // but encoder derivation is failing for some reason
    // val encP = implicitly[Encoder[Person]]

    val df: Dataset[Person] =
      spark.range(10).map(i => Person(i, i.toString))
    df.show()
  }
}
It fails to derive the Encoder[Person]:

  Unable to find encoder for type Person. An implicit Encoder[Person] is needed to store Person instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
    .map(i => Person(i, i.toString)
But it works if I create the Dataset outside the method:

object Test extends App {
  val spark: SparkSession = ...
  import spark.implicits._

  case class Person(id: Long, name: String)

  val df: Dataset[Person] =
    spark.range(10).map(i => Person(i, i.toString))
  df.show()
}
Tested with Scala versions 2.13.10 and 2.12.17, with Spark version 3.3.1.
CodePudding user response:
The local case class Person is the reason for the observed behaviour. A local class has a so-called free type; you can read more about that here. You may try to experiment with adding a TypeTag for Person in local scope to see if it may help.
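For example, a minimal check (my own sketch, assuming Scala 2.12/2.13 with scala-reflect on the classpath; TagCheck is just an illustrative name) shows that for a local case class the compiler will materialize only a WeakTypeTag, not a TypeTag:

import scala.reflect.runtime.universe._

object TagCheck extends App {
  def check(): Unit = {
    case class Person(id: Long, name: String)
    // implicitly[TypeTag[Person]]              // does not compile: the local class leads to a free type
    val weak = implicitly[WeakTypeTag[Person]]  // a weak tag is still materialized
    println(weak.tpe)                           // prints the (free) type, e.g. Person
  }
  check()
}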
CodePudding user response:
As you already found out yourself, the local Person doesn't have a TypeTag, but it does have a WeakTypeTag. Let's try to define an Encoder for such a class.
A naive approach of constructing a TypeTag manually doesn't work (see also these related questions):

How to create a TypeTag manually?
In scala 2.12, why none of the TypeTag created in runtime is serializable?
Spark: DF.as[Type] fails to compile
import scala.reflect.api.TypeCreator
import scala.reflect.runtime.universe._

implicit def ttag[A: WeakTypeTag]: TypeTag[A] = {
  val ttag = null // shadowing the implicit by name to prevent recursive implicit resolution
  // peek into the WeakTypeTag implementation and reuse its mirror and type creator
  val wttagImpl = weakTypeTag[A].asInstanceOf[WeakTypeTag[A] { val mirror: Mirror; val tpec: TypeCreator }]
  TypeTag[A](wttagImpl.mirror, wttagImpl.tpec)
}
java.lang.NoClassDefFoundError: no Java class corresponding to Person found
https://gist.github.com/DmytroMitin/41b7439d2e504e37f29b02e3500d24b1
A similar result occurs for
import scala.reflect.api
import scala.reflect.api.TypeCreator
import scala.reflect.runtime.universe
import scala.reflect.runtime.universe._

def typeToTypeTag[T](
    tpe: Type,
    mirror: api.Mirror[universe.type]
): TypeTag[T] = {
  TypeTag(mirror, new TypeCreator {
    def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) = {
      assert(m eq mirror, s"TypeTag[$tpe] defined in $mirror cannot be migrated to $m.")
      tpe.asInstanceOf[U#Type]
    }
  })
}

implicit def ttag[T: WeakTypeTag]: TypeTag[T] = {
  val ttag = null // shadowing the implicit by name
  typeToTypeTag(weakTypeOf[T], mirror) // mirror: a runtime mirror, e.g. scala.reflect.runtime.currentMirror
}
java.lang.NoClassDefFoundError: no Java class corresponding to Person found
https://gist.github.com/DmytroMitin/c7a24abf1ff1011a1c87aa9d161d6395
Evaluating the TypeTag with a runtime toolbox doesn't work either:

import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox // enables mkToolBox; needs scala-compiler on the classpath

val tb = scala.reflect.runtime.currentMirror.mkToolBox() // one way to obtain a toolbox

implicit val personTtag: TypeTag[Person] = {
  val personTtag = null // shadowing the implicit by name
  tb.eval(q"org.apache.spark.sql.catalyst.ScalaReflection.universe.typeTag[${weakTypeOf[Person]}]")
    .asInstanceOf[TypeTag[Person]]
}
scala.tools.reflect.ToolBoxError: reflective toolbox failed due to unresolved free type variables
https://gist.github.com/DmytroMitin/6e35c0332f845fcd227d35ec49d4122f
This is how Encoder[T] is defined for a T that has a TypeTag:
implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]

object Encoders {
  def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()
}

object ExpressionEncoder {
  def apply[T : TypeTag](): ExpressionEncoder[T] = {
    val mirror = ScalaReflection.mirror
    val tpe = typeTag[T].in(mirror).tpe
    val cls = mirror.runtimeClass(tpe)
    val serializer = ScalaReflection.serializerForType(tpe)
    val deserializer = ScalaReflection.deserializerForType(tpe)
    new ExpressionEncoder[T](
      serializer,
      deserializer,
      ClassTag[T](cls)
    )
  }
}
Let's try to modify it for a T having a WeakTypeTag and a ClassTag:
implicit def apply[T: WeakTypeTag /*: ClassTag*/]: Encoder[T] = {
  val tpe = weakTypeTag[T].in(mirror).tpe // mirror is ScalaReflection.mirror, as in ExpressionEncoder.apply above
  val cls = mirror.runtimeClass(tpe)
  val serializer = ScalaReflection.serializerForType(tpe)
  val deserializer = ScalaReflection.deserializerForType(tpe)
  new ExpressionEncoder[T](
    serializer,
    deserializer,
    ClassTag[T](cls)
  )
}
java.lang.NoClassDefFoundError: no Java class corresponding to Person found
https://gist.github.com/DmytroMitin/b58848fa6575b6fab0e9b8285095cc60
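That NoClassDefFoundError comes from trying to map the free type standing in for the local class back to a Java class (the mirror.runtimeClass(tpe) step seen above). Presumably it can be reproduced without Spark at all; a minimal reflection-only sketch (my assumption; RuntimeClassDemo is just an illustrative name):

import scala.reflect.runtime.universe._

object RuntimeClassDemo extends App {
  def demo(): Unit = {
    case class Person(id: Long, name: String)
    val mirror = runtimeMirror(getClass.getClassLoader)
    val tpe = weakTypeOf[Person] // contains a free type standing in for the local class
    // presumably fails the same way:
    // java.lang.NoClassDefFoundError: no Java class corresponding to Person found
    mirror.runtimeClass(tpe)
  }
  demo()
}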
// (*)
implicit def apply[T/*: WeakTypeTag*/ : ClassTag]: Encoder[T] = {
  val tpe = mirror.classSymbol(classTag[T].runtimeClass).toType // mirror is ScalaReflection.mirror, as above
  val serializer = ScalaReflection.serializerForType(tpe)
  val deserializer = ScalaReflection.deserializerForType(tpe)
  new ExpressionEncoder[T](
    serializer,
    deserializer,
    classTag[T]
  )
}
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: Main
https://gist.github.com/DmytroMitin/0c86933f96e136d44fff555295ce01dd
So finally, let's make Main extend Serializable.
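Putting the pieces together, the whole program might look roughly like this (my own sketch of how the snippets above fit together, not the gist's exact code; the object and method names, the local[*] session setup, and the use of ScalaReflection.mirror are assumptions):

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import scala.reflect.{ClassTag, classTag}

object Main extends App with Serializable {
  val spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("local-case-class").getOrCreate()

  // the ClassTag-based encoder (*) from above
  implicit def classTagEncoder[T: ClassTag]: Encoder[T] = {
    val mirror = ScalaReflection.mirror
    val tpe = mirror.classSymbol(classTag[T].runtimeClass).toType
    val serializer = ScalaReflection.serializerForType(tpe)
    val deserializer = ScalaReflection.deserializerForType(tpe)
    new ExpressionEncoder[T](serializer, deserializer, classTag[T])
  }

  def call(): Unit = {
    case class Person(id: Long, name: String)
    // classTagEncoder supplies the Encoder[Person] that spark.implicits._ could not derive
    val df: Dataset[Person] = spark.range(10).map(i => Person(i, i.toString))
    df.show()
  }

  call()
}

With that, df.show() prints: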
+---+----+
| id|name|
+---+----+
|  0|   0|
|  1|   1|
|  2|   2|
|  3|   3|
|  4|   4|
|  5|   5|
|  6|   6|
|  7|   7|
|  8|   8|
|  9|   9|
+---+----+
https://gist.github.com/DmytroMitin/0e9b0bd2ed6237a4a1e1c40d620a9d88
So (*) is the correct Encoder.