Home > Software design >  Get PySpark to output one file per column value (repartition / partitionBy not working)
Get PySpark to output one file per column value (repartition / partitionBy not working)

Time:09-25

I've seen many answers and blob posts suggesting that:

df.repartition('category').write().partitionBy('category')

Will output one file per category, but this doesn't appear to be true if the number of unique 'category' values in df is less than the number of default partitions (usually 200).

When I use the above code on a file with 100 categories, I end up with 100 folders each containing between 1 and 3 "part" files, rather than having all rows with a given "category" value in the same "part". The answer at https://stackoverflow.com/a/42780452/529618 seems to explain this.

What is the fastest way get exactly one file per partition value?


Things I've tried

I've seen many claims that

df.repartition(1, 'category').write().partitionBy('category')
df.repartition(2, 'category').write().partitionBy('category')

Will create "exactly one file per category" and "exactly two files per category" respectively, but this doesn't appear to be how this parameter works. The documentation makes it clear that the numPartitions argument is the total number of partitions to create, not the number of partitions per column value. Based on that documentation, specifying this argument as 1 should (accidentally) output a single file per partition when the file is written, but presumably only because it removes all parallelism and forces your entire RDD to be shuffled / recalculated on a single node.

required_partitions = df.select('category').distinct().count()
df.repartition(required_partitions, 'category').write().partitionBy('category')

The above seems like a workaround based on the documented behaviour, but one that would be costly for several reasons. For one, a separate count if df is expensive and not cached (and/or so big that it would be wasteful to cache just for this purpose), and also any repartitioning of a dataframe can cause unnecessary shuffling in a multi-stage workflow that has various dataframe outputss along the way.

CodePudding user response:

The "fastest" way probably depends on the actual hardware set-up and actual data (in case it is skewed). To my knowledge, I also agree that df.repartition('category').write().partitionBy('category') will not help solving your problem.

We faced a similar problem in our application but instead of doing first a count and then the repartition, we separated the writing of the data and the requirement to have only a single file per partition into two different Spark jobs. The first job is optimized to write the data. The second job just iterates over the partitioned folder structure and simply reads the data per folder/partition, coalesces its data to one partition and overwrites them back. Again, I can not tell if that is the fastest way also to your environment, but for us it did the trick.

Having done some research on this topic lead to the Auto Optimize Writes feature on Databricks for writing to a Delta Table. Here, they use a similar approach: First writing the data and then running a separate OPTIMIZE job to aggregate the files into a single file. In the mentioned link you will find this explanation:

"After an individual write, Azure Databricks checks if files can further be compacted, and runs an OPTIMIZE job [...] to further compact files for partitions that have the most number of small files."

As a side note: Make sure to keep the configuration spark.sql.files.maxRecordsPerFile to 0 (default value) or to a negative number. Otherwise, this configuration alone could lead to multiple files for data with the same value in the column "category".

CodePudding user response:

You can try coalesce(n); coalesce is used to decrease the number of partitions, which is an optimized version of repartition.

n = The number of partitions you want to be output.

  • Related