Scala Spark Encoders.product[X] (where X is a case class) keeps giving me "No TypeTag available"


I am working in a Scala worksheet in IntelliJ IDEA. I want to create an encoder for a Scala case class. From various posts on the internet I found the suggestion to use Encoders.product, but it has never worked for me.

The following code

import org.apache.spark.sql.*

val spark: SparkSession =
    SparkSession
      .builder()
      .appName("test")
      .master("local")
      .getOrCreate()

import scala3encoders.given

case class classa(i: Int, j: Int, s: String)

val enc = Encoders.product[classa]

keeps throwing this error:

-- Error: ----------------------------------------------------------------------
1 |val enc = Encoders.product[classa]
  |                                  ^
  |                                  No TypeTag available for classa
1 error found

Does anyone know what's going on there?

The content of the build.sbt file is:

scalaVersion := "3.1.3"
scalacOptions ++= Seq("-language:implicitConversions", "-deprecation")
libraryDependencies ++= Seq(
  excludes(("org.apache.spark" %% "spark-core" % "3.2.0").cross(CrossVersion.for3Use2_13)),
  excludes(("org.apache.spark" %% "spark-sql" % "3.2.0").cross(CrossVersion.for3Use2_13)),
  excludes("io.github.vincenzobaz" %% "spark-scala3" % "0.1.3"),
  "org.scalameta" %% "munit" % "0.7.26" % Test
)

//netty-all replaces all these excludes
def excludes(m: ModuleID): ModuleID =
  m.exclude("io.netty", "netty-common").
    exclude("io.netty", "netty-handler").
    exclude("io.netty", "netty-transport").
    exclude("io.netty", "netty-buffer").
    exclude("io.netty", "netty-codec").
    exclude("io.netty", "netty-resolver").
    exclude("io.netty", "netty-transport-native-epoll").
    exclude("io.netty", "netty-transport-native-unix-common").
    exclude("javax.xml.bind", "jaxb-api").
    exclude("jakarta.xml.bind", "jaxb-api").
    exclude("javax.activation", "activation").
    exclude("jakarta.annotation", "jakarta.annotation-api").
    exclude("javax.annotation", "javax.annotation-api")

// Without forking, ctrl-c doesn't actually fully stop Spark
run / fork := true
Test / fork := true

CodePudding user response:

Encoders.product[classa] is a Scala 2 thing: the method takes an implicit TypeTag (roughly def product[T <: Product : TypeTag]: Encoder[T]), and there are no TypeTags in Scala 3. In Scala 3 the library maintainers propose to work in the following way:

https://github.com/vincenzobaz/spark-scala3/blob/main/examples/src/main/scala/sql/StarWars.scala

package sql

import org.apache.spark.sql.{Dataset, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql._


object StarWars extends App:
  val spark = SparkSession.builder().master("local").getOrCreate
  import spark.implicits.localSeqToDatasetHolder
  import scala3encoders.given

  extension [T: Encoder] (seq: Seq[T])
    def toDS: Dataset[T] =
      localSeqToDatasetHolder(seq).toDS

  case class Friends(name: String, friends: String)
  val df: Dataset[Friends] = Seq(
      ("Yoda",             "Obi-Wan Kenobi"),
      ("Anakin Skywalker", "Sheev Palpatine"),
      ("Luke Skywalker",   "Han Solo, Leia Skywalker"),
      ("Leia Skywalker",   "Obi-Wan Kenobi"),
      ("Sheev Palpatine",  "Anakin Skywalker"),
      ("Han Solo",         "Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi, Chewbacca"),
      ("Obi-Wan Kenobi",   "Yoda, Qui-Gon Jinn"),
      ("R2-D2",            "C-3PO"),
      ("C-3PO",            "R2-D2"),
      ("Darth Maul",       "Sheev Palpatine"),
      ("Chewbacca",        "Han Solo"),
      ("Lando Calrissian", "Han Solo"),
      ("Jabba",            "Boba Fett")
    ).toDS.map((n,f) => Friends(n, f))


  val friends = df.as[Friends]
  friends.show()
  case class FriendsMissing(who: String, friends: Option[String])
  val dsMissing: Dataset[FriendsMissing] = Seq( 
      ("Yoda",             Some("Obi-Wan Kenobi")),
      ("Anakin Skywalker", Some("Sheev Palpatine")),
      ("Luke Skywalker",   Option.empty[String]),
      ("Leia Skywalker",   Some("Obi-Wan Kenobi")),
      ("Sheev Palpatine",  Some("Anakin Skywalker")),
      ("Han Solo",         Some("Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi"))
    ).toDS
     .map((a, b) => FriendsMissing(a, b)) 

  dsMissing.show()

  case class Character(
    name: String, 
    height: Int, 
    weight: Option[Int], 
    eyecolor: Option[String], 
    haircolor: Option[String], 
    jedi: String,
    species: String
  )

  val characters: Dataset[Character] = spark.sqlContext
    .read
    .option("header", "true")
    .option("delimiter", ";")
    .option("inferSchema", "true")
    .csv("StarWars.csv")
    .as[Character]

  characters.show()
  val sw_df = characters.join(friends, Seq("name"))
  sw_df.show()

  case class SW(
    name: String,
    height: Int,
    weight: Option[Int],
    eyecolor: Option[String],
    haircolor: Option[String],
    jedi: String,
    species: String,
    friends: String
  )

  val sw_ds = sw_df.as[SW]
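
Applied to the snippet in the question, the idea is to let spark-scala3 derive the encoder instead of calling Encoders.product. A minimal sketch, assuming the scala3encoders.given import brings a derived Encoder for case classes into scope (as in the example above):

import org.apache.spark.sql.*
import scala3encoders.given

case class classa(i: Int, j: Int, s: String)

// summon the derived encoder instead of calling Encoders.product[classa]
val enc: Encoder[classa] = summon[Encoder[classa]]
// enc.schema now describes the fields i, j, s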

So if you really need Encoders.product[classa], compile that part of your code with Scala 2 (the Scala 2 compiler materializes the TypeTag at compile time) and call it from Scala 3:

src/App.scala

// this is Scala 3
object App {
  def main(args: Array[String]): Unit = {
    println(App1.schema)
    // Seq(StructField(i,IntegerType,false), StructField(j,IntegerType,false), StructField(s,StringType,true))
  }
}

scala2/src/main/scala/App1.scala

import org.apache.spark.sql._

// this is Scala 2
object App1 {
  val schema = Encoders.product[classa].schema
}

common/src/main/scala/classa.scala

case class classa(i: Int, j: Int, s: String)

build.sbt

lazy val sparkCore = "org.apache.spark" %% "spark-core" % "3.2.0"
lazy val sparkSql = "org.apache.spark" %% "spark-sql" % "3.2.0"
lazy val scala3V = "3.1.3"
lazy val scala2V = "2.13.8"

lazy val root = project
  .in(file("."))
  .settings(
    scalaVersion := scala3V,
    scalacOptions ++= Seq("-language:implicitConversions", "-deprecation"),
    libraryDependencies ++= Seq(
      excludes(sparkCore.cross(CrossVersion.for3Use2_13)),
      excludes(sparkSql.cross(CrossVersion.for3Use2_13)),
      excludes("io.github.vincenzobaz" %% "spark-scala3" % "0.1.3"),
      "org.scalameta" %% "munit" % "0.7.26" % Test
    )
  )
  .dependsOn(scala2, common)

lazy val scala2 = project
  .settings(
    scalaVersion := scala2V,
    libraryDependencies ++= Seq(
      sparkCore,
      sparkSql
    )
  )
  .dependsOn(common)

lazy val common = project
  .settings(
    scalaVersion := scala3V,
    crossScalaVersions := Seq(scala2V, scala3V)
  )

//netty-all replaces all these excludes
def excludes(m: ModuleID): ModuleID =
  m.exclude("io.netty", "netty-common").
    exclude("io.netty", "netty-handler").
    exclude("io.netty", "netty-transport").
    exclude("io.netty", "netty-buffer").
    exclude("io.netty", "netty-codec").
    exclude("io.netty", "netty-resolver").
    exclude("io.netty", "netty-transport-native-epoll").
    exclude("io.netty", "netty-transport-native-unix-common").
    exclude("javax.xml.bind", "jaxb-api").
    exclude("jakarta.xml.bind", "jaxb-api").
    exclude("javax.activation", "activation").
    exclude("jakarta.annotation", "jakarta.annotation-api").
    exclude("javax.annotation", "javax.annotation-api")

// Without forking, ctrl-c doesn't actually fully stop Spark
run / fork := true
Test / fork := true
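
With that layout, the root (Scala 3) project is run as usual; sbt builds common and scala2 first because of the dependsOn chain. A hypothetical session, assuming App is the only main class in the root project:

sbt compile
sbt run
// Seq(StructField(i,IntegerType,false), StructField(j,IntegerType,false), StructField(s,StringType,true))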

Update. Alternatively, in Scala 3 you can obtain a TypeTag via Scala 2 runtime compilation (a reflective ToolBox): How to compile and execute scala code at run-time in Scala3?

Scala 2 macros don't work in Scala 3, so we can't use runtime.currentMirror (a macro) or quasiquotes like q"...", but universe.runtimeMirror and tb.parse are plain library calls, and it turns out they still work in Scala 3.

// this is Scala 3
import org.apache.spark.sql.*
import scala.tools.reflect.ToolBox
import scala.reflect.runtime.universe
import scala.reflect.runtime.universe.*
import mypackage.classa

val rm = universe.runtimeMirror(getClass.getClassLoader)
val tb = rm.mkToolBox()
val typeTag = tb.eval(tb.parse(
    "scala.reflect.runtime.universe.typeTag[mypackage.classa]"
  )).asInstanceOf[TypeTag[classa]]

Encoders.product[classa](typeTag).schema
// Seq(StructField(i,IntegerType,false), StructField(j,IntegerType,false), StructField(s,StringType,true))
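
The TypeTag obtained this way can also be passed explicitly wherever an Encoder is needed, not just for the schema. A small sketch with hypothetical data, assuming the SparkSession from the question is in scope:

// pass the runtime-compiled TypeTag explicitly to Encoders.product
val enc = Encoders.product[classa](typeTag)
val ds: Dataset[classa] = spark.createDataset(Seq(classa(1, 2, "a")))(enc)
ds.show()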

build.sbt

lazy val sparkCore = "org.apache.spark" %% "spark-core" % "3.2.0"
lazy val sparkSql = "org.apache.spark" %% "spark-sql" % "3.2.0"
lazy val scala3V = "3.1.3"
lazy val scala2V = "2.13.8"

lazy val root = project
  .in(file("."))
  .settings(
    scalaVersion := scala3V,
    scalacOptions ++= Seq(
      "-language:implicitConversions",
      "-deprecation"
    ),
    libraryDependencies ++= Seq(
      excludes(sparkCore.cross(CrossVersion.for3Use2_13)),
      excludes(sparkSql.cross(CrossVersion.for3Use2_13)),
      excludes("io.github.vincenzobaz" %% "spark-scala3" % "0.1.3"),
      "org.scalameta" %% "munit" % "0.7.26" % Test,
      scalaOrganization.value % "scala-reflect" % scala2V,
      scalaOrganization.value % "scala-compiler" % scala2V,
    ),
  )

def excludes(m: ModuleID): ModuleID =
  m.exclude("io.netty", "netty-common").
    exclude("io.netty", "netty-handler").
    exclude("io.netty", "netty-transport").
    exclude("io.netty", "netty-buffer").
    exclude("io.netty", "netty-codec").
    exclude("io.netty", "netty-resolver").
    exclude("io.netty", "netty-transport-native-epoll").
    exclude("io.netty", "netty-transport-native-unix-common").
    exclude("javax.xml.bind", "jaxb-api").
    exclude("jakarta.xml.bind", "jaxb-api").
    exclude("javax.activation", "activation").
    exclude("jakarta.annotation", "jakarta.annotation-api").
    exclude("javax.annotation", "javax.annotation-api")

// Without forking, ctrl-c doesn't actually fully stop Spark
run / fork := true
Test / fork := true