I have a DataFrame with 100 million rows and ~10,000 columns. The columns are of two types: standard (C_i) followed by dynamic (X_i). This DataFrame was obtained after some processing, and performance up to that point was fast. Now only 2 steps remain:
Goal:
- A particular operation needs to be done on every X_i using an identical subset of the C_i columns.
- Convert each X_i column to FloatType.
Difficulty:
- Performance degrades terribly with an increasing number of columns.
- After a while, only 1 executor seems to work (%CPU use < 200%), even on sample data with 100 rows and 1,000 columns. If I push it to 1,500 columns, it crashes.
Minimal code:
import spark.implicits._
import org.apache.spark.sql.types.FloatType
// sample_udf
val foo = (s_val: String, t_val: String) => {
  t_val + s_val.takeRight(1)  // append the last character of s_val to t_val
}
val foos_udf = udf(foo)
spark.udf.register("foos_udf", foo)
val columns = Seq("C1", "C2", "X1", "X2", "X3", "X4")
val data = Seq(("abc", "212", "1", "2", "3", "4"),("def", "436", "2", "2", "1", "8"),("abc", "510", "1", "2", "5", "8"))
val rdd = spark.sparkContext.parallelize(data)
var df = spark.createDataFrame(rdd).toDF(columns:_*)
df.show()
// apply the UDF to every X column
for (cols <- df.columns.drop(2)) {
  df = df.withColumn(cols, foos_udf(col("C2"), col(cols)))
}
df.show()
// cast every X column to FloatType
for (cols <- df.columns.drop(2)) {
  df = df.withColumn(cols, col(cols).cast(FloatType))
}
df.show()
Error on 1,500 column data:
Exception in thread "main" java.lang.StackOverflowError
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.isStreaming(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$isStreaming$1.apply(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$isStreaming$1.apply(LogicalPlan.scala:37)
at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
at scala.collection.immutable.List.exists(List.scala:84)
...
Thoughts:
- Perhaps var could be replaced, but the size of the data is close to 40% of the RAM.
- Perhaps the for loop used for dtype casting is causing the performance degradation, though I can't see how, or what the alternatives are. From searching the internet, I have seen people suggest a foldLeft-based approach, but apparently that still gets translated to a for loop internally.
Any inputs on this would be greatly appreciated.
CodePudding user response:
Not sure if this will fix the performance on your side with ~10,000 columns, but I was able to run it locally with 1,500 using the following code.
I addressed points #1 and #2, which may have had some impact on performance. One note: to my understanding foldLeft should be a pure recursive function without an internal for loop, so it might have an impact on performance in this case.
Also, the two for loops can be merged into a single pass, which I refactored as a foldLeft.
We might also get a performance increase if we replace the udf with a built-in Spark function (see the sketch after the example below).
import spark.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{FloatType, StringType, StructField, StructType}
import org.apache.spark.sql.functions._
// sample_udf
val foo = (s_val: String, t_val: String) => {
  t_val + s_val.takeRight(1)
}
val foos_udf = udf(foo)
spark.udf.register("foos_udf", foo)
val numberOfColumns = 1500
val numberOfRows = 100
val colNames = (1 to numberOfColumns).map(s => s"X$s")
val colValues = (1 to numberOfColumns).map(_.toString)
val columns = Seq("C1", "C2") ++ colNames
val schema = StructType(columns.map(field => StructField(field, StringType)))
val rowFields = Seq("abc", "212") ++ colValues
val listOfRows = (1 to numberOfRows).map(_ => Row(rowFields: _*))
val listOfRdds = spark.sparkContext.parallelize(listOfRows)
val df = spark.createDataFrame(listOfRdds, schema)
df.show()
// apply the UDF and the cast in a single pass over the X columns
val newDf = df.columns.drop(2).foldLeft(df)((df, colName) => {
  df.withColumn(colName, foos_udf(col("C2"), col(colName)).cast(FloatType))
})
newDf.show()
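As a side note on replacing the udf with built-in functions: here is a rough sketch of what that could look like, assuming concat plus substring with a negative offset reproduces foo (append the last character of C2 to each X value); the nativeCols and nativeDf names are illustrative, not from the original code.
import org.apache.spark.sql.functions.{col, concat, substring}
// same transformation without a UDF: concat(X_i, last char of C2), cast to float,
// expressed as one projection instead of chained withColumn calls
val nativeCols = col("C1") +: col("C2") +:
  colNames.map(c => concat(col(c), substring(col("C2"), -1, 1)).cast(FloatType).as(c))
val nativeDf = df.select(nativeCols: _*)
nativeDf.show()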
Hope this helps!
*** EDIT
Found a much better solution that avoids loops entirely. Simply build a single projection with selectExpr; this way Spark casts all columns in one go without any kind of recursion in the plan. From my previous example:
instead of doing the foldLeft, just replace it with these lines. I just tested it with 10k columns and 100 rows on my local machine, and it took a few seconds.
val selectExpression = Seq("C1", "C2") ++ colNames.map(s => s"cast($s as float)")
val newDf = df.selectExpr(selectExpression:_*)
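If the UDF itself also has to be applied, the same single-projection idea could be extended; here is a rough sketch (the projected and combinedDf names are illustrative, not from the original code):
// apply the UDF and the cast for every X column in one select,
// so the plan stays a single projection rather than thousands of withColumn steps
val projected = Seq(col("C1"), col("C2")) ++
  colNames.map(c => foos_udf(col("C2"), col(c)).cast(FloatType).as(c))
val combinedDf = df.select(projected: _*)
combinedDf.show()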
CodePudding user response:
A faster solution was to call the UDF on the row itself rather than on each column. Since Spark stores data row-wise, the earlier per-column approach was exhibiting terrible performance.
def my_udf(names: Array[String]) = udf[String, Row]((r: Row) => {
  // copy the needed values out of the row, then do the actual work on them
  val row = Array.ofDim[String](names.length)
  for (i <- 0 until row.length) {
    row(i) = r.getAs[String](i)
  }
  ...
})
...
// one UDF call per row over the whole struct, instead of one call per column
val df2 = df1.withColumn(r_col, my_udf(df1.columns)(struct("*"))).select(col(results_col))
Type casting can then be done as suggested by Riccardo in the previous answer.
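For completeness, a self-contained sketch of this row-based idea, using the toy foo logic from the question and packing the results into a single comma-delimited string column (the row_udf, xCols, and result names are placeholders, not from the original code):
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}
// one UDF invocation per row: read C2 once, transform every X column,
// and return the results packed into one delimited string
def row_udf(xNames: Array[String]) = udf[String, Row]((r: Row) => {
  val suffix = r.getAs[String]("C2").takeRight(1)
  xNames.map(n => r.getAs[String](n) + suffix).mkString(",")
})
val xCols = df.columns.drop(2)
val rowDf = df.withColumn("result", row_udf(xCols)(struct("*")))
rowDf.show()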