Spark: DF.as[Type] fails to compile


I'm trying to run an example from the book Spark: The Definitive Guide.


ThisBuild / scalaVersion := "3.2.1"

libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-sql" % "3.2.0" % "provided").cross(CrossVersion.for3Use2_13)
)

Compile / run := Defaults.runTask(Compile / fullClasspath, Compile / run / mainClass, Compile / run / runner).evaluated

lazy val root = (project in file("."))
  .settings(
    name := "scalalearn"
  )


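// imports
import org.apache.spark.sql.SparkSession

object spark1 {
  def main(args: Array[String]): Unit = {

    // assumed: a local SparkSession (its creation was not shown)
    val spark = SparkSession.builder()
      .appName("scalalearn")
      .master("local[*]")
      .getOrCreate()

    case class Flight(
                       DEST_COUNTRY_NAME: String,
                       ORIGIN_COUNTRY_NAME: String,
                       count: BigInt
                     )

    val flightsDF = spark.read
      .parquet("...") // path to the flight-data Parquet file (not shown)

    //    import spark.implicits._ // FAILS
    //    import spark.sqlContext.implicits._ // FAILS

    val flights = flightsDF.as[Flight]

    // in Scala
    flights
      .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
      .map(flight_row => flight_row)
      .take(5)

    spark.stop()
  }
}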


On the line val flights = flightsDF.as[Flight] I get this error:

Unable to find encoder for type Flight. An implicit Encoder[Flight] is needed to store Flight
 instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes)
 are supported by importing spark.implicits._ Support for serializing other types will be added in
 future releases.

Any help is appreciated.

Scala 3.2.1, Spark 3.2.0

I tried importing the implicits from spark.implicits._ and spark.sqlContext.implicits._. The example works on Scala 2.x.

I'm looking for a way to convert a DataFrame to a case class without any third-party workarounds.

You need to add a Scala 3 dependency that provides Spark codecs (encoders):


libraryDependencies += "io.github.vincenzobaz" %% "spark-scala3" % "0.1.3"

and import the Scala 3 givens

import scala3encoders.given

instead of the Scala 2 implicits

import spark.implicits._ // FAILS
import spark.sqlContext.implicits._ // FAILS
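For reference, here is a sketch of the question's build.sbt with the extra dependency merged in. It only combines the two settings shown above and assumes that spark-scala3 0.1.3 works with Spark 3.2.0 and Scala 3.2.1; check the library's README for the matching versions.

```scala
ThisBuild / scalaVersion := "3.2.1"

libraryDependencies ++= Seq(
  // Spark 3.2 has no Scala 3 artifacts, so use the _2.13 one
  ("org.apache.spark" %% "spark-sql" % "3.2.0" % "provided").cross(CrossVersion.for3Use2_13),
  // Scala 3 encoder derivation for Spark (version compatibility assumed)
  "io.github.vincenzobaz" %% "spark-scala3" % "0.1.3"
)

// Put the "provided" Spark jars back on the classpath for `sbt run`
Compile / run := Defaults.runTask(Compile / fullClasspath, Compile / run / mainClass, Compile / run / runner).evaluated
```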

See also: Scala Spark Encoders.product[X] (where X is a case class) keeps giving me "No TypeTag available for X" error

Regarding BigInt, see Does Spark support BigInteger type?:

Spark does support Java BigIntegers but possibly with some loss of precision. If the numerical value of the BigInteger fits in a long (i.e. between -2^63 and 2^63-1) then it will be stored by Spark as a LongType. Otherwise it will be stored as a DecimalType, but this type only supports 38 digits of precision.
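The quoted range rule can be checked with plain Scala. This sketch classifies a BigInt by the Spark type it would need under that rule; BigIntSparkType and requiredSparkType are hypothetical names, and the returned strings are only labels, not Spark DataType objects.

```scala
// Hypothetical helper illustrating the quoted rule (not part of Spark)
object BigIntSparkType {
  // Largest magnitude that fits into 38 decimal digits: 10^38 - 1
  val MaxDecimal38: BigInt = BigInt(10).pow(38) - 1

  def requiredSparkType(b: BigInt): String =
    if (b.isValidLong) "LongType"                        // -2^63 .. 2^63 - 1
    else if (b.abs <= MaxDecimal38) "DecimalType(38, 0)" // at most 38 digits
    else "unsupported"                                   // more than 38 digits
}

@main def demoBigIntSparkType(): Unit = {
  println(BigIntSparkType.requiredSparkType(BigInt(42)))                // LongType
  println(BigIntSparkType.requiredSparkType(BigInt(Long.MaxValue) + 1)) // DecimalType(38, 0)
  println(BigIntSparkType.requiredSparkType(BigInt(10).pow(38)))        // unsupported
}
```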

Correct codecs for comparatively small BigInts (those fitting into LongType) are:

import scala3encoders.derivation.{Deserializer, Serializer}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.types.{DataType, LongType, ObjectType}

given Deserializer[BigInt] with
  def inputType: DataType = LongType

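  // Reads the LongType column back via BigInt.apply(Long)
  def deserialize(path: Expression): Expression =
    StaticInvoke(
      BigInt.getClass,
      ObjectType(classOf[BigInt]),
      "apply",
      path :: Nil,
      returnNullable = false
    )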

given Serializer[BigInt] with
  def inputType: DataType = ObjectType(classOf[BigInt])

  def serialize(inputObject: Expression): Expression =
    Invoke(inputObject, "longValue", LongType, returnNullable = false)

import scala3encoders.given
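These codecs suit only small values because the serializer keeps just the result of longValue, which silently drops the high bits of anything outside the Long range. The following standard-library-only model of the round trip illustrates this; LongCodecModel is a hypothetical name, not part of scala3encoders or Spark.

```scala
// Models the LongType codecs: serialize with BigInt#longValue,
// deserialize with BigInt.apply(Long). Illustration only.
object LongCodecModel {
  def serialize(b: BigInt): Long = b.longValue // keeps only the low 64 bits
  def deserialize(l: Long): BigInt = BigInt(l)
  def roundTrip(b: BigInt): BigInt = deserialize(serialize(b))
}

@main def demoLongCodecModel(): Unit = {
  val small = BigInt(123456789L)
  val large = BigInt(Long.MaxValue) + 1 // 2^63, does not fit into a Long
  println(LongCodecModel.roundTrip(small) == small) // true
  println(LongCodecModel.roundTrip(large))          // -9223372036854775808
}
```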






For large BigInts (those that don't fit into LongType, so DecimalType is necessary), the codecs are:

import scala3encoders.derivation.{Deserializer, Serializer}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.types.{DataType, DataTypes, Decimal, ObjectType}

val decimalType = DataTypes.createDecimalType(38, 0)

given Deserializer[BigInt] with
  def inputType: DataType = decimalType

  def deserialize(path: Expression): Expression =
    Invoke(path, "toScalaBigInt", ObjectType(classOf[scala.math.BigInt]), returnNullable = false)

given Serializer[BigInt] with
  def inputType: DataType = ObjectType(classOf[BigInt])

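  // Writes the BigInt as a Spark Decimal via Decimal.apply(BigInt)
  def serialize(inputObject: Expression): Expression =
    StaticInvoke(
      Decimal.getClass,
      decimalType,
      "apply",
      inputObject :: Nil,
      returnNullable = false
    )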

import scala3encoders.given

which is almost the same as using Spark's internal catalyst helpers:

import org.apache.spark.sql.catalyst.DeserializerBuildHelper.createDeserializerForScalaBigInt
import org.apache.spark.sql.catalyst.SerializerBuildHelper.createSerializerForScalaBigInt
import scala3encoders.derivation.{Deserializer, Serializer}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.{DataType, DataTypes, ObjectType}

val decimalType = DataTypes.createDecimalType(38, 0)

given Deserializer[BigInt] with
  def inputType: DataType = decimalType

  def deserialize(path: Expression): Expression =
    createDeserializerForScalaBigInt(path)

given Serializer[BigInt] with
  def inputType: DataType = ObjectType(classOf[BigInt])

  def serialize(inputObject: Expression): Expression =
    createSerializerForScalaBigInt(inputObject)

import scala3encoders.given


The runtime exception you observed means that the BigInts in the Parquet file are comparatively small (they fit into LongType), while you tried my codecs for large BigInts (DecimalType).

The approach with manually created TypeTags also seems to work (without scala3encoders):

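// libraryDependencies += scalaOrganization.value % "scala-reflect" % "2.13.10" // in Scala 3
import scala.reflect.api
import scala.reflect.runtime.universe.{NoType, Type, TypeTag, internal}
import scala.reflect.runtime.universe

// Builds a TypeTag[T] from a runtime Type (Scala 3 cannot materialize TypeTags itself)
inline def createTypeTag[T](mirror: api.Mirror[_ <: api.Universe with Singleton], tpe: mirror.universe.Type): mirror.universe.TypeTag[T] = {
  val typeCreator = new api.TypeCreator {
    override def apply[U <: api.Universe with Singleton](m: api.Mirror[U]): m.universe.Type = {
      assert(m == mirror, s"TypeTag[$tpe] defined in $mirror cannot be migrated to $m.")
      tpe.asInstanceOf[m.universe.Type]
    }
  }
  mirror.universe.TypeTag[T](mirror.asInstanceOf[mirror.universe.Mirror], typeCreator)
}
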
val rm = universe.runtimeMirror(this.getClass.getClassLoader)
// val bigIntTpe = internal.typeRef(internal.typeRef(NoType, rm.staticPackage("scala.math"), Nil), rm.staticClass("scala.math.BigInt"), Nil)
// val strTpe = internal.typeRef(internal.typeRef(NoType, rm.staticPackage("java.lang"), Nil), rm.staticClass("java.lang.String"), Nil)
val flightTpe = internal.typeRef(NoType, rm.staticClass("Flight"), Nil)
// given TypeTag[BigInt] = createTypeTag[BigInt](rm, bigIntTpe)
// given TypeTag[String] = createTypeTag[String](rm, strTpe)
given TypeTag[Flight] = createTypeTag[Flight](rm, flightTpe)

import spark.implicits._


