I'm trying to run an example from the Spark book Spark: The Definitive Guide.
build.sbt
ThisBuild / scalaVersion := "3.2.1"
libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-sql" % "3.2.0" % "provided").cross(CrossVersion.for3Use2_13)
)
Compile / run := Defaults.runTask(Compile / fullClasspath, Compile / run / mainClass, Compile / run / runner).evaluated
lazy val root = (project in file("."))
.settings(
name := "scalalearn"
)
main.scala
// imports
...
object spark1 {
  @main
  def main(args: String*): Unit = {
    ...
    case class Flight(
      DEST_COUNTRY_NAME: String,
      ORIGIN_COUNTRY_NAME: String,
      count: BigInt
    )

    val flightsDF = spark.read
      .parquet(s"$dataRootPath/data/flight-data/parquet/2010-summary.parquet/")

    // import spark.implicits._ // FAILS
    // import spark.sqlContext.implicits._ // FAILS
    val flights = flightsDF.as[Flight]

    // in Scala
    flights
      .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
      .map(flight_row => flight_row)
      .take(5)

    spark.stop()
  }
}
I'm getting an error on the line val flights = flightsDF.as[Flight]:
Unable to find encoder for type Flight. An implicit Encoder[Flight] is needed to store Flight
instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes)
are supported by importing spark.implicits._ Support for serializing other types will be added in
future releases.
Any help is appreciated.
Scala 3.2.1, Spark 3.2.0.
I tried importing implicits from spark.implicits._
and spark.sqlContext.implicits._
The example works on Scala 2.x.
I'm looking for a way to convert the DataFrame to the case class without any third-party workarounds.
Answer:
You need to add the Scala 3 dependency for Spark codecs
https://github.com/vincenzobaz/spark-scala3
libraryDependencies += "io.github.vincenzobaz" %% "spark-scala3" % "0.1.3"
and use the Scala 3 import
import scala3encoders.given
instead of the Scala 2 ones:
import spark.implicits._ // FAILS
import spark.sqlContext.implicits._ // FAILS
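For example, a minimal end-to-end sketch of how the pieces fit together (the local[*] master, the hard-coded path, and count: Long are my simplifications; keeping count: BigInt needs the extra codecs discussed below):

import org.apache.spark.sql.SparkSession
import scala3encoders.given // replaces spark.implicits._ for encoder derivation

case class Flight(
  DEST_COUNTRY_NAME: String,
  ORIGIN_COUNTRY_NAME: String,
  count: Long // BigInt requires the custom codecs shown below
)

@main def run(): Unit =
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  val flights = spark.read
    .parquet("data/flight-data/parquet/2010-summary.parquet/")
    .as[Flight] // Encoder[Flight] comes from scala3encoders.given
  flights.filter(_.ORIGIN_COUNTRY_NAME != "Canada").take(5).foreach(println)
  spark.stop()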
Regarding BigInt, see Does Spark support BigInteger type?

Spark does support Java BigIntegers but possibly with some loss of precision. If the numerical value of the BigInteger fits in a long (i.e. between -2^63 and 2^63-1) then it will be stored by Spark as a LongType. Otherwise it will be stored as a DecimalType, but this type only supports 38 digits of precision.
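A quick illustration of that boundary in plain Scala (nothing Spark-specific here):

val fits     = BigInt(Long.MaxValue)     // 9223372036854775807, still representable as a long
val overflow = BigInt(Long.MaxValue) + 1 // one past the boundary, needs DecimalType
println(fits.isValidLong)     // true
println(overflow.isValidLong) // false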
Correct codecs for comparatively small BigInts (fitting into LongType) are
import scala3encoders.derivation.{Deserializer, Serializer}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.types.{DataType, LongType, ObjectType}
given Deserializer[BigInt] with
  // the column is read as a Spark long
  def inputType: DataType = LongType
  // rebuild scala.math.BigInt from the long via BigInt.apply
  def deserialize(path: Expression): Expression =
    StaticInvoke(
      BigInt.getClass,
      ObjectType(classOf[BigInt]),
      "apply",
      path :: Nil,
      returnNullable = false
    )
given Serializer[BigInt] with
  // the in-memory value being serialized
  def inputType: DataType = ObjectType(classOf[BigInt])
  // store it as a Spark long via BigInt#longValue
  def serialize(inputObject: Expression): Expression =
    Invoke(inputObject, "longValue", LongType, returnNullable = false)
import scala3encoders.given
(https://github.com/databricks/Spark-The-Definitive-Guide)
https://github.com/yashwanthreddyg/spark_stackoverflow/pull/1
https://gist.github.com/DmytroMitin/3c0fe6983a254b350ff9feedbb066bef
https://github.com/vincenzobaz/spark-scala3/pull/22
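With these two givens (plus import scala3encoders.given) in scope, the conversion from the question should compile unchanged, keeping count: BigInt (a sketch reusing the question's names):

val flights = flightsDF.as[Flight] // Encoder[Flight] is now derivable, BigInt field included
flights.filter(_.ORIGIN_COUNTRY_NAME != "Canada").take(5)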
For large BigInts (not fitting into LongType, so that DecimalType is necessary) the codecs are
import scala3encoders.derivation.{Deserializer, Serializer}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.types.{DataType, DataTypes, Decimal, ObjectType}
val decimalType = DataTypes.createDecimalType(38, 0)
given Deserializer[BigInt] with
  // the column is read as decimal(38, 0)
  def inputType: DataType = decimalType
  // convert Spark's Decimal back via Decimal#toScalaBigInt
  def deserialize(path: Expression): Expression =
    Invoke(path, "toScalaBigInt", ObjectType(classOf[scala.math.BigInt]), returnNullable = false)
given Serializer[BigInt] with
  def inputType: DataType = ObjectType(classOf[BigInt])
  // store it as decimal(38, 0) via Decimal.apply(BigInt)
  def serialize(inputObject: Expression): Expression =
    StaticInvoke(
      Decimal.getClass,
      decimalType,
      "apply",
      inputObject :: Nil,
      returnNullable = false
    )
import scala3encoders.given
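Note the precision cap quoted earlier: DecimalType supports at most 38 digits, so BigInts beyond that still cannot be stored. A plain-Scala illustration:

val fits38 = BigInt("9" * 38) // 38 digits: representable as decimal(38, 0)
val tooBig = BigInt("9" * 39) // 39 digits: exceeds DecimalType's maximum precision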
These DecimalType codecs are almost the same as
import org.apache.spark.sql.catalyst.DeserializerBuildHelper.createDeserializerForScalaBigInt
import org.apache.spark.sql.catalyst.SerializerBuildHelper.createSerializerForScalaBigInt
import scala3encoders.derivation.{Deserializer, Serializer}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.{DataType, DataTypes, ObjectType}
val decimalType = DataTypes.createDecimalType(38, 0)
given Deserializer[BigInt] with
  def inputType: DataType = decimalType
  def deserialize(path: Expression): Expression =
    createDeserializerForScalaBigInt(path)

given Serializer[BigInt] with
  def inputType: DataType = ObjectType(classOf[BigInt])
  def serialize(inputObject: Expression): Expression =
    createSerializerForScalaBigInt(inputObject)
import scala3encoders.given
https://gist.github.com/DmytroMitin/8124d2a4cd25c8488c00c5a32f244f64
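As a design note, the helper-based variant is preferable where available: createSerializerForScalaBigInt and createDeserializerForScalaBigInt encapsulate exactly the StaticInvoke / Invoke plumbing written out manually above.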
The runtime exception you observed means that the BigInts from the parquet file are comparatively small (fitting into LongType) and you tried my codecs for large BigInts (DecimalType).
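One way to check which case applies is to inspect the parquet schema before picking codecs (a sketch; the commented output is what the diagnosis above implies for this file, not a re-run):

spark.read
  .parquet(s"$dataRootPath/data/flight-data/parquet/2010-summary.parquet/")
  .printSchema()
// root
//  |-- DEST_COUNTRY_NAME: string (nullable = true)
//  |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
//  |-- count: long (nullable = true)   <- long, so the LongType codecs apply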
The approach with manually created TypeTags seems to work too (not using scala3encoders):
// libraryDependencies += scalaOrganization.value % "scala-reflect" % "2.13.10" // in Scala 3
import scala.reflect.api
import scala.reflect.runtime.universe.{NoType, Type, TypeTag, internal}
import scala.reflect.runtime.universe
inline def createTypeTag[T](mirror: api.Mirror[_ <: api.Universe with Singleton], tpe: mirror.universe.Type): mirror.universe.TypeTag[T] = {
  mirror.universe.TypeTag.apply[T](
    mirror.asInstanceOf[api.Mirror[mirror.universe.type]],
    new api.TypeCreator {
      // materialize the captured Type in whichever universe asks for it
      override def apply[U <: api.Universe with Singleton](m: api.Mirror[U]): m.universe.Type = {
        tpe.asInstanceOf[m.universe.Type]
      }
    }
  )
}
val rm = universe.runtimeMirror(this.getClass.getClassLoader)
// val bigIntTpe = internal.typeRef(internal.typeRef(NoType, rm.staticPackage("scala.math"), Nil), rm.staticClass("scala.math.BigInt"), Nil)
// val strTpe = internal.typeRef(internal.typeRef(NoType, rm.staticPackage("java.lang"), Nil), rm.staticClass("java.lang.String"), Nil)
val flightTpe = internal.typeRef(NoType, rm.staticClass("Flight"), Nil)
// given TypeTag[BigInt] = createTypeTag[BigInt](rm, bigIntTpe)
// given TypeTag[String] = createTypeTag[String](rm, strTpe)
given TypeTag[Flight] = createTypeTag[Flight](rm, flightTpe)
import spark.implicits._
https://gist.github.com/DmytroMitin/bb0ccd5f1c533b2baec1756da52f8824
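With the given TypeTag[Flight] in scope, the standard spark.implicits._ machinery works again, since its newProductEncoder derivation just needs a TypeTag for the case class (a sketch):

val flights = flightsDF.as[Flight] // resolves via newProductEncoder[Flight] using the manual TypeTag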