I am attempting to use Spark for a very simple use case: given a large set of files (90k) with device time-series data for millions of devices group all of the time-series reads for a given device into a single set of files (partition). For now let’s say we are targeting 100 partitions, and it is not critical that a given devices data shows up in the same output file, just the same partition.
Given this problem we've come up with two ways to do this: repartition then write, or write with partitionBy applied to the Writer. The code for either of these is very simple:
repartition (a hash column is added to ensure that the comparison to the partitionBy code below is one-to-one):
df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
.repartition("partition") \
.write.format("json") \
.option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
.mode("overwrite") \
.save(output_path)
partitionBy:
df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
.write.format("json") \
.partitionBy("partition") \
.option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
.mode("overwrite") \
.save(output_path)
In our testing repartition is 10x faster than partitionBy. Why is this?
Based on my understanding, repartition will incur a shuffle, which everything I've learned about Spark tells me to avoid whenever possible. On the other hand, partitionBy (based on my understanding) only incurs a sort operation local to each node - no shuffle is needed. Am I misunderstanding something that is causing me to think partitionBy would be faster?
CodePudding user response:
TLDR: Spark triggers a sort when you call partitionBy, not a hash re-partitioning. This is why it is much slower in your case.
We can check that with a toy example:
spark.range(1000).withColumn("partition", 'id % 100)
.repartition('partition).write.csv("/tmp/test.csv")
In the resulting job DAG (Spark UI), don't pay attention to the grey stage; it is skipped because it was computed in a previous job.
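Since the DAG screenshots aren't reproduced here, another way to see the same thing is the physical plan. Below is a minimal PySpark sketch of the repartition side of the toy example (column name and setup are just illustrative):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Same toy data: 1000 rows bucketed into 100 partition values
df = spark.range(1000).withColumn("partition", col("id") % 100)

# The physical plan contains an "Exchange hashpartitioning" node, i.e. a shuffle
df.repartition("partition").explain()
The writer itself has no explain() hook, so for the partitionBy case the sort only shows up in the write job's plan in the Spark UI.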
Then, with partitionBy:
spark.range(1000).withColumn("partition", 'id % 100)
.write.partitionBy("partition").csv("/tmp/test2.csv")
You can check that even if you add repartition before partitionBy, the sort will still be there. So what's happening? Notice that the sort in the second DAG does not trigger a shuffle: it happens within each partition (a map-side operation). In fact, when you call partitionBy, Spark does not shuffle the data as one would expect at first. Spark sorts each partition individually, and then each executor writes its data into the corresponding output partition, in a separate file. Therefore, note that with partitionBy you are not writing num_partitions files but something between num_partitions and num_partitions * num_executors files: each output partition has one file per executor that holds data belonging to it.