Home > Mobile >  How to control number of files generated while setting large partitions in spark?
How to control number of files generated while setting large partitions in spark?

Time:12-15

Because of large number of input data, I set large shuffle partitions of spark (spark.sql.shuffle.partitions=1000). However, the output file is small (~1GB), but it creates lots of small files (3000 files, each smaller than 1Mb). How can I combine these small files to one big file?

Another question is, why the number of output files is 3 times the number of shuffle partitions?

CodePudding user response:

As per Spark docs, spark.sql.shuffle.partitions parameter Configures the number of partitions to use when shuffling data for joins or aggregations.. To control the number of output files use the repartition() method before writing the output. So something like this:

df
.filter(...)  // some transformations
.join(...)
.repartition(1)  // move data into a single partition
.write
.format(...)
.save(...)

The snippet above would result in a single output file.

You are not limited to repartitioning your data once - you can repartition as much as you need, but bare in mind that this is a costly operation:

df
.filter(...)  // some transformations
.repartition(...)  // repartition to improve join performance
.join(...)
.repartition(1)  // move data into a single partition
.write
.format(...)
.save(...)

If you want a good explanation of how repartition works, here is a great answer: Spark - repartition() vs coalesce()

For more information on how to improve the performance of the joins, refer to the Spark docs: https://spark.apache.org/docs/latest/sql-performance-tuning.html#join-strategy-hints-for-sql-queries

CodePudding user response:

Since you have a large number of partitions. You may need to coalesce on your date frame. coalesce will decrease the number of partitions.

val df_res = df.coalesce(10)

This should decrease the number of output files from 1000 to just 10. or you can coalesce(1) to create one big file.

Coalesce uses existing partitions and minimizes shuffled data. The results may be different sizes.

The number of output files is equal to the number of partitions. the property (spark.sql.shuffle.partitions) is used when shuffling data for joins or aggregations.

You can perform df.repartition() to your dataframe to increase/decrease the partitions.

  • Related