I would like to get a list of the exact files that were written as the result of a Spark DataFrame save API command. I'm specifically using append mode to update a common set of partitions and would like to know which files were added with every operation.
Example, with base data:
val jsonString = """[
  {"eventId": 1, "date": "20210101", "attributes": ["test", "foo", "bar"]},
  {"eventId": 2, "date": "20210101", "attributes": ["test", "foo", "bar"]},
  {"eventId": 3, "date": "20210101", "attributes": ["test", "foo", "bar"]},
  {"eventId": 4, "date": "20210101", "attributes": ["test", "foo", "bar", "baz", "bro"]}]"""
Load and save once:
spark.read.json(Seq(jsonString).toDS)
  .write
  .mode("append")
  .json("s3://bucket/test")
Load, modify, and save again:
spark.read.json(Seq(jsonString).toDS)
  .withColumn("modified", lit("yes"))
  .write
  .mode("append")
  .json("s3://bucket/test")
How do I know which files came from which dataframe operation?
I'm on EMR dealing purely with S3 for what it is worth.
CodePudding user response:
Why not add an "updateTimestamp" column and just pull the data you need from the DataFrame? I'm not sure you actually want to deal with files, rather than just the data that was included in each write. If you added updateTimestamp and also made it a table partition, you would be grouping the data together in the file system.
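A minimal sketch of that idea, in case it helps. The object and method names (AppendTagging, appendTagged, readBatch, filesForBatch) are made up for illustration, and using System.currentTimeMillis() as the tag is an assumption; any value that is unique per write would do. Because the column is also used in partitionBy, each append should land under its own updateTimestamp=<value> directory, and input_file_name() can list the files behind a given batch.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, input_file_name, lit}

// Hypothetical helper, not part of Spark.
object AppendTagging {
  // Appends df under path, tagging every row of this write with one shared
  // timestamp. Returns the timestamp so the caller can find the batch later.
  def appendTagged(df: DataFrame, path: String): Long = {
    val updateTimestamp = System.currentTimeMillis() // assumed unique per write
    df.withColumn("updateTimestamp", lit(updateTimestamp))
      .write
      .mode("append")
      .partitionBy("updateTimestamp")
      .json(path)
    updateTimestamp
  }

  // Reads back only the rows written by the append with this timestamp.
  def readBatch(spark: SparkSession, path: String, updateTimestamp: Long): DataFrame =
    spark.read.json(path).where(col("updateTimestamp") === updateTimestamp)

  // Lists the distinct files that hold the rows of that append.
  def filesForBatch(spark: SparkSession, path: String, updateTimestamp: Long): DataFrame =
    readBatch(spark, path, updateTimestamp)
      .select(input_file_name().as("file"))
      .distinct()
}
```

With the data from the question, that would be something like val ts = AppendTagging.appendTagged(spark.read.json(Seq(jsonString).toDS), "s3://bucket/test"), followed by AppendTagging.filesForBatch(spark, "s3://bucket/test", ts).show(false).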
CodePudding user response:
You asked about files, but you may actually want to do away with partitions in S3 and use a better storage format such as Avro, ORC, or Parquet. Directory scanning performs poorly in S3; it is better at high-throughput reads of large files. You may get better performance by storing massive files and using Spark (CPU) to search them rather than using millions of small partitions, which incur S3 performance hits as it searches for the small files.
I have seen performance increases of 00 in S3 by removing partitions and using large files. This doesn't answer your direct question, but if the underlying question is how to get better performance, this is your answer.
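Roughly what I mean, as a sketch rather than a tuned setup. The names LargeFileLayout, appendAsParquet and eventsForDate are hypothetical, and the right numFiles depends entirely on your data volume. coalesce caps the number of output files per write, and the date filter is applied by Spark at read time instead of relying on partition directories.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper, not part of Spark.
object LargeFileLayout {
  // Appends df as Parquet with at most numFiles output files and no partitionBy,
  // so each write adds a few larger files instead of many small ones.
  def appendAsParquet(df: DataFrame, path: String, numFiles: Int): Unit =
    df.coalesce(numFiles)
      .write
      .mode("append")
      .parquet(path)

  // Selects one date by filtering rows at read time.
  def eventsForDate(spark: SparkSession, path: String, date: String): DataFrame =
    spark.read.parquet(path).where(col("date") === date)
}
```

For the example in the question, that would be LargeFileLayout.appendAsParquet(spark.read.json(Seq(jsonString).toDS), "s3://bucket/test", 1), then LargeFileLayout.eventsForDate(spark, "s3://bucket/test", "20210101") to query it.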