I currently have data stored as CSV files in an S3 bucket. The structure of the data is as follows:
s3_bucket/
├── dir_1/
│ ├── part_0000.csv
│ ├── part_0001.csv
│ ├── part_0002.csv
│ └── ...
├── dir_2/
│ ├── part_0000.csv
│ ├── part_0001.csv
│ ├── part_0002.csv
│ └── ...
├── dir_3/
│ └── ...
├── dir_4/
└── ...
I want to write some kind of Spark job to go into each subdirectory dir_n/ and merge all the data into a single file, resulting in the following structure:
s3_bucket/
├── dir_1/
│ └── merged.csv
├── dir_2/
│ └── merged.csv
├── dir_3/
│ └── merged.csv
├── dir_4/
└── ...
I was thinking of somehow spawning multiple workers to crawl each subdirectory, read the data into memory, and merge it using repartition(1), but I am not too sure how to do this.
Any help is greatly appreciated!
CodePudding user response:
You can loop over the directories using the Hadoop FileSystem API, then read each one and write it back with a coalesce(1). Here is the code in Scala:
import org.apache.spark.sql._
import org.apache.hadoop.fs.Path

val path = "/path/s3_bucket/"
val hdfs = new Path(path).getFileSystem(spark.sparkContext.hadoopConfiguration)

// For each subdirectory, read its CSV part files, collapse them into a single
// partition, and write the result back to the same subdirectory.
hdfs.listStatus(new Path(path)).foreach { file =>
  if (file.isDirectory)
    spark.read.csv(path + file.getPath.getName)
      .coalesce(1)
      .write.mode(SaveMode.Overwrite)
      .csv(path + file.getPath.getName)
}
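Two caveats are worth adding, independent of the loop itself. First, depending on the Spark version, SaveMode.Overwrite can refuse to write to a path that the same query is reading from ("Cannot overwrite a path that is also being read from"). Second, even when the write succeeds, Spark produces a directory containing a part-*.csv file (plus a _SUCCESS marker), not a single file named merged.csv. Below is a minimal sketch that works around both points by staging each result in a temporary directory and then renaming the single part file; the _tmp suffix and the merged.csv name are just placeholders for this example:

import org.apache.spark.sql._
import org.apache.hadoop.fs.Path

val path = "/path/s3_bucket/"
val conf = spark.sparkContext.hadoopConfiguration
val fs = new Path(path).getFileSystem(conf)

fs.listStatus(new Path(path)).filter(_.isDirectory).foreach { dir =>
  val dirPath = path + dir.getPath.getName
  val tmpPath = dirPath + "_tmp"  // hypothetical staging directory

  // Stage the merged data in a temporary directory so we never overwrite
  // the path we are still reading from.
  spark.read.csv(dirPath)
    .coalesce(1)
    .write.mode(SaveMode.Overwrite)
    .csv(tmpPath)

  // coalesce(1) leaves exactly one part-*.csv file in the staging directory.
  val partFile = fs.globStatus(new Path(tmpPath + "/part-*.csv"))(0).getPath

  // Replace the original directory with one that contains only merged.csv.
  fs.delete(new Path(dirPath), true)
  fs.mkdirs(new Path(dirPath))
  fs.rename(partFile, new Path(dirPath + "/merged.csv"))
  fs.delete(new Path(tmpPath), true)
}

Keep in mind that rename on S3 is implemented as copy-and-delete, so the last few steps are not atomic; if the job dies partway through, a directory can be left half-written.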