This is a follow-up to another unresolved problem involving a rather straightforward task: updating a large number of columns (~10,000) based on information from the first few columns. There are about 100M rows, and the data size is in the terabyte range, which rules out the collect
method. A column-by-column approach using a UDF already failed at a few hundred columns (it could not handle 1,500). Moreover, most of the time was spent with Spark using only about 100-200% CPU (unlike other operations that use tens of cores). This makes me believe the Spark dataframe
was suffering from a low cache-hit ratio, probably due to memory thrashing.
I then tried a row-by-row approach, since Spark keeps rows intact within a partition. The most straightforward way would perhaps be to use a case class
with methods that process a row efficiently and return the updated row as a tuple that can be converted back to a dataframe, exploiting the fact that all columns that need modification use the same UDF. However, because there are so many columns, they cannot be named individually in the definition of the case class
. So I tried the following.
scala> case class Gt(id: String*)
scala> val test = Gt("a", "b", "c")
test: Gt = Gt(WrappedArray(a, b, c))
scala> val columns = Seq("C1", "C2", "X1", "X2", "X3", "X4")
columns: Seq[String] = List(C1, C2, X1, X2, X3, X4)
scala> val data = Seq(("abc", "212", "1", "2", "3", "4"),("def", "436", "2", "2", "1", "8"),("abc", "510", "1", "2", "5", "8"))
data: Seq[(String, String, String, String, String, String)] = List((abc,212,1,2,3,4), (def,436,2,2,1,8), (abc,510,1,2,5,8))
scala> val rdd = spark.sparkContext.parallelize(data)
rdd: org.apache.spark.rdd.RDD[(String, String, String, String, String, String)] = ParallelCollectionRDD[220] at parallelize at <console>:34
scala> var df = spark.createDataFrame(rdd).toDF(columns:_*)
df: org.apache.spark.sql.DataFrame = [C1: string, C2: string ... 4 more fields]
scala> var ds = spark.createDataFrame(rdd).toDF(columns:_*).as[Gt]
java.lang.ClassNotFoundException: scala.<repeated>
at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
Gt
works fine if the parameters are explicitly defined, but not otherwise, as seen above.
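For comparison, a minimal sketch of the variant that does encode correctly, using explicitly named fields (the class name Gt2 and its fixed arity are only illustrative of the toy six-column schema):

case class Gt2(C1: String, C2: String, X1: String, X2: String, X3: String, X4: String)
val ds2 = spark.createDataFrame(rdd).toDF(columns:_*).as[Gt2]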
I would be thankful if anyone could suggest a way to update the dataframe row by row. Given the complexity of the computation, direct column expressions are not possible in this case.
CodePudding user response:
I finally found two approaches to the problem:
- Process the entire row at once and then apply a UDF, or
- Merge all the columns into a single array and process it with one UDF.
While both worked, (2) was faster because it avoids the conversion overhead; in addition, Spark stores this data row-wise anyway, so approach (1) brings no extra cache-locality benefit. Sketches of both approaches follow.
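Below are minimal sketches of the two approaches against the toy df from the question. The per-value logic (appending the key column) is only a placeholder for the real computation, and helper names such as updateVals are illustrative; the encoder API also differs by Spark version, as noted in the comments.

// Approach (1): map over whole Rows and rebuild each Row.
// RowEncoder is the pre-Spark-3.5 API; on 3.5+ use Encoders.row(df.schema) instead.
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder

implicit val rowEnc = RowEncoder(df.schema)
val updatedRows = df.map { row =>
  val key = row.getString(0)                                            // C1
  val newVals = (2 until row.length).map(i => row.getString(i) + "_" + key)  // placeholder logic
  Row.fromSeq(Seq(row.getString(0), row.getString(1)) ++ newVals)
}

// Approach (2): pack all value columns into one array, run a single UDF on it,
// then unpack the result back into the original column names.
import org.apache.spark.sql.functions.{array, col, udf}

val valueCols = columns.drop(2)                     // X1..X4 here; ~10,000 in the real data
val updateVals = udf { (key: String, vals: Seq[String]) =>
  vals.map(v => v + "_" + key)                      // placeholder logic
}

val updated = df
  .withColumn("upd", updateVals(col("C1"), array(valueCols.map(col): _*)))
  .select((col("C1") +: col("C2") +:
    valueCols.zipWithIndex.map { case (c, i) => col("upd").getItem(i).as(c) }): _*)

Approach (2) keeps everything inside a single Column expression, so no row-level encoder is needed; the only cost is building and unpacking the array, which turned out cheaper than the row conversion in (1).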