Understanding "Total size of serialized results" in Spark


I am doing a pretty simple operation on a dataframe of a huge Delta table hosted in Databricks. The problem I am facing is that after running for a few hours the code fails with the error: "Job aborted due to stage failure: Total size of serialized results of 59014 tasks (4.0 GiB) is bigger than spark.driver.maxResultSize 4.0 GiB".

The task I am performing is: I read the Delta table into a dataframe, add a new column that assigns each row to one of 20 buckets, and save the table in overwrite mode with the bucket column as an additional partition column (there are already 3 partition columns; this new column will be the 4th). So I am not doing anything in the application code that would pull a huge amount of data back to the driver. Given below is the sample code.

    from pyspark.sql.functions import col, substring, translate
    from pyspark.sql.types import LongType

    bucket_number = 20
    inputDataframe = spark.read.table("huge_delta_table")
    # Map the first 16 characters of some_column to digits and bucket rows by modulo
    inputDataframe = inputDataframe.withColumn("bucket_key",
        (translate(substring(col("some_column"), 0, 16), "abcdefghijklmnopqrstuvwxyz",
                   "01234567890123456789012345").cast(LongType()) % bucket_number) + 1)
    inputDataframe.write.format("delta").mode("overwrite").option("overwriteSchema", "true") \
        .partitionBy("existing_partition_column1", "existing_partition_column2", "existing_partition_column3", "bucket_key") \
        .saveAsTable("huge_delta_table")

I wanted to know: is it because of the huge number of tasks that Spark's internal result metadata (communicated back to the driver for coordination purposes) is becoming huge?

CodePudding user response:

As this SO post's answer discusses, when you get to a large number of tasks (you have 59k), you can run into this issue because each task sends some data back to the driver. To quote that answer:

Spark will try to send data back to the driver beyond just when you explicitly call collect. It will also send back accumulator results for each task if you are using accumulators, data for broadcast joins, and some small status data about each task. If you have LOTS of partitions (20k in my experience) you can sometimes see this error.

We can have a look at the source code to see where this error is triggered. The error is raised in the canFetchMoreResults method of TaskSetManager.scala.
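
To get an intuition for what that method does, here is a simplified Python model of the bookkeeping (the class and method names below are hypothetical, not the real Scala source): the driver keeps a running total of the serialized result sizes it has received and aborts the stage once that total exceeds spark.driver.maxResultSize.

    # Hypothetical, simplified model of the driver-side check; the real logic
    # lives in Scala in TaskSetManager.canFetchMoreResults.
    class TaskSetManagerModel:
        def __init__(self, max_result_size_bytes):
            self.max_result_size = max_result_size_bytes  # spark.driver.maxResultSize
            self.total_result_size = 0
            self.calculated_tasks = 0

        def can_fetch_more_results(self, serialized_result_size):
            # Called once per finished task with the size of its serialized result.
            self.total_result_size += serialized_result_size
            self.calculated_tasks += 1
            if 0 < self.max_result_size < self.total_result_size:
                raise RuntimeError(
                    f"Total size of serialized results of {self.calculated_tasks} tasks "
                    f"({self.total_result_size} B) is bigger than spark.driver.maxResultSize "
                    f"({self.max_result_size} B)")
            return True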

This canFetchMoreResults method is luckily only called in one place: in enqueueSuccessfulTask of TaskResultGetter, which runs on the driver. Without pasting the whole function's contents here, we can find an interesting piece of info:

// Set the task result size in the accumulator updates received from the executors.
// We need to do this here on the driver because if we did this on the executors then
// we would have to serialize the result again after updating the size.
result.accumUpdates = result.accumUpdates.map { a =>
  if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
    val acc = a.asInstanceOf[LongAccumulator]
    assert(acc.sum == 0L, "task result size should not have been set on the executors")
    acc.setValue(size.toLong)
    acc
  } else {
    a
  }
}

scheduler.handleSuccessfulTask(taskSetManager, tid, result)

So as you can see, for each task we're sending some data to the driver.
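
Just for intuition, the numbers in your error message imply an average of roughly 70 KiB of serialized result data per task, which is tiny per task but adds up across 59k tasks:

    # Back-of-the-envelope estimate using the numbers from the error message
    # (an estimate, not a measurement of your actual per-task results).
    total_result_bytes = 4.0 * 1024**3   # 4.0 GiB, from the error message
    num_tasks = 59014                    # task count, from the error message
    print(f"~{total_result_bytes / num_tasks / 1024:.0f} KiB per task")  # ~71 KiB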

Your problem is that you have too many tasks for your spark.driver.maxResultSize setting. What is the size of your partitions? A partition size of around 100 MB is typically healthy. Try the following:

  • If your partitions are much smaller than 100 MB, make them bigger so you have fewer of them. You could try increasing the spark.sql.files.maxPartitionBytes parameter (for example, to twice the default of 128 MB), since it does not look like you are doing any heavy per-row computation.
  • If you can't play with your task/partition size, you'll have to increase spark.driver.maxResultSize beyond its current value, or set it to 0 for unlimited size. Be aware that you may then also need to increase the driver's memory to avoid OOM errors. A sketch of both settings follows this list.
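
For reference, a minimal sketch of how those two settings could be applied when the session is created (the values are placeholders, not recommendations; on Databricks, spark.driver.maxResultSize is typically set in the cluster's Spark config rather than in code, since it cannot be changed on a running session):

    from pyspark.sql import SparkSession

    # Placeholder values; tune them for your data and cluster.
    spark = (
        SparkSession.builder
        # Larger input splits -> fewer read tasks (default is 128 MB).
        .config("spark.sql.files.maxPartitionBytes", "256m")
        # Allow more total serialized task-result data on the driver; "0" means unlimited.
        .config("spark.driver.maxResultSize", "8g")
        .getOrCreate()
    )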

Hope this helps!
