spark scala: Performance degradation with simple UDF over large number of columns


I have a dataframe with 100 million rows and ~10,000 columns. The columns are of two types: standard (C_i) followed by dynamic (X_i). This dataframe was obtained after some processing, and that processing was fast. Now only 2 steps remain:

Goal:

  1. A particular operation needs to be done on every X_i using an identical subset of the C_i columns.
  2. Convert each X_i column into FloatType.

Difficulty:

  1. Performance degrades terribly with increasing number of columns.
  2. After a while, only 1 executor seems to work (%CPU use < 200%), even on sample data with 100 rows and 1,000 columns. If I push it to 1,500 columns, it crashes.

Minimal code:

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.FloatType

// sample_udf: append the last character of the first argument to the second
val foo = (s_val: String, t_val: String) => {
    t_val + s_val.takeRight(1)
}
val foos_udf = udf(foo)
spark.udf.register("foos_udf", foo)

val columns = Seq("C1", "C2", "X1", "X2", "X3", "X4")
val data = Seq(("abc", "212", "1", "2", "3", "4"),("def", "436", "2", "2", "1", "8"),("abc", "510", "1", "2", "5", "8"))

val rdd = spark.sparkContext.parallelize(data)
var df = spark.createDataFrame(rdd).toDF(columns:_*)
df.show()

// apply the UDF to every X column, one withColumn at a time
for (cols <- df.columns.drop(2)) {
    df = df.withColumn(cols, foos_udf(col("C2"),col(cols)))
}
df.show()

// cast every X column to FloatType, again one withColumn at a time
for (cols <- df.columns.drop(2)) {
    df = df.withColumn(cols,col(cols).cast(FloatType))
}
df.show()

Error on the 1,500-column data:

Exception in thread "main" java.lang.StackOverflowError
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.isStreaming(LogicalPlan.scala:37)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$isStreaming$1.apply(LogicalPlan.scala:37)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$isStreaming$1.apply(LogicalPlan.scala:37)
    at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
    at scala.collection.immutable.List.exists(List.scala:84)
...

Thoughts:

  1. Perhaps the var could be replaced, but the size of the data is close to 40% of the RAM.
  2. Perhaps the for loop used for dtype casting is causing the performance degradation, though I can't see how, or what the alternatives are. From searching the internet, I have seen people suggest a foldLeft-based approach (see the sketch below), but apparently that still gets translated to a for loop internally.
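
For reference, the foldLeft-based casting I have seen suggested looks roughly like this (only a sketch, I have not yet tried it on the full data):

// foldLeft over the X columns, threading the dataframe through each cast
val casted = df.columns.drop(2).foldLeft(df) { (acc, c) =>
    acc.withColumn(c, col(c).cast(FloatType))
}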

Any inputs on this would be greatly appreciated.

CodePudding user response:

Not sure if this will fix the performance on your side with ~10,000 columns, but I was able to run it locally with 1,500 columns using the following code.

I addressed points #1 and #2, which may have had some impact on performance. One note: to my understanding, foldLeft is a pure recursive function without an internal for loop, so switching to it might have a positive impact on performance in this case.

Also, the two for loops can be combined into a single pass, which I refactored as a foldLeft.

We might also get a performance increase by replacing the udf with a built-in Spark function (see the sketch after the code below).

  import spark.implicits._
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.{FloatType, StringType, StructField, StructType}
  import org.apache.spark.sql.functions._

  // sample_udf
  val foo = (s_val: String, t_val: String) => {
    t_val + s_val.takeRight(1)
  }
  val foos_udf = udf(foo)
  spark.udf.register("foos_udf", foo)

  val numberOfColumns = 1500
  val numberOfRows = 100


  val colNames = (1 to numberOfColumns).map(s => s"X$s")
  val colValues = (1 to numberOfColumns).map(_.toString)

  val columns = Seq("C1", "C2") ++ colNames
  val schema = StructType(columns.map(field => StructField(field, StringType)))

  val rowFields = Seq("abc", "212") ++ colValues
  val listOfRows = (1 to numberOfRows).map(_ => Row(rowFields: _*))
  val listOfRdds = spark.sparkContext.parallelize(listOfRows)
  val df = spark.createDataFrame(listOfRdds, schema)

  df.show()

  // single foldLeft pass: apply the UDF and cast to float in the same withColumn
  val newDf = df.columns.drop(2).foldLeft(df)((df, colName) => {
    df.withColumn(colName, foos_udf(col("C2"), col(colName)) cast FloatType)
  })

  newDf.show()
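
As a rough sketch of the built-in replacement mentioned above, here is my untested equivalent of foos_udf using concat and substring (null handling may differ from the UDF):

  // append the last character of C2 using built-in functions instead of the UDF,
  // then cast to float in the same pass
  val builtinDf = df.columns.drop(2).foldLeft(df)((acc, colName) =>
    acc.withColumn(colName, concat(col(colName), substring(col("C2"), -1, 1)) cast FloatType)
  )
  builtinDf.show()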

Hope this helps!

*** EDIT

Found a much better solution that avoids loops entirely. Simply build a single expression with selectExpr; this way Spark casts all columns in one go without any kind of recursion. From my previous example:

Instead of doing the foldLeft, just replace it with these lines. I just tested it with 10,000 columns and 100 rows on my local machine, and it finished in a few seconds:

  val selectExpression = Seq("C1", "C2") ++ colNames.map(s => s"cast($s as float)")
  val newDf = df.selectExpr(selectExpression:_*)
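
If the UDF step still needs to run as well, the registered foos_udf can be folded into the same single select (just a sketch, reusing the registration from the code above):

  val selectWithUdf = Seq("C1", "C2") ++ colNames.map(s => s"cast(foos_udf(C2, $s) as float) as $s")
  val newDfWithUdf = df.selectExpr(selectWithUdf: _*)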

CodePudding user response:

A faster solution was to call the UDF on the row itself rather than on each column. As Spark stores data row-wise, the earlier column-by-column approach was exhibiting terrible performance.

def my_udf(names: Array[String]) = udf[String, Row]((r: Row) => {
    // copy the row values into a local array, then do the actual work on them
    val row = Array.ofDim[String](names.length)
    for (i <- 0 until row.length) {
        row(i) = r.getAs(i)
    }
    ...
})
...
val df2 = df1.withColumn(r_col, my_udf(df1.columns)(struct("*"))).select(col(results_col))

Type casting can be done as suggested by Riccardo.
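
For illustration, a self-contained sketch of this row-based approach applied to the question's sample dataframe (the results column name, the array return type, and the helper name row_udf are my own choices, not part of the answer above):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

// read C2 once per row and append its last character to every X column value,
// returning all results as a single array column
def row_udf(xNames: Array[String]) = udf[Seq[String], Row]((r: Row) => {
    val suffix = r.getAs[String]("C2").takeRight(1)
    xNames.map(name => r.getAs[String](name) + suffix).toSeq
})

// assumes df still holds the original string-typed X columns from the question
val xCols = df.columns.drop(2)
val df2 = df
    .withColumn("results", row_udf(xCols)(struct("*")))
    .select(col("C1"), col("C2"), col("results"))
df2.show()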
