pyspark write parquet creates many files after partitionBy


I used to use df.repartition(1200).write.parquet(...), which created 1200 files, as specified in the repartition argument. I am now using partitionBy, i.e. df.repartition(1200).write.partitionBy("mykey").parquet(...). This works fine, except that it now creates 1200 files per bucket of mykey. I would like to have 1200 files overall.

Other posts suggest repartitioning across certain keys, but the relevant documentation for my Spark version (2.4.0) seems to suggest that this feature was added later. Is there any other way to achieve it? I guess I could repartition to 1200/len(unique("mykey")), but that's a bit hacky. Is there a better way to do it? I am also worried that reducing the number of partitions will result in out-of-memory errors.
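
Roughly what I have in mind for that workaround (just a sketch; output_path stands in for the real destination, and it assumes the values of mykey are of roughly similar size):

  from pyspark.sql import functions as F

  # count the distinct values of mykey, then spread ~1200 files across them
  n_keys = df.select(F.countDistinct("mykey")).first()[0]
  files_per_key = max(1, 1200 // n_keys)

  # writes files_per_key files into each mykey=... directory, ~1200 overall
  df.repartition(files_per_key).write.partitionBy("mykey").parquet(output_path)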

CodePudding user response:

  • Calling partitionBy on your writer does not change the partitioning scheme of your dataframe. Instead it is used to specify the partitioning scheme of your data once it is written to disk

  • Say you have a dataframe with 200 partitions and you call df.write.partitionBy("mykey").parquet(...)

  • Each of your partitions will bucket its data by unique values of "mykey"

  • Each bucket in each partition will correspond to one file being written to a disk partition

  • For example, let's say the dataframe has 200 rows where mykey=KEY1

  • And let's say that these 200 rows are evenly spread across the 200 partitions, 1 per partition

  • Then when we call df.write.partitionBy("mykey").parquet(...)

  • We will get 200 files in the disk partition mykey=KEY1, one from each dataframe partition (see the sketch after this list)

  • To answer your question, there are a few ways of ensuring that exactly 1200 files are written to disk. All methods require precise control over the number of unique values in your partitions
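
To make that concrete, here is a small toy sketch (the data, the partition counts, and the /tmp path are made up for illustration and are not part of the original answer):

  import glob
  from pyspark.sql import SparkSession, functions as F

  spark = SparkSession.builder.getOrCreate()

  # toy dataframe: 4 partitions, 2 distinct values of mykey (0 and 1)
  df = spark.range(100).withColumn("mykey", F.col("id") % 2).repartition(4)
  df.write.mode("overwrite").partitionBy("mykey").parquet("/tmp/partitionby_demo")

  # each dataframe partition contributes one file to each mykey=... directory,
  # so each directory holds up to 4 part files (assumes a local filesystem path)
  print(len(glob.glob("/tmp/partitionby_demo/mykey=0/part-*")))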

method 1

  # requires mykey to have exactly 1200 unique values
  df = df.repartition("mykey")
  df.write.partitionBy("mykey").parquet(...)
  • repartitions the data by mykey so that all rows for a given key land in a single dataframe partition, which means each disk partition gets exactly one file
  • repartition is an expensive (shuffle) operation, so it should be used sparingly

method 2

  # requires mykey to have exactly 1200 unique values
  df = df.coalesce(1)
  df.write.partitionBy("mykey").parquet(...)
  • This will only work if the final dataset you want to write is small enough to fit into a single partition.

method 3

  # requires mykey to have exactly 1 unique value
  df = df.repartition(1200)
  df.write.partitionBy("mykey").parquet(...)
  • With only one value of mykey, each of the 1200 dataframe partitions writes one file into the single disk partition, giving 1200 files in total

CodePudding user response:

I'm not quite sure why you want to do both repartition and partitionBy, but you could do

  df = df.repartition(1200)
  df = your_processing(df)
  df.coalesce(1).write.partitionBy("mykey").parquet(...)

coalesce(1) merges the partitions into a single one, which is then split up again by partitionBy when writing.
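
As a quick sanity check (not part of the original answer; it assumes the data was written to a local filesystem path), you can count the part files across all the mykey=... directories to confirm the overall number:

  import glob

  # output_path is a placeholder for wherever .parquet(...) wrote the data
  print(len(glob.glob(f"{output_path}/mykey=*/part-*")))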
