Why is repartition faster than partitionBy in Spark?


I am attempting to use Spark for a very simple use case: given a large set of files (90k) containing device time-series data for millions of devices, group all of the time-series reads for a given device into a single set of files (a partition). For now let's say we are targeting 100 partitions, and it is not critical that a given device's data ends up in the same output file, just the same partition.

Given this problem, we've come up with two ways to do it: repartition then write, or write with partitionBy applied to the DataFrameWriter. The code for either is very simple:

repartition (a hash column is added to ensure that the comparison with the partitionBy code below is one-to-one):


# col() and hash() come from pyspark.sql.functions; this hash shadows Python's builtin
from pyspark.sql.functions import col, hash

df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \
  .withColumn("partition", hash(col("_DeviceName")).cast("long") % num_partitions)

# shuffle by the synthetic partition column, then write
df.repartition("partition") \
  .write.format("json") \
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
  .mode("overwrite") \
  .save(output_path)

partitionBy:


df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \
  .withColumn("partition", hash(col("_DeviceName")).cast("long") % num_partitions)

# no explicit repartition; the writer lays the data out by partition value
df.write.format("json") \
  .partitionBy("partition") \
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
  .mode("overwrite") \
  .save(output_path)
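
One caveat that applies to both snippets: Spark's hash returns a signed 32-bit integer, and % keeps the sign of its left operand, so hash(...) % num_partitions can be negative and yield up to 2 * num_partitions - 1 distinct values rather than the intended num_partitions. A minimal sketch of one way to keep the column in [0, num_partitions), using the SQL pmod function:

from pyspark.sql.functions import expr

# pmod always returns a non-negative remainder; num_partitions is inlined
# because expr() takes a SQL string
df = df.withColumn("partition", expr(f"pmod(hash(_DeviceName), {num_partitions})"))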

In our testing, the repartition version is 10x faster than the partitionBy version. Why is this?

Based on my understanding, repartition incurs a shuffle, which everything I have learned about Spark says to avoid whenever possible. On the other hand, partitionBy (again, based on my understanding) only incurs a sort local to each node; no shuffle is needed. Am I misunderstanding something that is causing me to think partitionBy would be faster?
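
For what it's worth, the shuffle introduced by repartition can be confirmed from the physical plan. A minimal sketch, assuming df is the DataFrame loaded above; note that the sort added by partitionBy will not appear here, because the file writer injects it at write time and it only shows up in the Spark UI:

# The printed plan should contain an "Exchange hashpartitioning(...)" node:
# that node is the shuffle
df.repartition("partition").explain()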

CodePudding user response:

TLDR: when you call partitionBy, Spark triggers a sort, not a hash re-partitioning. That is why it is much slower in your case.

We can check that with a toy example:

// Scala: import spark.implicits._ is needed for the 'colname symbol syntax
spark.range(1000).withColumn("partition", 'id % 100)
  .repartition('partition).write.csv("/tmp/test.csv")

[DAG 1: Spark UI DAG for the repartition job]

Don't pay attention to the grey stage; it is skipped because it was computed in a previous job.

Then, with partitionBy:

// Scala again: same column, but no explicit repartition before the write
spark.range(1000).withColumn("partition", 'id % 100)
  .write.partitionBy("partition").csv("/tmp/test2.csv")

[DAG 2: Spark UI DAG for the partitionBy job]

You can check that adding a repartition before the partitionBy changes nothing: the sort is still there. So what is happening? Notice that the sort in the second DAG does not trigger a shuffle; it is performed within each map partition. In fact, when you call partitionBy, Spark does not shuffle the data as one might expect at first. Spark sorts each partition individually, and then each executor writes its data into the corresponding partition directory, in a separate file.

Therefore, note that with partitionBy you are not writing num_partitions files but something between num_partitions and num_partitions * num_executors files: each partition ends up with one file per executor that holds data belonging to it.
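
A quick way to see this with the toy example above is to count the part files under each partition=... directory (a minimal sketch, assuming the job ran locally and wrote to /tmp/test2.csv as above):

import os
from collections import Counter

# Count the part files under each partition=... directory; expect between
# one file and one file per writing task for each partition value.
counts = Counter()
for root, _dirs, files in os.walk("/tmp/test2.csv"):
    name = os.path.basename(root)
    if name.startswith("partition="):
        counts[name] = sum(1 for f in files if f.startswith("part-"))

print(sum(counts.values()), "part files across", len(counts), "partition values")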
