- We have a DataFrame that we want to write to S3 in Parquet format and in overwrite mode.
- Every time we write the DataFrame, it is always to a new folder. The code that writes to the S3 location is as follows:
df.write
  .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
  .option("maxRecordsPerFile", maxRecordsPerFile)
  .mode("overwrite")
  .format(format)
  .save(output)
What we observe is that, at times, we get a FileNotFoundException (full trace below). Can somebody help me understand:
- When I am writing to a new S3 location (meaning nobody is reading from that location), why does the writing program throw the exception below?
- How do I fix it? I see a couple of Stack Overflow posts pointing to this exception, but they say it happens when you read while a write is in progress. My case is not like that: I don't read while the write is happening.
- My Spark version is 2.3.2 on EMR-5.18.1; the code is written in Scala.
- I am using s3:// as the output folder path. Should I change it to s3n:// or s3a://? Will that help?
Caused by: java.io.FileNotFoundException: No such file or directory 's3://BUCKET/snapshots/FOLDER/_bid_9223370368440344985/part-00020-693dfbcb-74e9-45b0-b892-0b19fa92365c-c000.snappy.parquet'
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:131)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:104)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:853)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:853)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
CodePudding user response:
I was finally able to solve the problem. The df: DataFrame was built by reading from the same S3 folder to which it was then being written in overwrite mode. During the overwrite, the source folder gets cleared, which was causing the error.
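In case it helps, here is a minimal sketch (not the exact code from my job) of one way to avoid the self-overwrite: break the lineage by materializing the data before overwriting the source path, for example with a checkpoint. The checkpoint directory, app name, and paths below are made up for illustration.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("snapshot-rewrite").getOrCreate()
// Hypothetical checkpoint location; any durable path the cluster can write to works.
spark.sparkContext.setCheckpointDir("s3://BUCKET/checkpoints/")

// The same path is used for both reading and (later) overwriting.
val snapshotPath = "s3://BUCKET/snapshots/FOLDER/"

val df = spark.read.parquet(snapshotPath)

// Eagerly compute and persist the data to the checkpoint dir, so the plan
// no longer references the Parquet files that the overwrite is about to delete.
val materialized = df.checkpoint()

materialized.write
  .mode("overwrite")
  .parquet(snapshotPath)

An alternative with the same effect is to write the DataFrame to a temporary prefix first, read that copy back, and then overwrite the original location from it.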
Hope this helps somebody.