How many files does a pyspark parquet write generate? I have read that the output is one file per in memory partition. However, this does not seem to always be true.
I am running a 6 executors cluster with 6G executor memory per executor. All the rest (pyspark, overhead, offheap) are 2G
using the following data:
dummy_data = spark.createDataFrame(pd.DataFrame({'a':np.random.choice([1,2,3,4,5,6,7,8,9,10],100000)}))
The following code where I repartition without specifying a column to repartition by always produces the number of files equal to the number of memory partitions:
df_dummy = dummy_data.repartition(200)
df_dummy.rdd.getNumPartitions()
df_dummy.write.format("parquet").save("gs://monsoon-credittech.appspot.com/spark_datasets/test_writes/df_dummy_repart_wo_id")
#files generated 200
However, the following code where I specify the column to repartition the data by produces some random number of files
df_dummy = dummy_data.repartition(200,'a')
df_dummy.rdd.getNumPartitions()
df_dummy.write.format("parquet").save("gs://monsoon-credittech.appspot.com/spark_datasets/test_writes/df_dummy_repart_w_id")
#files generated 11
Can you help me understand the number of output files that get generated by the pyspark parquet writer.
CodePudding user response:
This is an answer that does not explain everything you're noticing, but probably contains useful enough information that it would be a pity not to share it.
The reason why you're seeing a different amount of output files is because of the order of your data after those 2 partitions.
dummy_data.repartition(200)
repartitions your individual rows using a Round Robin Partitioning (which is basically just hash partitioning on your rows)- the result is that your data is chopped up in a rather random fashion (hashing the rows won't really create a specific "order" in your partitions)
dummy_data.repartition(200,'a')
does the same operation, but according to the columna
's values- the result is that your data is chopped up in a very specific order: hashing the column values will put values where
a == 1
always in the same partition - since your nr of partitions is smaller than the distinct amount of possible values, each partition will contain only 1 distinct
a
value.
- the result is that your data is chopped up in a very specific order: hashing the column values will put values where
Now, there is a pattern in the amount of output part-files you receive:
- In the case of
dummy_data.repartition(200)
, you simply get the same number of part-files as partitions. 200 in your example. - In the other case, you get 11 part-files. If you have a look at the content of those part-files, you will see that there is 1 empty file 10 filled files. 1 for each distinct value of your original dataset. So this leads to the conclusion that while writing your files, something is being smart and merging those minuscule and identical files. I'm not sure whether this is Spark, or the
PARQUET_OUTPUT_COMMITTER_CLASS
, or something else.
Conclusion
In general, you get the same amount of part-files as the amount of partitions.
In your specific case, when you're repartitioning by the column (which is the only value in the Row), your parquet part-files will contain a bunch of the same values. It seems that something (I don't know what) is being smart and merging files with the same values.
In your case, you got 11 part-files because there is 1 empty file and 10 files for each distinct value in your dataframe. Try changing np.random.choice([1,2,3,4,5,6,7,8,9,10]
to np.random.choice([1,2,3,4,5,6,7,8]
and you will see you'll get 9 part-files (8 1).