How to merge all files within many sub-directories using Spark, while maintaining the directory structure


I currently have data stored as CSV files in an S3 bucket. The structure of the data is as follows:

s3_bucket/
├── dir_1/
│   ├── part_0000.csv
│   ├── part_0001.csv
│   ├── part_0002.csv
│   └── ...
├── dir_2/
│   ├── part_0000.csv
│   ├── part_0001.csv
│   ├── part_0002.csv
│   └── ...
├── dir_3/
│   └── ...
├── dir_4/
└── ...

I want to write some kind of Spark job to go into each subdirectory dir_n/ and merge all the data into a single file, resulting in the following structure:

s3_bucket/
├── dir_1/
│   └── merged.csv
├── dir_2/
│   └── merged.csv
├── dir_3/
│   └── merged.csv
├── dir_4/
└── ...

I was thinking of somehow spawning multiple workers to crawl each subdirectory, read the data into memory, and merge it using repartition(1), but I am not sure how to do this. Any help is greatly appreciated!

CodePudding user response:

You can loop over the subdirectories using the Hadoop FileSystem API, then read each directory and write it back with coalesce(1). Here's the code in Scala:

import org.apache.spark.sql._
import org.apache.hadoop.fs.Path

val path = "/path/s3_bucket/"
// Get a FileSystem handle for the bucket path
val hdfs = new Path(path).getFileSystem(spark.sparkContext.hadoopConfiguration)

// Read every CSV part in each subdirectory and rewrite it as a single file.
// Note: Spark will refuse to overwrite a path it is also reading from, so in
// practice you need to write to a temporary location first (see the sketch below).
hdfs.listStatus(new Path(path)).foreach { file =>
  if (file.isDirectory) {
    val dir = path + file.getPath.getName
    spark.read.csv(dir).coalesce(1).write.mode(SaveMode.Overwrite).csv(dir)
  }
}
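
Two caveats with the snippet above: Spark normally throws "Cannot overwrite a path that is also being read from" when the input and output paths are identical, and coalesce(1) still produces a file named part-00000-....csv rather than merged.csv. Below is a minimal sketch of one way to handle both, assuming the bucket is reachable through the same Hadoop FileSystem API; the mergeDirectory helper and the _tmp_merge suffix are just illustrative names, not part of any library.

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.hadoop.fs.Path

val spark = SparkSession.builder().appName("merge-csv-dirs").getOrCreate()

val root = "/path/s3_bucket/"
val fs = new Path(root).getFileSystem(spark.sparkContext.hadoopConfiguration)

// Illustrative helper: merge one directory's CSV parts into <dir>/merged.csv
def mergeDirectory(dir: Path): Unit = {
  val tmp = new Path(dir.toString + "_tmp_merge")

  // Write the merged data to a temporary sibling directory, since Spark
  // cannot overwrite a path it is currently reading from.
  spark.read.csv(dir.toString).coalesce(1)
    .write.mode(SaveMode.Overwrite).csv(tmp.toString)

  // Locate the single part-xxxxx file that Spark produced.
  val part = fs.listStatus(tmp).map(_.getPath)
    .find(_.getName.startsWith("part-"))
    .getOrElse(sys.error(s"no part file found under $tmp"))

  // Replace the original part files with the single merged file.
  fs.delete(dir, true)
  fs.mkdirs(dir)
  fs.rename(part, new Path(dir, "merged.csv"))
  fs.delete(tmp, true)
}

fs.listStatus(new Path(root)).filter(_.isDirectory)
  .foreach(status => mergeDirectory(status.getPath))

Keep in mind that on S3 a rename is really a copy followed by a delete, so for very large merged files it can be cheaper to write the single-file output to a separate destination prefix instead of moving it back into the original directory.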