Is there any stable method on the SparkSession/SparkContext/RDD that we can call to easily detect when eviction is happening?
For more context, see "Disable new Spark behaviour of evicting cached partitions when insufficient memory" or "When was automatic Spark RDD partition cache eviction implemented?"
CodePudding user response:
You can retrieve the Array[RDDInfo] from SparkContext.getRDDStorageInfo and interrogate its elements for the partition counts of the RDD you're interested in. If some of the partitions were evicted, or didn't fit into executor storage in the first place, numCachedPartitions will be less than the RDD's total number of partitions, numPartitions.
scala> val rdd = sc.textFile("file:///etc/spark/conf/spark-defaults.conf").repartition(10)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at repartition at <console>:27
scala> rdd.persist().count()
res14: Long = 34
scala> val rddStorage = rdd.context.getRDDStorageInfo(0)
rddStorage: org.apache.spark.storage.RDDInfo = RDD "MapPartitionsRDD" (9) StorageLevel: StorageLevel(memory, deserialized, 1 replicas); CachedPartitions: 10; TotalPartitions: 10; MemorySize: 5.1 KB; DiskSize: 0.0 B
scala> val fullyCached = (rddStorage.numCachedPartitions == rddStorage.numPartitions)
fullyCached: Boolean = true
The zero in getRDDStorageInfo(0) above is used for illustration purposes only: it simply indexes into the returned array. In reality, instead of using 0, you'd get the id of the RDD you're interested in (see RDD.id), and then search the Array[RDDInfo] for the element with rddInfo.id == id. You can probably also use rddInfo.name to do the same thing if you give the RDD a name.
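As a minimal sketch of that lookup by id (getRDDStorageInfo, RDD.id, numCachedPartitions and numPartitions are real Spark APIs; the helper name is just illustrative):

import org.apache.spark.rdd.RDD

// Returns true if every partition of the given RDD is currently cached.
// Call this only after an action (e.g. count()) has materialized the cache.
def isFullyCached(rdd: RDD[_]): Boolean =
  rdd.context.getRDDStorageInfo
    .find(_.id == rdd.id)  // look the RDD up by its id, not by array position
    .exists(info => info.numCachedPartitions == info.numPartitions)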
Finally, you could just detect whether any cached RDD has had partitions evicted, with something like this:
sparkSession.sparkContext.getRDDStorageInfo
  .filter(_.isCached)
  .find(rdd => rdd.numCachedPartitions < rdd.numPartitions)
  .foreach(rdd =>
    throw new IllegalArgumentException(s"RDD is being evicted, please configure cluster with more memory. " +
      s"numCachedPartitions = ${rdd.numCachedPartitions}, " +
      s"numPartitions = ${rdd.numPartitions}, " +
      s"name = ${rdd.name}, " +
      s"id = ${rdd.id}, " +
      s"memSize = ${rdd.memSize}, " +
      s"diskSize = ${rdd.diskSize}, " +
      s"externalBlockStoreSize = ${rdd.externalBlockStoreSize}"))
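Note that getRDDStorageInfo reflects the driver's view at the moment you call it, so you have to poll it after the action that populates the cache. If you'd rather be notified as blocks are dropped, one option (a sketch, not something the answer above relies on) is to register a SparkListener: when an RDD block is removed from storage, the driver receives a block update whose storage level is no longer valid.

import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdated}

// A minimal sketch: report every RDD block that gets dropped from storage.
// An invalid storage level (neither memory nor disk) on an RDD block update
// means the block was removed, e.g. evicted under memory pressure.
sc.addSparkListener(new SparkListener {
  override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
    val info = event.blockUpdatedInfo
    if (info.blockId.isRDD && !info.storageLevel.isValid) {
      println(s"Block ${info.blockId} was dropped from storage")
    }
  }
})

One caveat: an explicit unpersist() produces the same kind of block removal, so this alone doesn't distinguish memory-pressure eviction from deliberate uncaching.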