Why does a spark RDD behave differently depending on contents?

Time:06-17

Based on this description of datasets and dataframes, I wrote this very short test code, which works.

import org.apache.spark.sql.functions._
val thing = Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")
val wordsDataset = sc.parallelize(thing).toDS()

If that works, why does running the code below give me this error?

error: value toDS is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.catalog.Table]

import org.apache.spark.sql.functions._
val sequence = spark.catalog.listDatabases().collect().flatMap(db =>
  spark.catalog.listTables(db.name).collect()).toSeq
val result = sc.parallelize(sequence).toDS()

Answer:

toDS() is not a member of RDD[T]. Welcome to the bizarre world of Scala implicits, where nothing is what it seems to be.

toDS() is a member of DatasetHolder[T]. In SparkSession, there is an object called implicits. When it is brought into scope with an expression like import spark.implicits._ (where spark is the SparkSession instance, not the SparkContext sc), an implicit method called rddToDatasetHolder becomes available for resolution:

implicit def rddToDatasetHolder[T](rdd: RDD[T])(implicit arg0: Encoder[T]): DatasetHolder[T]

When you call rdd.toDS(), the compiler first searches the RDD class and all of its superclasses for a method called toDS(). It doesn't find one, so it starts searching the implicit conversions in scope. While doing so, it finds the rddToDatasetHolder method, which accepts an RDD instance and returns an object of a type that does have a toDS() method. Basically, the compiler rewrites:

sc.parallelize(sequence).toDS()

into

spark.implicits.rddToDatasetHolder(sc.parallelize(sequence)).toDS()

Now, if you look at rddToDatasetHolder itself, it has two argument lists:

(rdd: RDD[T])
(implicit arg0: Encoder[T])

Implicit parameters in Scala do not have to be passed explicitly. If you do not supply the argument yourself, the compiler searches the implicit scope for a value of the required type and passes whatever object it finds or can construct. In this particular case, it looks for an instance of the Encoder[T] type. If it cannot find one, the conversion is not applicable, so the compiler falls back to reporting that toDS is not a member of the RDD, rather than complaining about a missing encoder. There are predefined encoders for the standard Scala types such as primitives, String, tuples and case classes, but not for arbitrary classes.
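The resolution steps above can be reproduced without Spark. The following is a minimal sketch in plain Scala 2; Enc, Holder, Box and the local Table class are hypothetical stand-ins for Encoder, DatasetHolder, RDD and catalog.Table, not Spark's actual implementation.

```scala
import scala.language.implicitConversions

// Hypothetical stand-ins for Spark's Encoder, DatasetHolder and RDD.
trait Enc[T] { def name: String }

final class Holder[T](items: Seq[T], enc: Enc[T]) {
  def toDS(): String = s"Dataset[${enc.name}] with ${items.size} rows"
}

final class Box[T](val items: Seq[T])

// A plain class with no predefined encoder, playing the role of catalog.Table.
final class Table(val name: String)

object implicits {
  // The only predefined encoder in this toy model.
  implicit val stringEnc: Enc[String] = new Enc[String] { def name = "String" }

  // Same shape as rddToDatasetHolder: applicable only if an Enc[T] can be found.
  implicit def boxToHolder[T](box: Box[T])(implicit enc: Enc[T]): Holder[T] =
    new Holder(box.items, enc)
}

object WithoutTableEncoder {
  import implicits._

  // Compiles: Box[String] is converted to Holder[String] using stringEnc.
  val ok: String = new Box(Seq("a", "b", "c")).toDS()

  // Rejected if uncommented, with a "value toDS is not a member of ..." error,
  // because no Enc[Table] is in scope here:
  // new Box(Seq(new Table("t"))).toDS()
}

object WithTableEncoder {
  import implicits._

  // Supplying an encoder makes the same call compile.
  implicit val tableEnc: Enc[Table] = new Enc[Table] { def name = "Table" }
  val alsoOk: String = new Box(Seq(new Table("t"))).toDS()
}
```

The only difference between the two objects is whether an Enc[Table] is visible, which mirrors the difference between RDD[String] and RDD[Table] in the question.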

So, in short: The existence of a predefined Encoder[String] makes it possible to call toDS() on an instance of RDD[String], but the absence of a predefined Encoder[org.apache.spark.sql.catalog.Table] makes it impossible to call toDS() on an instance of RDD[org.apache.spark.sql.catalog.Table].
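One possible way around this, sketched here under the assumption that the code runs in spark-shell (where spark is predefined), is to copy the fields of interest into a case class, since spark.implicits does provide encoders for case classes. TableInfo is a hypothetical name introduced for illustration; it is not part of Spark.

```scala
// Assumes spark-shell, where `spark` (the SparkSession) already exists.
import spark.implicits._

// Hypothetical case class; in compiled code it must be defined at the top
// level, outside the method that calls toDS().
case class TableInfo(database: String, name: String, tableType: String, isTemporary: Boolean)

// Same traversal as in the question, but each catalog Table is mapped
// to a TableInfo, for which an implicit Encoder is available.
val tables = spark.catalog.listDatabases().collect().flatMap { db =>
  spark.catalog.listTables(db.name).collect().map { t =>
    TableInfo(db.name, t.name, t.tableType, t.isTemporary)
  }
}.toSeq

val result = tables.toDS()   // Dataset[TableInfo]
```

Because the Seq is converted directly, the sc.parallelize step from the question is not needed in this sketch.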

By the way, SparkSession.implicits contains the implicit class StringToColumn which has a $ method. This is how the $"foo" expression gets converted to a Column instance for column foo.
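The $ mechanism can be sketched the same way in plain Scala 2. Col, colImplicits and StringToCol are hypothetical stand-ins for Column, spark.implicits and StringToColumn; the sketch only shows the shape of the trick, not Spark's code.

```scala
// Hypothetical stand-in for Spark's Column.
final case class Col(name: String)

object colImplicits {
  // Same shape as StringToColumn: wraps the StringContext the compiler
  // builds for an interpolated literal and adds a method named $.
  implicit class StringToCol(val sc: StringContext) {
    def $(args: Any*): Col = Col(sc.s(args: _*))
  }
}

object DollarDemo {
  import colImplicits._

  // The compiler turns $"foo" into StringContext("foo").$(), and the
  // implicit class supplies the missing $ method.
  val sugared: Col = $"foo"

  // The same call with the implicit wrapping written out by hand.
  val spelledOut: Col = new StringToCol(StringContext("foo")).$()
}
```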

Resolving all the implicit arguments and implicit transformations is why compiling Scala code is so dang slow.
