Spark AQE coalesce not working as expected

I wrote a small piece of PySpark code to test how Spark AQE works, but it doesn't seem to coalesce the partitions according to the parameters I passed.

Following is my code:

# Read the input CSV (header row present)
df = spark.read.format("csv").option("header", "true").load(<path to my csv file>)

# Enable AQE and its post-shuffle partition coalescing
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "50")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "60")
# Target ~200 KB per coalesced partition
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "200000")
spark.conf.set("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "200000")

df3 = df.groupby("Loan title").agg({"*":"count"}).withColumnRenamed('count(1)','cnt')
df3.show()
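
(Note: the AQE settings above are session-level SQL confs that are only consulted when the query actually runs, so setting them after the read but before the action works. The effective values can be sanity-checked with spark.conf.get, assuming the same spark session:

print(spark.conf.get("spark.sql.adaptive.enabled"))
print(spark.conf.get("spark.sql.adaptive.advisoryPartitionSizeInBytes")))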

The file is ~1.8 GB and gets read into 14 partitions, and its shuffle write is ~1.8 MB. Since I set advisoryPartitionSizeInBytes and minPartitionSize to 200 KB, I expected the number of coalesced partitions to be around 9 (1.8 MB / 200 KB).
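
(Worked out: 1.8 MB ≈ 1,800,000 bytes, and 1,800,000 bytes / 200,000 bytes per partition = 9 partitions.)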

But even though the AQEShuffleRead node in the final plan shows 8 coalesced partitions, the number of tasks in the final stage is still 1, which is confusing.

Please find the Spark UI screenshots below:

[Screenshot: physical plan]

[Screenshot: stages]

Could anyone help me figure out this behavior? Thanks in advance!

CodePudding user response:

After some trials I figured out the issue. The shuffle write of the final stage was not equal to the shuffle read because of the df3.show() command: show() only displays the first few rows, so it reads just enough of the shuffled data to send to the driver rather than reading all partitions.

Once I changed this to a full action such as .write, or to df3.rdd.getNumPartitions(), I could see the expected number of tasks/partitions being created, because now all the partitions are read.
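
A minimal sketch of those two checks (the output path here is a placeholder, not from the original post):

# Triggers a full evaluation, so the task count of the final stage
# reflects the AQE-coalesced partition count
print(df3.rdd.getNumPartitions())

# Alternatively, write the complete result out
df3.write.format("csv").mode("overwrite").save("/tmp/aqe_coalesce_test")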

Please find the screenshots below:

[Screenshot: stages]

Stages 18 - 20 : df3.show()

Stages 21 - 23 : df3.write.format("csv").save(..)
