I am trying to write a Parquet file to HDFS and then copy it to S3.
I wrote the code below in Zeppelin and it worked well: without any issue, it added the files to the S3 path.
import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, dayofmonth, month, to_date, year}

val outputFolder = "bucket_name/path"
println("\n ---- TASK 1 ----- \n writing with path " + outputFolder)

// Derive year/month/day columns from the timestamp and write the data as
// partitioned Parquet, one file per day.
// Note: "yyyy" is the calendar year; "YYYY" would be the week-based year.
wholeParquetFile
  .withColumn("date_col", to_date(col("timestamp"), "yyyyMMdd"))
  .withColumn("year", year(col("date_col")))
  .withColumn("month", month(col("date_col")))
  .withColumn("day", dayofmonth(col("date_col")))
  .drop("date_col")
  .repartition(1)
  .write.mode(SaveMode.Overwrite)
  .partitionBy("year", "month", "day")
  .parquet(outputFolder)

val sc = spark.sparkContext
val fs = FileSystem.get(sc.hadoopConfiguration)

// Collect every part file that the write above produced.
val allTheFilesThatBennCreated: Array[FileStatus] =
  fs.globStatus(new Path(outputFolder + "/year=*/month=*/day=*/*"))
println("------- allTheFilesThatBennCreated -------" + allTheFilesThatBennCreated.mkString("Array(", ", ", ")"))

// Right now each path looks like outputFolder + "/year=2021/month=5/day=17/part-....c000.snappy.parquet";
// convert it to outputFolder + "/2021/5/17/part-....c000.snappy.parquet".
allTheFilesThatBennCreated.foreach(file => {
  val newPathString = generateOutputFilePathString(file.getPath.toString)
  val outputFilePath = new Path(newPathString)
  val destinationFileSystem = FileSystem.get(outputFilePath.toUri, sc.hadoopConfiguration)
  val sourceFileSystem = FileSystem.get(file.getPath.toUri, sc.hadoopConfiguration)
  println("-------- source filesystem ------------------" + sourceFileSystem)
  println("-------- file.getPath --------------" + file.getPath)
  println("-------- destinationFileSystem ------------- " + destinationFileSystem)
  println("-------- S3 path for Output File ------------" + outputFilePath)
  // Upload from HDFS to S3; `true` deletes the source file after copying.
  FileUtil.copy(sourceFileSystem, file.getPath, destinationFileSystem, outputFilePath, true, sc.hadoopConfiguration)
})
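For reference, generateOutputFilePathString is a small helper that maps each HDFS partition path to its S3 destination, as the comments above describe. A minimal sketch of such a helper (the s3a://bucket_name/path prefix here is a placeholder, not the real destination):

// Hypothetical sketch: strip the "year=", "month=", "day=" key names
// and re-root the relative path under the S3 bucket.
def generateOutputFilePathString(hdfsPath: String): String = {
  // e.g. "hdfs://.../path/year=2021/month=5/day=17/part-....snappy.parquet"
  //  ->  "s3a://bucket_name/path/2021/5/17/part-....snappy.parquet"
  val relative = hdfsPath
    .replaceAll("^.*?(?=year=)", "")      // keep everything from "year=" onward
    .replaceAll("(year|month|day)=", "")  // drop the partition key names
  "s3a://bucket_name/path/" + relative
}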
But when I try to run the same code in spark-shell, or through a jar file with spark-submit, it throws this error:
22/05/17 09:57:28 WARN LocalDirAllocator$AllocatorPerContext: /mnt/var/lib/hadoop/tmp/s3a is not writable
org.apache.hadoop.util.DiskChecker$DiskErrorException: Directory is not writable: /mnt/var/lib/hadoop/tmp/s3a
at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:167)
at org.apache.hadoop.util.DiskChecker.checkDirInternal(DiskChecker.java:100)
at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:77)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:315)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:378)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:461)
at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:501)
at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:66)
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:690)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1075)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1056)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:945)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:393)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:86)
at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:70)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at com.propellyr.driver.ApplicationMain$.delayedEndpoint$com$propellyr$driver$ApplicationMain$1(ApplicationMain.scala:70)
at com.propellyr.driver.ApplicationMain$delayedInit$body.apply(ApplicationMain.scala:7)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.propellyr.driver.ApplicationMain$.main(ApplicationMain.scala:7)
at com.propellyr.driver.ApplicationMain.main(ApplicationMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:863)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:938)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:947)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" org.apache.hadoop.util.DiskChecker$DiskErrorException: No space available in any of the local directories.
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:400)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:461)
at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:501)
at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:66)
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:690)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1075)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1056)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:945)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:393)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:86)
at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:70)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at com.propellyr.driver.ApplicationMain$.delayedEndpoint$com$propellyr$driver$ApplicationMain$1(ApplicationMain.scala:70)
at com.propellyr.driver.ApplicationMain$delayedInit$body.apply(ApplicationMain.scala:7)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.propellyr.driver.ApplicationMain$.main(ApplicationMain.scala:7)
at com.propellyr.driver.ApplicationMain.main(ApplicationMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:863)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:938)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:947)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Does anybody know how to solve this?
CodePudding user response:
You've not said anything about your environments for Zeppelin and the CLI, e.g. whether both are submitting to the same cluster, or whether your CLI is running in local mode.
Nevertheless, the clue is in your stack trace:
No space available in any of the local directories.
Looking further into this, the error is raised inside FileUtil.copy(): before uploading to S3, the S3A connector buffers its output in a local temporary directory. That directory is configured by the property fs.s3a.buffer.dir, whose default is ${hadoop.tmp.dir}/s3a (matching the /mnt/var/lib/hadoop/tmp/s3a in your warning); mapred.local.dir is a related local-directory setting worth checking too. Make sure the configured directories exist, are writable by the user running the job, and have free space.
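For example, you could inspect (and, if needed, override) those settings before the copy runs. A rough sketch, assuming a SparkSession named spark and that /tmp/s3a is a writable location with enough space on your nodes:

// Inspect the local directories S3A buffers uploads into before sending
// them to S3, then point the buffer somewhere writable with free space.
val hadoopConf = spark.sparkContext.hadoopConfiguration
println(hadoopConf.get("fs.s3a.buffer.dir"))  // default: ${hadoop.tmp.dir}/s3a
println(hadoopConf.get("hadoop.tmp.dir"))
println(hadoopConf.get("mapred.local.dir"))
hadoopConf.set("fs.s3a.buffer.dir", "/tmp/s3a")  // example override

The same override can also be passed at submit time with --conf spark.hadoop.fs.s3a.buffer.dir=/tmp/s3a.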