Using Python on an Azure HDInsight cluster, we are saving Spark dataframes as Parquet files to Azure Data Lake Storage Gen2 with the following code:
df.write.parquet('abfs://...', mode='overwrite', compression='snappy')
Usually this works, but since we upgraded our cluster to run more scripts at the same time (around ten to fifteen), we consistently get the following exception for a small, varying fraction of the scripts:
Py4JJavaError: An error occurred while calling o2232.parquet. : Operation failed: "The specified path does not exist.", 404, PUT,, PathNotFound, "The specified path does not exist."
As far as I can tell, all the Spark jobs and tasks actually succeed, including the one that saves the table, but the Python script then exits with the exception.
Background information
We are using Spark (using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_265) and Hadoop. Full stack trace:
File "/usr/hdp/current/spark2-client/python/pyspark/sql/", line 843, in parquet
df_to_save.write.parquet(blob_path, mode, compression='snappy')
File "/usr/hdp/current/spark2-client/python/lib/", line 1257, in __call__
answer, self.gateway_client, self.target_id,
File "/usr/hdp/current/spark2-client/python/pyspark/sql/", line 63, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark2-client/python/lib/", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2232.parquet.
: Operation failed: "The specified path does not exist.", 404, PUT,, PathNotFound, "The specified path does not exist. RequestId:1870ec49-e01f-0101-72f8-f260fe000000 Time:2021-12-17T03:42:35.8434071Z"
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.mkdirs(
at org.apache.hadoop.fs.FileSystem.mkdirs(
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
at java.lang.reflect.Method.invoke(
at py4j.reflection.MethodInvoker.invoke(
at py4j.reflection.ReflectionEngine.invoke(
at py4j.Gateway.invoke(
at py4j.commands.AbstractCommand.invokeMethod(
at py4j.commands.CallCommand.execute(
21/12/17 03:42:02 INFO DAGScheduler [Thread-11]: Job 2 finished: saveAsTable at, took 1.120535 s
21/12/17 03:42:02 INFO FileFormatWriter [Thread-11]: Write Job 11fc45a5-d398-4f9a-8350-f928c3722886 committed.
21/12/17 03:42:02 INFO FileFormatWriter [Thread-11]: Finished processing stats for write job 11fc45a5-d398-4f9a-8350-f928c3722886.
21/12/17 03:42:05 INFO ParquetFileFormat [Thread-11]: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: File Output Committer Algorithm version is 2
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false, move _temporary folders into Trash: false
21/12/17 03:42:05 INFO SQLHadoopMapReduceCommitProtocol [Thread-11]: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: File Output Committer Algorithm version is 2
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false, move _temporary folders into Trash: false
21/12/17 03:42:05 INFO SQLHadoopMapReduceCommitProtocol [Thread-11]: Using output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
21/12/17 03:42:28 ERROR ApplicationMaster [Driver]: User application exited with status 1
21/12/17 03:42:28 INFO ApplicationMaster [Driver]: Final app status: FAILED, exitCode: 1, (reason: User application exited with status 1)
There is also another variant of this exception that occurs inside a Spark task, which then fails; Spark automatically retries the failed task, and the retry usually succeeds. In some cases, the ApplicationMaster (AM) reports the application as failed, which I don't understand, because all jobs succeeded.
Possible causes
As described in "Spark _temporary creation reason", I would expect the _temporary directory not to be moved until all tasks are done.
Looking at the stack trace, the failure happens in AzureBlobFileSystem.mkdirs, which suggests to me that it is trying to create subdirectories somewhere under _temporary/0 but cannot find the 0 directory. I'm not sure whether the _temporary directory exists at that point.
Related questions
- It does sound similar, but I don't see tasks being restarted because they take too long, and this should have been fixed a long time ago anyway. I'm not completely sure whether speculative execution is visible in the Spark UI, though.
- Saving dataframe to local file system results in empty results: we are not saving to any local file system (even though the error message does say https, the stack trace shows AzureBlobFileSystem).
- Spark Dataframe Write to CSV creates _temporary directory file in Standalone Cluster Mode: we are using HDFS and also FileOutputCommitter algorithm version 2.
- Multiple spark jobs appending parquet data to same base path with partitioning: I don't think two jobs use the same directory here.
- I don't think this is a permissions issue, as most of the time it does work.
- Extremely slow S3 write times from EMR/Spark: as far as I know, we don't have any problems with slow renaming (the files aren't very large anyway). I think it fails before renaming, so a zero-rename committer wouldn't help here?
- Suggests looking in the HDFS NameNode audit log, but I haven't found it yet.
- Since the stack trace shows it fails at mkdirs, I'm guessing the … itself doesn't exist, but I don't understand why mkdirs doesn't create it. I also don't think AzureBlobFileSystem is open source?
- I did find some version of …, but based on the stack trace it would go to checkException with a … flag, which doesn't make sense to me.
Possible options to try:
- Pyspark dataframe write parquet without deleting /_temporary folder: what we could try is first saving to a different HDFS and then copying the final files. I'm not sure why it would help, because we're already saving to HDFS (well, an extension of it, ABFS).
- We could try using append mode and deleting the files ourselves.
- Change spark _temporary directory path: using our own FileOutputCommitter sounds like overkill for this problem.
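The first option (save somewhere else, then move the final files) can be sketched on a local filesystem. This does not reproduce Spark or ABFS behaviour; it only illustrates giving each job its own intermediate directory so concurrent jobs never share one. write_via_staging, the directory names and the placeholder file contents are hypothetical, and the sketch assumes staging and final directories are on the same filesystem so os.replace is a plain rename.

```python
import os
import tempfile
import uuid
from pathlib import Path


def write_via_staging(final_dir: Path, staging_root: Path, files: dict) -> None:
    """Hypothetical helper: write into a private staging dir, then move into final_dir."""
    # A unique directory per job, so concurrent jobs never share intermediate state
    staging = staging_root / f"staging-{uuid.uuid4().hex}"
    staging.mkdir(parents=True)
    for name, data in files.items():
        (staging / name).write_bytes(data)
    final_dir.mkdir(parents=True, exist_ok=True)
    # list() so we don't iterate the directory while moving entries out of it
    for src in list(staging.iterdir()):
        os.replace(src, final_dir / src.name)  # replaces an existing file of the same name
    staging.rmdir()  # staging is empty after the moves


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    # placeholder bytes, not real Parquet data
    write_via_staging(root / "table", root / "staging", {"part-00000.parquet": b"demo"})
    written = sorted(p.name for p in (root / "table").iterdir())
    leftovers = list((root / "staging").iterdir())

print(written)
print(leftovers)
```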
Answer:
ABFS is a "real" file system, so the S3A zero-rename committers are not needed; indeed, they won't work. And the client is entirely open source: look in the hadoop-azure module.
The ADLS Gen2 store does have scale problems, but unless you are trying to commit 10,000 files or clean up massively deep directory trees, you won't hit them. If you do get error messages about failures to rename individual files and you are running jobs at that scale, (a) talk to Microsoft about increasing your allocated capacity and (b) pick this up.
That isn't the issue here, though. My guess is that you actually have multiple jobs writing to the same output path, and one is cleaning up while the other is setting up. In particular, they both seem to have a job ID of "0". Because the same job ID is being used, not only are task setup and task cleanup getting mixed up, it is also possible that when job 1 commits, it includes the output of job 2 from all task attempts that have been successfully committed.
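To make the suspected race concrete, here is a toy, single-process simulation on the local filesystem. setup_job and cleanup_job are hypothetical helpers that only mimic the behaviour described above (both jobs using attempt directory "0", and cleanup deleting the whole _temporary tree); a non-recursive os.mkdir stands in for a directory creation that races with the delete. It is not Hadoop or ABFS code.

```python
import os
import shutil
import tempfile


def setup_job(output: str) -> str:
    """Hypothetical: create <output>/_temporary/0, as both jobs would."""
    job_attempt = os.path.join(output, "_temporary", "0")
    os.makedirs(job_attempt, exist_ok=True)
    return job_attempt


def cleanup_job(output: str) -> None:
    """Hypothetical: delete <output>/_temporary and everything below it."""
    shutil.rmtree(os.path.join(output, "_temporary"), ignore_errors=True)


with tempfile.TemporaryDirectory() as out:
    job_a = setup_job(out)  # job A sets up
    job_b = setup_job(out)  # job B sets up: same directory, since both use attempt "0"
    cleanup_job(out)        # job A finishes and removes the shared _temporary tree
    try:
        # job B now creates a subdirectory; os.mkdir does not create missing parents
        os.mkdir(os.path.join(job_b, "_temporary"))
        outcome = "ok"
    except FileNotFoundError:
        outcome = "path not found"

print(job_a == job_b, outcome)
```

The simulation prints True and "path not found": both jobs resolved the same intermediate directory, and the second one lost it to the first job's cleanup.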
I believe this has been a known problem with Spark standalone deployments, though I can't find a relevant JIRA. SPARK-24552 is close, but it should have been fixed in your version. SPARK-33402 ("Jobs launched in same second have duplicate MapReduce JobIDs") is about job IDs being derived from the current system time, not about 0. Still, you can try upgrading your Spark version to see if the problem goes away.
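The collision that SPARK-33402's title describes can be illustrated with a minimal sketch, assuming (as a simplification) that the ID is built from a second-resolution timestamp plus a small per-application counter in a Hadoop-style job_<timestamp>_<number> format. mapreduce_job_id is a hypothetical helper, not Spark's actual code.

```python
from datetime import datetime


def mapreduce_job_id(launch_time: datetime, job_number: int) -> str:
    """Hypothetical: Hadoop-style job ID from a second-resolution timestamp."""
    tracker_id = launch_time.strftime("%Y%m%d%H%M%S")  # sub-second part is dropped
    return f"job_{tracker_id}_{job_number:04d}"


# Two jobs launched 0.8 s apart, in the same second, with the same counter value
a = mapreduce_job_id(datetime(2021, 12, 17, 3, 42, 5, 100_000), 2)
b = mapreduce_job_id(datetime(2021, 12, 17, 3, 42, 5, 900_000), 2)
print(a)       # job_20211217034205_0002
print(a == b)  # True
```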
My suggestions
- Make sure your jobs are not writing to the same table simultaneously; otherwise things will get into a mess.
- Grab the most recent version of Spark you are happy with.
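The first suggestion can be checked mechanically before launching a batch of scripts. A minimal sketch, assuming the output paths of all concurrently scheduled scripts are known up front as plain path strings; overlapping_outputs and the example paths are hypothetical. It flags identical paths and also paths nested inside one another, on the assumption that an overwrite of a parent would also touch the child.

```python
from itertools import combinations
from pathlib import PurePosixPath


def overlapping_outputs(paths):
    """Hypothetical: return pairs of output paths that are identical or nested."""
    norm = [PurePosixPath(p) for p in paths]
    clashes = []
    for a, b in combinations(norm, 2):
        if a == b or a in b.parents or b in a.parents:
            clashes.append((str(a), str(b)))
    return clashes


planned = ["/lake/sales", "/lake/customers", "/lake/sales", "/lake/sales/2021"]
clashes = overlapping_outputs(planned)
print(clashes)
```

Any non-empty result means at least two scripts should not run at the same time, or should be given distinct output paths.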