Create a single CSV per partition with Spark

I have a ~10GB dataframe that should be written as a bunch of CSV files, one per partition.

The CSVs should be partitioned by 3 fields: "system", "date_month" and "customer".

Inside each folder exactly one CSV file should be written, and the data inside the CSV file should be ordered by two other fields: "date_day" and "date_hour".
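For reference, the relevant columns sketched as a hypothetical schema (only the partition and ordering columns come from the real data; the payload column here is just a placeholder):

  import org.apache.spark.sql.types._

  // hypothetical schema for illustration: three partition columns, two ordering columns, plus payload
  val schema = StructType(Seq(
    StructField("system",     StringType),
    StructField("date_month", StringType),
    StructField("customer",   StringType),
    StructField("date_day",   StringType),
    StructField("date_hour",  StringType),
    StructField("value",      StringType)   // placeholder for whatever the real payload columns are
  ))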

The filesystem (an S3 bucket) should look like this:

/system=foo/date_month=2022-04/customer=CU000001/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000002/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000003/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000004/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000002/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000003/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000004/part-00000-x.c000.csv

I know I can easily achieve that using coalesce(1), but then only one worker does all the work, and I'd like to avoid that.
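For reference, the coalesce(1) version I'd like to avoid would look roughly like this (just a sketch; the final write runs as a single task):

  mydataframe.
      sort("date_day", "date_hour").    // global sort
      coalesce(1).                      // squeeze everything into one partition, i.e. one task
      write.
      partitionBy("system", "date_month", "customer").
      option("header", "false").
      option("sep", "\t").
      format("csv").
      save("s3://bucket/spool/")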

I've tried this strategy instead:

  mydataframe.
      repartition($"system", $"date_month", $"customer").
      sort("date_day", "date_hour").
      write.
      partitionBy("system", "date_month", "customer").
      option("header", "false").
      option("sep", "\t").
      format("csv").
      save(s"s3://bucket/spool/")

My idea was that each worker would get a different partition, so it could easily sort its data and write a single file to the partition path. After running the code, I noticed many CSV files for each partition, something like this:

/system=foo/date_month=2022-05/customer=CU000001/part-00000-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00001-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00002-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00003-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00004-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00005-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00006-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00007-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
[...]

The data in each file is ordered as expected, and concatenating all the files would produce the correct result, but that takes too much time and I'd rather have Spark do it.

Is there a way to create a single ordered CSV file per partition, without moving all the data to a single worker with coalesce(1)?

I'm using Scala, if that matters.

CodePudding user response:

sort() (and orderBy() as well) triggers a shuffle because it sorts the whole dataframe; that shuffle redistributes the rows and undoes the repartition() you did just before it, which is why you end up with several files per directory. To sort within each partition instead, use the aptly named sortWithinPartitions():

  mydataframe.
      repartition($"system", $"date_month", $"customer").   // one shuffle, keyed on the same columns as partitionBy
      sortWithinPartitions("date_day", "date_hour").         // local sort, no further shuffle, partitioning is preserved
      write.
      partitionBy("system", "date_month", "customer").
      option("header", "false").
      option("sep", "\t").
      format("csv").
      save("s3://bucket/spool/")