understanding spark.default.parallelism


As per the documentation:

spark.default.parallelism: Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user

spark.default.parallelism: For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD
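
(For reference, the effective value can be inspected on the SparkContext; sc.defaultParallelism reflects spark.default.parallelism when it is set, otherwise a cluster-dependent default such as the number of local cores.)

print(spark.sparkContext.defaultParallelism)        # effective default parallelism
rdd = spark.sparkContext.parallelize(range(1000))   # numSlices defaults to the value above
print(rdd.getNumPartitions())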

I am not able to reproduce the documented behaviour.

Dataframe:

I create two DataFrames with 50 and 3 partitions respectively and join them. The output should have 50 partitions, but it always ends up with the number of partitions of the larger DataFrame (here df2, which has 3 partitions):

df1 = spark.range(1,10000).repartition(50)
df2 = spark.range(1,100000).repartition(3)
df3 = df1.join(df2,df1.id==df2.id,'inner')
df3.rdd.getNumPartitions() #3

RDD:

I create the same two DataFrames with 50 and 3 partitions respectively and join their underlying RDDs. The output RDD should have 50 partitions, but it has 53:

df1 = spark.range(1,10000).repartition(50)
df2 = spark.range(1,100000).repartition(3)
df3 = df1.rdd.join(df2.rdd)
df3.getNumPartitions() #53

How should I understand the conf spark.default.parallelism?

CodePudding user response:

For now I can answer the DataFrame part; I am still experimenting with RDDs, so I may edit this answer later.

For DataFrames there is the parameter spark.sql.shuffle.partitions, which is used for shuffles during joins and similar operations; it is set to 200 by default. In your case it is not being used, and there are two likely reasons:

Your datasets are smaller than the broadcast threshold (10 MB by default), so one of them is broadcast and there is no shuffle during the join; you end up with the number of partitions of the bigger dataset.

You may have AQE (adaptive query execution) enabled, which changes the number of partitions. You can check which case applies by inspecting the physical plan, as sketched below.
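
A rough way to check (plan node names vary a bit across Spark versions) is to print the physical plan of the join:

df3.explain()
# A BroadcastHashJoin node means the small side was broadcast, so there was
# no shuffle and spark.sql.shuffle.partitions was never consulted.
# An AdaptiveSparkPlan node means AQE is enabled and may coalesce the
# shuffle partitions after the fact.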

I did a quick check with broadcast joins and AQE disabled, and the results are as expected:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.shuffle.partitions", 100)

df1 = spark.range(1,10000).repartition(50)
df2 = spark.range(1,100000).repartition(3)
df3 = df1.join(df2,df1.id==df2.id,'inner')
df3.rdd.getNumPartitions() #100
Out[35]: 100

For the second case, with RDDs, I think the RDDs are co-partitioned, and because of that Spark does not trigger a full shuffle.
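
On the RDD side, note that spark.default.parallelism is a SparkContext-level setting, so it only takes effect when supplied at context creation time; setting it afterwards with spark.conf.set does not change the existing context. A minimal sketch of how it would drive the join's partition count, assuming the documented behaviour that the joined RDD falls back to a HashPartitioner with spark.default.parallelism partitions when the setting is present (run in a fresh process so getOrCreate builds a new context; the value 20 is illustrative):

from pyspark.sql import SparkSession

# spark.default.parallelism has to be set when the SparkContext is created.
spark = (SparkSession.builder
         .master("local[4]")
         .config("spark.default.parallelism", "20")
         .getOrCreate())

df1 = spark.range(1, 10000).repartition(50)
df2 = spark.range(1, 100000).repartition(3)

# With the setting in place, the RDD join should pick it up for its partitioner.
print(df1.rdd.join(df2.rdd).getNumPartitions())  # expected: 20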
