Disable new Spark behaviour of evicting cached partitions when insufficient memory


In the past, Spark jobs would fail with an OOM if an RDD could not fit into memory (when trying to cache with MEMORY_ONLY). These days it seems Spark will instead evict partitions from the cache and recompute them from scratch.

We have some jobs that cache an RDD then traverse it 300 times. In order to know when we need to increase the memory on our cluster, we need to know when it has run out of memory.

The new behaviour of Spark makes this difficult: rather than the job OOMing (like in the past), the job instead just takes forever (our surrounding logic eventually times out the job). Diagnosing why the job failed becomes difficult because it's not immediately obvious from the logs that the job has run out of memory, since no OOM is thrown; the only hint is the presence of "evicted" log lines, which one has to go looking for.

As a bit of a hack, we are using accumulators with mapPartitionsWithIndex to keep a count of the number of times each partition is traversed, then forcibly blowing up the job when this count reaches 2 or more (indicating an eviction). This hack doesn't work very well, as it seems to give false positives (root cause not yet understood; it doesn't seem to be speculative execution, nor are tasks failing: egrep "task .* in stage [0-9]\.1" | wc -l gives 0).
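Roughly, the hack looks like this (a simplified sketch; the identifier names and the exact failure check are illustrative, not our actual code):

import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._

// Record the partition index every time a partition is computed.
val computations = sc.collectionAccumulator[Int]("partition-computations")

val cached = rdd
  .mapPartitionsWithIndex { (idx, iter) =>
    computations.add(idx)   // runs only when this partition is actually (re)computed
    iter
  }
  .persist(StorageLevel.MEMORY_ONLY)

// Called between traversals: after the first pass each index should appear exactly once.
def assertNoEviction(): Unit = {
  val counts = computations.value.asScala.groupBy(identity).mapValues(_.size)
  if (counts.values.exists(_ >= 2))
    throw new IllegalStateException(
      "Cache eviction suspected; recomputed partitions: " + counts.filter(_._2 >= 2).keys.mkString(", "))
}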

Question 1: Is there a way to disable this new behaviour of Spark and make it behave like it used to (i.e. just blow up with OOM)? I've looked through the Spark configuration and cannot find anything like "disable-eviction".

Question 2: Is there any method on the SparkSession or SparkContext that we can call to easily detect when eviction is happening?

If not, then for our use case this is an effective regression - we need a way to make Spark behave predictably, or at least a way to determine automatically when Spark is running slowly due to lack of memory.

See also How does Spark evict cached partitions?

https://issues.apache.org/jira/browse/SPARK-36966

CodePudding user response:

There could be many causes for your job's behaviour to change and for it to take more time:

What can cause the change in cache eviction behaviour?

  1. The input data for the job
  2. Your application code
  3. The resources used to run the job
  4. Spark logic of cache eviction

As the issue you have opened is new and there is no acknowledgement from the Spark developers yet, there is no reason to assume it is the Spark version (2.4.4) that is to blame, unless you provide a reproduction script that passes consistently on a previous version and fails on 2.4.4. You did not mention whether you are using a resource manager or how it is configured. You also didn't mention whether there are any intermediate failures that are being retried.

What control do you have over the job runtime, failures and cache eviction?

  • You can limit the number of retries. For example, when using YARN: spark.yarn.maxAppAttempts=1
  • You can limit the number of internal failures and the waiting between retries (e.g. spark.shuffle.io.maxRetries, spark.shuffle.io.retryWait)
  • Tweak the memory configuration, especially spark.yarn.executor.memoryOverhead and spark.memory.fraction (see the sketch after this list). From the Spark configuration docs on spark.memory.fraction: "The lower this is, the more frequently spills and cached data eviction occur."
  • You mentioned using RDDs. Datasets have much better serialization, which consumes less space and reduces the amount of memory required.
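As a sketch of how those knobs could be set when building the session (the values are placeholders, not recommendations; depending on deploy mode, the spark.yarn.* options may need to be passed to spark-submit instead of being set in code):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cache-eviction-tuning-sketch")
  .config("spark.yarn.maxAppAttempts", "1")            // fail fast instead of re-running the whole application
  .config("spark.shuffle.io.maxRetries", "3")          // fewer internal fetch retries
  .config("spark.shuffle.io.retryWait", "5s")          // shorter wait between those retries
  .config("spark.yarn.executor.memoryOverhead", "1g")  // extra off-heap memory per executor
  .config("spark.memory.fraction", "0.6")              // fraction of heap shared by execution and storage
  .getOrCreate()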

CodePudding user response:

(In continuation of my comment on the question)

If you're planning to do it internally, inside the driver process, perhaps you can retrieve the RDDInfo array from the SparkContext and interrogate it for the partition counts of the RDD you're interested in (fractionCached below is just the percentage of cached partitions vs the total number). For example, in Scala:

scala> val rdd = sc.textFile("file:///etc/spark/conf/spark-defaults.conf").repartition(10)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at repartition at <console>:27

scala> rdd.persist().count()
res14: Long = 34

scala> val rddStorage = rdd.context.getRDDStorageInfo(0)
rddStorage: org.apache.spark.storage.RDDInfo = RDD "MapPartitionsRDD" (9) StorageLevel: StorageLevel(memory, deserialized, 1 replicas); CachedPartitions: 10; TotalPartitions: 10; MemorySize: 5.1 KB; DiskSize: 0.0 B

scala> val fractionCached = rddStorage.numCachedPartitions * 100 / rddStorage.numPartitions
fractionCached: Int = 100
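Wrapped into a small helper that the driver can call between traversals, it might look like this (a sketch; the method name and the fail-fast behaviour are my own, not a built-in API):

// Fail fast if any partition of the given persisted RDD has been evicted.
def failIfEvicted(sc: org.apache.spark.SparkContext, rddId: Int): Unit =
  sc.getRDDStorageInfo.find(_.id == rddId).foreach { info =>
    if (info.numCachedPartitions < info.numPartitions)
      throw new IllegalStateException(
        s"RDD $rddId: only ${info.numCachedPartitions} of ${info.numPartitions} partitions are cached")
  }

// e.g. between traversals:
// failIfEvicted(rdd.context, rdd.id)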

Otherwise, if you plan to monitor execution via some external process, I guess the easiest would be to use Spark's REST API Storage endpoint. For every persisted RDD in a running application, it will provide summary and detailed memory distribution info. For the RDD from the previous example and an app running on YARN:

# curl http://localhost:4040/api/v1/applications/application_1633847250859_1230003/1/storage/rdd
[ {
  "id" : 9,
  "name" : "MapPartitionsRDD",
  "numPartitions" : 10,
  "numCachedPartitions" : 10,
  "storageLevel" : "Memory Deserialized 1x Replicated",
  "memoryUsed" : 5272,
  "diskUsed" : 0,
  "dataDistribution" : [ {
    "address" : "cluster-node-1:39259",
    "memoryUsed" : 2376,
    "memoryRemaining" : 383991372,
    "diskUsed" : 0,
    "onHeapMemoryUsed" : 2376,
    "offHeapMemoryUsed" : 0,
    "onHeapMemoryRemaining" : 383991372,
    "offHeapMemoryRemaining" : 0
  }, {
    "address" : "cluster-node-2:37383",
    "memoryUsed" : 2896,
    "memoryRemaining" : 383990852,
    "diskUsed" : 0,
    "onHeapMemoryUsed" : 2896,
    "offHeapMemoryUsed" : 0,
    "onHeapMemoryRemaining" : 383990852,
    "offHeapMemoryRemaining" : 0
  } ],
  "partitions" : [ {
    "blockName" : "rdd_9_1",
    "storageLevel" : "Memory Deserialized 1x Replicated",
    "memoryUsed" : 624,
    "diskUsed" : 0,
    "executors" : [ "1" ]
  }, {
    "blockName" : "rdd_9_0",
    "storageLevel" : "Memory Deserialized 1x Replicated",
    "memoryUsed" : 416,
    "diskUsed" : 0,
    "executors" : [ "2" ]
  }, { 
  ... etc ...
  ... etc ...
  } ]
} ]
#

In the latter case, be ready to do some JSON parsing :)
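For what it's worth, a minimal sketch of that parsing in Scala, using the json4s library that ships with Spark (the URL and application id below are placeholders):

import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats: Formats = DefaultFormats

// Placeholder URL; substitute your application (and attempt) id.
val url = "http://localhost:4040/api/v1/applications/<app-id>/storage/rdd"
val body = scala.io.Source.fromURL(url).mkString

// The endpoint returns a JSON array with one element per persisted RDD.
val evicted = parse(body).children.exists { rdd =>
  (rdd \ "numCachedPartitions").extract[Int] < (rdd \ "numPartitions").extract[Int]
}
if (evicted) println("At least one persisted RDD has evicted partitions")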
