I am working with Intellij Idea, in a Scala worksheet. I want to create a encoder for a scala case class. From various posts on internet I found the suggestion to use Encoders.product. But it never worked for me.
The following code
import org.apache.spark.sql.*
val spark: SparkSession =
SparkSession
.builder()
.appName("test")
.master("local")
.getOrCreate()
import scala3encoders.given
case class classa(i: Int, j: Int, s: String)
val enc = Encoders.product[classa]
keep throwing error:
-- Error: ----------------------------------------------------------------------
1 |val enc = Encoders.product[classa]
| ^
| No TypeTag available for classa
1 error found
Does anyone know what's going on there?
The content of build.sbt file is:
scalaVersion := "3.1.3"
scalacOptions = Seq("-language:implicitConversions", "-deprecation")
libraryDependencies = Seq(
excludes(("org.apache.spark" %% "spark-core" % "3.2.0").cross(CrossVersion.for3Use2_13)),
excludes(("org.apache.spark" %% "spark-sql" % "3.2.0").cross(CrossVersion.for3Use2_13)),
excludes("io.github.vincenzobaz" %% "spark-scala3" % "0.1.3"),
"org.scalameta" %% "munit" % "0.7.26" % Test
)
//netty-all replaces all these excludes
def excludes(m: ModuleID): ModuleID =
m.exclude("io.netty", "netty-common").
exclude("io.netty", "netty-handler").
exclude("io.netty", "netty-transport").
exclude("io.netty", "netty-buffer").
exclude("io.netty", "netty-codec").
exclude("io.netty", "netty-resolver").
exclude("io.netty", "netty-transport-native-epoll").
exclude("io.netty", "netty-transport-native-unix-common").
exclude("javax.xml.bind", "jaxb-api").
exclude("jakarta.xml.bind", "jaxb-api").
exclude("javax.activation", "activation").
exclude("jakarta.annotation", "jakarta.annotation-api").
exclude("javax.annotation", "javax.annotation-api")
// Without forking, ctrl-c doesn't actually fully stop Spark
run / fork := true
Test / fork := true
CodePudding user response:
Encoders.product[classa]
is a Scala 2 thing. This method accepts an implicit TypeTag
. There are no TypeTag
s in Scala 3. In Scala 3 the library maintainers propose to work in the following way:
https://github.com/vincenzobaz/spark-scala3/blob/main/examples/src/main/scala/sql/StarWars.scala
package sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{Dataset, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
object StarWars extends App:
val spark = SparkSession.builder().master("local").getOrCreate
import spark.implicits.localSeqToDatasetHolder
import scala3encoders.given
extension [T: Encoder] (seq: Seq[T])
def toDS: Dataset[T] =
localSeqToDatasetHolder(seq).toDS
case class Friends(name: String, friends: String)
val df: Dataset[Friends] = Seq(
("Yoda", "Obi-Wan Kenobi"),
("Anakin Skywalker", "Sheev Palpatine"),
("Luke Skywalker", "Han Solo, Leia Skywalker"),
("Leia Skywalker", "Obi-Wan Kenobi"),
("Sheev Palpatine", "Anakin Skywalker"),
("Han Solo", "Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi, Chewbacca"),
("Obi-Wan Kenobi", "Yoda, Qui-Gon Jinn"),
("R2-D2", "C-3PO"),
("C-3PO", "R2-D2"),
("Darth Maul", "Sheev Palpatine"),
("Chewbacca", "Han Solo"),
("Lando Calrissian", "Han Solo"),
("Jabba", "Boba Fett")
).toDS.map((n,f) => Friends(n, f))
val friends = df.as[Friends]
friends.show()
case class FriendsMissing(who: String, friends: Option[String])
val dsMissing: Dataset[FriendsMissing] = Seq(
("Yoda", Some("Obi-Wan Kenobi")),
("Anakin Skywalker", Some("Sheev Palpatine")),
("Luke Skywalker", Option.empty[String]),
("Leia Skywalker", Some("Obi-Wan Kenobi")),
("Sheev Palpatine", Some("Anakin Skywalker")),
("Han Solo", Some("Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi"))
).toDS
.map((a, b) => FriendsMissing(a, b))
dsMissing.show()
case class Character(
name: String,
height: Int,
weight: Option[Int],
eyecolor: Option[String],
haircolor: Option[String],
jedi: String,
species: String
)
val characters: Dataset[Character] = spark.sqlContext
.read
.option("header", "true")
.option("delimiter", ";")
.option("inferSchema", "true")
.csv("StarWars.csv")
.as[Character]
characters.show()
val sw_df = characters.join(friends, Seq("name"))
sw_df.show()
case class SW(
name: String,
height: Int,
weight: Option[Int],
eyecolor: Option[String],
haircolor: Option[String],
jedi: String,
species: String,
friends: String
)
val sw_ds = sw_df.as[SW]
So if you really need Encoders.product[classa]
compile this part of your code with Scala 2
src/App.scala
// this is Scala 3
object App {
def main(args: Array[String]): Unit = {
println(App1.schema)
// Seq(StructField(i,IntegerType,false), StructField(j,IntegerType,false), StructField(s,StringType,true))
}
}
scala2/src/main/scala/App1.scala
import org.apache.spark.sql._
// this is Scala 2
object App1 {
val schema = Encoders.product[classa].schema
}
common/src/main/scala/classa.scala
case class classa(i: Int, j: Int, s: String)
build.sbt
lazy val sparkCore = "org.apache.spark" %% "spark-core" % "3.2.0"
lazy val sparkSql = "org.apache.spark" %% "spark-sql" % "3.2.0"
lazy val scala3V = "3.1.3"
lazy val scala2V = "2.13.8"
lazy val root = project
.in(file("."))
.settings(
scalaVersion := scala3V,
scalacOptions = Seq("-language:implicitConversions", "-deprecation"),
libraryDependencies = Seq(
excludes(sparkCore.cross(CrossVersion.for3Use2_13)),
excludes(sparkSql.cross(CrossVersion.for3Use2_13)),
excludes("io.github.vincenzobaz" %% "spark-scala3" % "0.1.3"),
"org.scalameta" %% "munit" % "0.7.26" % Test
)
)
.dependsOn(scala2, common)
lazy val scala2 = project
.settings(
scalaVersion := scala2V,
libraryDependencies = Seq(
sparkCore,
sparkSql
)
)
.dependsOn(common)
lazy val common = project
.settings(
scalaVersion := scala3V,
crossScalaVersions := Seq(scala2V, scala3V)
)
//netty-all replaces all these excludes
def excludes(m: ModuleID): ModuleID =
m.exclude("io.netty", "netty-common").
exclude("io.netty", "netty-handler").
exclude("io.netty", "netty-transport").
exclude("io.netty", "netty-buffer").
exclude("io.netty", "netty-codec").
exclude("io.netty", "netty-resolver").
exclude("io.netty", "netty-transport-native-epoll").
exclude("io.netty", "netty-transport-native-unix-common").
exclude("javax.xml.bind", "jaxb-api").
exclude("jakarta.xml.bind", "jaxb-api").
exclude("javax.activation", "activation").
exclude("jakarta.annotation", "jakarta.annotation-api").
exclude("javax.annotation", "javax.annotation-api")
// Without forking, ctrl-c doesn't actually fully stop Spark
run / fork := true
Test / fork := true
Update. Alternatively, in Scala 3 you can calculate TypeTag
with Scala 2 runtime compilation (reflective Toolbox): How to compile and execute scala code at run-time in Scala3?
Scala 2 macros don't work, so we can't do runtime.currentMirror
or q"..."
but can do universe.runtimeMirror
, tb.parse
. It turns out this still works in Scala 3.
// this is Scala 3
import org.apache.spark.sql.*
import scala.tools.reflect.ToolBox
import scala.reflect.runtime.universe
import scala.reflect.runtime.universe.*
import mypackage.classa
val rm = universe.runtimeMirror(getClass.getClassLoader)
val tb = rm.mkToolBox()
val typeTag = tb.eval(tb.parse(
"scala.reflect.runtime.universe.typeTag[mypackage.classa]"
)).asInstanceOf[TypeTag[classa]]
Encoders.product[classa](typeTag).schema
// Seq(StructField(i,IntegerType,false), StructField(j,IntegerType,false), StructField(s,StringType,true))
build.sbt
lazy val sparkCore = "org.apache.spark" %% "spark-core" % "3.2.0"
lazy val sparkSql = "org.apache.spark" %% "spark-sql" % "3.2.0"
lazy val scala3V = "3.1.3"
lazy val scala2V = "2.13.8"
lazy val root = project
.in(file("."))
.settings(
scalaVersion := scala3V,
scalacOptions = Seq(
"-language:implicitConversions",
"-deprecation"
),
libraryDependencies = Seq(
excludes(sparkCore.cross(CrossVersion.for3Use2_13)),
excludes(sparkSql.cross(CrossVersion.for3Use2_13)),
excludes("io.github.vincenzobaz" %% "spark-scala3" % "0.1.3"),
"org.scalameta" %% "munit" % "0.7.26" % Test,
scalaOrganization.value % "scala-reflect" % scala2V,
scalaOrganization.value % "scala-compiler" % scala2V,
),
)
def excludes(m: ModuleID): ModuleID =
m.exclude("io.netty", "netty-common").
exclude("io.netty", "netty-handler").
exclude("io.netty", "netty-transport").
exclude("io.netty", "netty-buffer").
exclude("io.netty", "netty-codec").
exclude("io.netty", "netty-resolver").
exclude("io.netty", "netty-transport-native-epoll").
exclude("io.netty", "netty-transport-native-unix-common").
exclude("javax.xml.bind", "jaxb-api").
exclude("jakarta.xml.bind", "jaxb-api").
exclude("javax.activation", "activation").
exclude("jakarta.annotation", "jakarta.annotation-api").
exclude("javax.annotation", "javax.annotation-api")
// Without forking, ctrl-c doesn't actually fully stop Spark
run / fork := true
Test / fork := true