Lets assume I have a spark application loading a partition into dataframe
:
users/sales/2021/10/20
My cluster resources configured to be able to read X records in this partition. But I don't control the number of records.
Is there a way to do limit/pagination on the records inside the partition ?
So for example if Suddenly I have 2X data in the partition while my application expected to handle 1X at a time, I will read 1X then one more 1X
CodePudding user response:
This question is in fact why performance tuning is an art and not a science. Art refers to a masterpiece that is created. It's a point in time piece of art and unique to the artist current context.
You can request that the data provided partition/file size so that hopefully it grows the number of partitions not the partitions themselves. That's isn't always possible but you can ask. (If you are using HDFS it's ideal if the partition size matches the HDFS block size.) This can help for sure but it's better to solve the problem, and then make art/performance tune. I wouldn't pre-tune for a problem that hasn't happened yet. (I'm not saying you are doing this but encouraging that you cross that bridge if/when it happens not before.)
CodePudding user response:
It would be better if you play with maxPartitionBytes
parameter of Spark.
It's value by default is 128 MB i.e. spark will try to create partitions of 128 MB each. In case you start getting 2X data in source partition you should tune this parameter to create more partitions and based on available cores spark will process these partitions in subsequent iterations.
The below code can help to auto tune this parameter. It will try to optimally utilize the cluster resources in terms of partitions and cores.
def autoTuneMaxPartitionBytes(format:String, path:String, schema:String, maxSteps:Int, startingBytes:Long=134217728):Long = {
var cores = sc.defaultParallelism
var maxPartitionBytes:Long = startingBytes
val originalMaxPartitionBytes = spark.conf.get("spark.sql.files.maxPartitionBytes")
for (step <- 0 to maxSteps) {
maxPartitionBytes = maxPartitionBytes (step * 1024 * 1024)
val maxPartitionMB = maxPartitionBytes / 1024 / 1024
spark.conf.set("spark.sql.files.maxPartitionBytes", f"${maxPartitionBytes}b")
val partitions = spark.read.format(format).schema(schema).load(path).rdd.getNumPartitions
if (partitions % cores == 0) {
println("*** Found it! ***")
println(f"$maxPartitionMB%,d MB with $partitions%,d partitions, iterations: ${partitions/cores.toDouble}")
return maxPartitionBytes
} else {
println(f"$maxPartitionMB%,d MB with $partitions%,d partitions, iterations: ${partitions/cores.toDouble}")
}
}
spark.conf.set("spark.sql.files.maxPartitionBytes", originalMaxPartitionBytes)
throw new IllegalArgumentException("An appropriate maxPartitionBytes was not found")
}