I have developed a Pyspark Glue
job for loading the complete/incremental
dataset. It is working fine. After loading the dataset I have to perform few aggregations
and write it in "overwrite"/"append"
mode in a single location. For this, I have written the below code:
maxDateValuePath = "s3://...../maxValue/"
outputPath = "s3://..../complete-load/"
aggregatedPath = "s3://...../aggregated-output/"
fullLoad = ""
aggregatedView = ""
completeAggregatedPath = "s3://...../aggregated-output/step=complete-load/"
incrAggregatedPath = "s3://....../aggregated-output/step=incremental-load/"
aggregatedView=""
data.createOrReplaceTempView("data")
aggregatedView = spark.sql("""
select catid,count(*) as number_of_catids from data
group by catid""")
if (incrementalLoad == str(0)):
aggregatedView = aggregatedView.withColumn("created_at", current_timestamp())
aggregatedView.write.mode("overwrite").parquet(completeAggregatedPath)
elif (incrementalLoad == str(1)):
aggregatedView = aggregatedView.withColumn("created_at", current_timestamp())
log.info("step 123: " str(aggregatedView.count()))
aggregatedView.write.mode("append").parquet(completeAggregatedPath)
aggregatedView = spark.read.parquet(completeAggregatedPath)
log.info("step 126: " str(aggregatedView.count()))
w = Window.partitionBy("catid").orderBy(col("created_at").desc())
aggregatedView = aggregatedView.withColumn("rw", row_number().over(w)).filter(col("rw") == lit(1)).drop(
"rw")
log.info("step 130: " str(aggregatedView.count()))
log.info(aggregatedView.orderBy(col("created_at").desc()).show())
print("::::::::::::before writing::::::::::::::")
aggregatedView.write.mode("overwrite").parquet(incrAggregatedPath)
Where 0
and 1
stand for full load/incremental load. Now, before writing the transformed dataset I am adding a created_at
column for handling the latest aggregated records after writing the incremental dataset, or else it leads to duplicates.
Everything is working fine as expected but the problem is while I am trying to write the dataset in overwrite mode using this line aggregatedView.write.mode("overwrite").parquet(aggregatedPath)
of the incremental part, the bucket gets deleted in s3 and this operation results in the below error
:
Caused by: java.io.FileNotFoundException: File not present on S3
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
Why the bucket is getting deleted?
CodePudding user response:
The problem is in your code. You are reading and writing to the same location in the incremental section.
aggregatedView = spark.read.parquet(aggregatedPath)
...
...
aggregatedView.write.mode("overwrite").parquet(aggregatedPath)
Since spark does a lazy evaluation, whats happening is that when you specify the mode as overwrite, it clears off the data in the particular folder which leaves you with nothing to read. When it reaches the write part of the code, it starts to read the data, by which time your data has already been cleared by your write action.
CodePudding user response:
So, I fixed my problem by changing the below line of my code:
aggregatedView2 = spark.read.parquet(completeAggregatedPath)
So for aggregated view, there will be one df lineage. Since read and write was being performed on the same s3 location and on the same df lineage, it was deleting the prefix because the source data for df was ambiguous. Therefore, created a new df where it will look for the S3 location and not the previous transformations.
Maybe it will help someone!