I want to automate the processing of daily-uploaded files using transient EMR clusters. My files on S3 are partitioned using a date key like this:
2022-07-27-stats.csv
2022-07-28-stats.csv
...
Each day, I copy a single file from S3 into 'hdfs:///raw/', and I use a wildcard in the read path 'hdfs:///raw/*.csv' in the Spark script so I don't have to manually update the path every day to account for the changing date keys.
This works fine, but I want to grab the date-key part of the file name in 'hdfs:///raw/' so that I can add it to the write path via string interpolation; that way the files written to the S3 output bucket are partitioned in the same way. Is there a method that returns the file name?
CodePudding user response:
You can use input_file_name():
spark.read.csv(...).withColumn("path", input_file_name())
This will create a new column called path containing the full path each row was read from (really useful when you want to tell apart files matched by a wildcard).
For example, I have data-first.csv with one row (first) and a second dataset data-second.csv with one row (second).
If I read the files and add an input_file_name() column:
import org.apache.spark.sql.functions.input_file_name

sparkSession.read.option("header", value = true)
  .csv("C:/Users/myUser/Desktop/data-*")
  .withColumn("path", input_file_name())
  .show(truncate = false)
I get this output:
+------+------------------------------------------------+
|name  |path                                            |
+------+------------------------------------------------+
|second|file:///C:/Users/myUser/Desktop/data-second.csv |
|first |file:///C:/Users/myUser/Desktop/data-first.csv  |
+------+------------------------------------------------+
You can do some transformations on that column to extract the date, but this should do the trick!
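For instance, here is a minimal sketch of extracting the yyyy-MM-dd prefix with regexp_extract and interpolating it into the write path. It assumes a single daily file named like 2022-07-27-stats.csv, and the output bucket my-output-bucket is just a placeholder:

import org.apache.spark.sql.functions.{col, input_file_name, regexp_extract}

// Read the daily file(s) and tag each row with its source path
val df = sparkSession.read.option("header", value = true)
  .csv("hdfs:///raw/*.csv")
  .withColumn("path", input_file_name())

// Pull the yyyy-MM-dd date key out of file names like 2022-07-27-stats.csv
val withDate = df.withColumn(
  "date_key",
  regexp_extract(col("path"), """(\d{4}-\d{2}-\d{2})-stats\.csv""", 1)
)

// With a single file per day there is only one date key, so take it
// and interpolate it into the output path (my-output-bucket is a placeholder)
val dateKey = withDate.select("date_key").distinct().head().getString(0)

withDate.drop("path", "date_key")
  .write
  .mode("overwrite")
  .csv(s"s3://my-output-bucket/output/$dateKey-stats")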
CodePudding user response:
"files on S3 are partitioned using a date key like this"
Spark and Hive don't really support partitioning by file name like that.
If you're able to modify your S3 writer, you can instead write to Hive-style partitioned paths, for example:
s3://bucket/prefix/year=2022/month=07/day=27/stats.csv
s3://bucket/prefix/year=2022/month=07/day=28/stats.csv
Then, if you point Spark/Hive at just s3://bucket/prefix, it will automatically give you a partitioned dataframe with columns for year, month and day. This way you shouldn't need the actual filename at all, so you won't have to parse the date value out of it.
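For example (hypothetical bucket/prefix, assuming the layout above), Spark's partition discovery picks the columns up from the directory names:

import org.apache.spark.sql.functions.col

// Partition discovery: year/month/day become columns without parsing file names
val stats = sparkSession.read
  .option("header", value = true)
  .csv("s3://bucket/prefix")

// Filtering on the partition columns prunes the scan to the matching directories
stats.filter(col("year") === 2022 && col("month") === 7 && col("day") === 27).show()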
I didn't fully understand what you meant by your raw folder, but the same logic applies to HDFS. Ideally, if HDFS holds the "raw CSV", you'd read that data in Spark from HDFS, apply a schema, potentially aggregate/clean the dataframe, and then write it as Parquet or ORC somewhere else, such as S3. Then you'll have an efficient way to query that data.
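As a rough sketch of that flow (the schema, column names and output bucket below are made up for illustration):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType, DateType}

// Explicit schema instead of relying on CSV inference (made-up columns)
val statsSchema = StructType(Seq(
  StructField("event_date", DateType),
  StructField("metric", StringType),
  StructField("value", LongType)
))

sparkSession.read
  .option("header", value = true)
  .schema(statsSchema)
  .csv("hdfs:///raw/*.csv")
  .filter(col("value").isNotNull)          // example cleaning step
  .write
  .mode("append")
  .partitionBy("event_date")               // Hive-style partition directories, one per date
  .parquet("s3://my-output-bucket/stats/") // placeholder output location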