I have a Spark dataframe named df, which is partitioned on the column date. I need to save this dataframe to S3 in CSV format. When I write the dataframe, I need to delete the partitions (i.e. the dates) on S3 for which the dataframe has data to write. All the other partitions need to remain intact.
I saw here that this is exactly the job of the option spark.sql.sources.partitionOverwriteMode set to dynamic.
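For context, the same setting can also be enabled session-wide instead of per write; a minimal sketch, assuming an existing SparkSession named spark:
# Applies to all subsequent writes in this session
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")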
However, it does not seem to work for me with CSV files.
If I use it on Parquet with the following command, it works perfectly:
df.write \
    .option("partitionOverwriteMode", "dynamic") \
    .partitionBy("date") \
    .format("parquet") \
    .mode("overwrite") \
    .save(output_dir)
But if I use it on CSV with the following command, it does not work:
df.write \
    .option("partitionOverwriteMode", "dynamic") \
    .partitionBy("date") \
    .format("csv") \
    .mode("overwrite") \
    .save(output_dir)
Why is this the case? Any idea how this behaviour could be achieved with CSV output?
CodePudding user response:
What Spark version do you use? For Spark < 2.0.0 it may not be possible to use partitioning together with the csv format.
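If you are not sure which version your cluster runs, a quick check from PySpark (assuming an active SparkSession named spark):
# Prints the Spark version string of the running session
print(spark.version)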
CodePudding user response:
I need to delete the partitions (i.e. the dates) on S3 for which the dataframe has data to be written to
Assuming you have a convenient list of the dates you are processing, you can use the replaceWhere option to determine which partitions to overwrite (delete and replace).
For example:
df.write \
    .partitionBy("date") \
    .option("replaceWhere", "date >= '2020-12-14' AND date <= '2020-12-15'") \
    .format("csv") \
    .mode("overwrite") \
    .save(output_dir)
A more dynamic approach is possible if you have the start_date and end_date stored in variables:
start_date = "2022-01-01"
end_date = "2022-01-14"
condition = f"date >= '{start_date}' AND date <= '{end_date}'"
df.write \
    .partitionBy("date") \
    .option("replaceWhere", condition) \
    .format("csv") \
    .mode("overwrite") \
    .save(output_dir)
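If the dates in df do not form a contiguous range, the same replaceWhere approach can be driven by the dataframe itself; a sketch, assuming the set of distinct dates is small enough to collect to the driver:
# Collect the distinct dates present in the dataframe (assumed to be a small set)
dates = [row["date"] for row in df.select("date").distinct().collect()]
condition = "date IN ({})".format(", ".join(f"'{d}'" for d in dates))

df.write \
    .partitionBy("date") \
    .option("replaceWhere", condition) \
    .format("csv") \
    .mode("overwrite") \
    .save(output_dir)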