How might round-robin repartition without a key cause data skew?


It seems like I'm missing something about repartition in Spark.

AFAIK, you can repartition with a key: df.repartition("key"), in which case Spark will use a hash partitioning method.

And you can repartition by setting only the number of partitions: df.repartition(10), in which case Spark will use a round-robin partitioning method.

If repartitioning with only a partition number is done in a round-robin manner, in which case would it produce data skew that requires salting to spread the rows evenly?

CodePudding user response:

With df.repartition(10) you cannot have skew. As you mention, Spark uses a round-robin partitioning method so that all partitions have the same size.

We can check that:

spark.range(100000).repartition(5).explain
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [id=#1380]
   +- Range (0, 100000, step=1, splits=16)
spark.range(100000).repartition(5).groupBy(spark_partition_id).count.show
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   0|20000|
|                   1|20000|
|                   2|20000|
|                   3|20000|
|                   4|20000|
+--------------------+-----+
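Note that this distribution does not depend on the row values at all. As a quick sketch (assuming a spark-shell session with the usual functions imported; the constant "key" column is only there for illustration), even a dataframe where every row carries the same value ends up evenly spread:

import org.apache.spark.sql.functions._   // lit, spark_partition_id, ...

// Round-robin ignores row values entirely: even a constant column
// is spread evenly over the requested number of partitions.
spark.range(100000)
    .withColumn("key", lit(0))     // every row gets the same value
    .repartition(5)                // no key -> RoundRobinPartitioning
    .groupBy(spark_partition_id).count.show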

If you use df.repartition("key"), something different happens:

// let's specify the number of partitions as well
spark.range(100000).repartition(5, 'id).explain
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange hashpartitioning(id#352L, 5), REPARTITION_BY_NUM, [id=#1424]
   +- Range (0, 100000, step=1, splits=16)
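If I read the hashpartitioning expression correctly, the target partition of a row is essentially pmod(murmur3_hash(key), numPartitions). The built-in hash function uses the same Murmur3 hash, so we can sketch the bucket assignment ourselves (an approximation for illustration, not the exact internal code path):

import org.apache.spark.sql.functions.{hash, pmod, lit}

// Rough sketch: bucket = pmod(hash(id), 5) should mirror how
// hashpartitioning assigns rows to partitions.
spark.range(100000)
    .withColumn("bucket", pmod(hash('id), lit(5)))
    .groupBy("bucket").count.show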

Let's try:

spark.range(100000).repartition(5, 'id).groupBy(spark_partition_id).count.show
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   0|20128|
|                   1|20183|
|                   2|19943|
|                   3|19940|
|                   4|19806|
+--------------------+-----+

Each element of the column is hashed and the hashes are split between partitions. Therefore the partitions have similar sizes, but not exactly the same size. However, two rows with the same key always end up in the same partition. So if your key is skewed (one or more particular keys are over-represented in the dataframe), your partitioning will be skewed as well:

 spark.range(100000)
    .withColumn("key", when('id < 1000, 'id).otherwise(lit(0)))
    .repartition(5, 'key)
    .groupBy(spark_partition_id).count.show
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   0|99211|
|                   1|  196|
|                   2|  190|
|                   3|  200|
|                   4|  203|
+--------------------+-----+
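This is the case where the salt mentioned in the question helps: if you have to repartition by a skewed key but still want balanced partitions, one common trick is to add a random salt column and repartition by both. A minimal sketch (the salt range of 5 and the column names are arbitrary choices for illustration):

import org.apache.spark.sql.functions._

spark.range(100000)
    .withColumn("key", when('id < 1000, 'id).otherwise(lit(0)))
    // spread the over-represented key 0 across several hash buckets
    .withColumn("salt", (rand() * 5).cast("int"))
    .repartition(5, 'key, 'salt)
    .groupBy(spark_partition_id).count.show

The partitions should come out much more balanced, at the cost that downstream operations which need all rows of the same key together must then combine the salted groups again.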