Uploading File from HDFS to S3 using FileUtil.copy in Spark causing DiskErrorException: Directory is not writable


I am trying to write a Parquet file to HDFS and then copy it to S3.

I wrote the code in Zeppelin and it worked well: it added the file to the S3 path without any issue.

  import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}
  import org.apache.spark.sql.SaveMode
  import org.apache.spark.sql.functions.{col, dayofmonth, month, to_date, year}

  var outputFolder = "bucket_name/path"
  println("\n ---- TASK 1 ----- \n writing with path " + outputFolder)
  wholeParquetFile
    .withColumn("date_col", to_date(col("timestamp"), "yyyyMMdd"))
    .withColumn("year", year(col("date_col")))
    .withColumn("month", month(col("date_col")))
    .withColumn("day", dayofmonth(col("date_col")))
    .drop("date_col")
    .repartition(1)
    .write.mode(SaveMode.Overwrite)
    .partitionBy("year", "month", "day")
    .parquet(outputFolder)


  val sc = spark.sparkContext
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val allTheFilesThatHaveBeenCreated: Array[FileStatus] = fs.globStatus(new Path(outputFolder + "/year=*/month=*/day=*/*"))

  println("------- allTheFilesThatHaveBeenCreated -------" + allTheFilesThatHaveBeenCreated.mkString("Array(", ", ", ")"))

  // right now each file path looks like outputFolder + "/year=2021/month=5/day=17/part-....c000.snappy.parquet"
  // converting it to                    outputFolder + "/2021/5/17/part-....c000.snappy.parquet"
  allTheFilesThatHaveBeenCreated.foreach(path => {

    val newPathString = generateOutputFilePathString(path.getPath.toString)
    val outputFilePath = new Path(newPathString)
    val destinationFileSystem = FileSystem.get(outputFilePath.toUri, sc.hadoopConfiguration)
    val sourceFileSystem = FileSystem.get(path.getPath.toUri, sc.hadoopConfiguration)

    println("-------- source filesystem ------------------" + sourceFileSystem)

    println("-------- path.getPath --------------" + path.getPath)

    println("-------- destinationFileSystem ------------- " + destinationFileSystem)

    println("-------- S3 path for Output File ------------" + outputFilePath)

    // uploading to s3 from hdfs (deleteSource = true removes the HDFS copy)
    FileUtil.copy(sourceFileSystem, path.getPath, destinationFileSystem, outputFilePath, true, sc.hadoopConfiguration)
  })
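
(generateOutputFilePathString isn't shown above; it's a small helper that rewrites each HDFS partition path into the S3 layout described in the comments, roughly like this hypothetical sketch:)

  // Hypothetical sketch of the helper (the real implementation isn't shown):
  // rewrite the HDFS URI to point at the S3 bucket and flatten the Hive-style
  // partition directories, e.g. /year=2021/month=5/day=17/ -> /2021/5/17/
  def generateOutputFilePathString(hdfsPath: String): String =
    hdfsPath
      .replaceFirst("^hdfs://[^/]+", "s3a://bucket_name") // target bucket assumed
      .replaceAll("/year=", "/")
      .replaceAll("/month=", "/")
      .replaceAll("/day=", "/")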

But when I try to run the same code in spark-shell, or through a jar file with spark-submit, it throws this error:

22/05/17 09:57:28 WARN LocalDirAllocator$AllocatorPerContext: /mnt/var/lib/hadoop/tmp/s3a is not writable

org.apache.hadoop.util.DiskChecker$DiskErrorException: Directory is not writable: /mnt/var/lib/hadoop/tmp/s3a
    at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:167)
    at org.apache.hadoop.util.DiskChecker.checkDirInternal(DiskChecker.java:100)
    at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:77)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:315)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:378)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:461)
    at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:501)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:66)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:690)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1075)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1056)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:945)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:393)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
    at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:86)
    at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:70)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at com.propellyr.driver.ApplicationMain$.delayedEndpoint$com$propellyr$driver$ApplicationMain$1(ApplicationMain.scala:70)
    at com.propellyr.driver.ApplicationMain$delayedInit$body.apply(ApplicationMain.scala:7)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.propellyr.driver.ApplicationMain$.main(ApplicationMain.scala:7)
    at com.propellyr.driver.ApplicationMain.main(ApplicationMain.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:863)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:938)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:947)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" org.apache.hadoop.util.DiskChecker$DiskErrorException: No space available in any of the local directories.
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:400)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:461)
    at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:501)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:66)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:690)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1075)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1056)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:945)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:393)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
    at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:86)
    at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:70)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at com.propellyr.driver.ApplicationMain$.delayedEndpoint$com$propellyr$driver$ApplicationMain$1(ApplicationMain.scala:70)
    at com.propellyr.driver.ApplicationMain$delayedInit$body.apply(ApplicationMain.scala:7)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.propellyr.driver.ApplicationMain$.main(ApplicationMain.scala:7)
    at com.propellyr.driver.ApplicationMain.main(ApplicationMain.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:863)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:938)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:947)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Does anybody know how to solve this?

CodePudding user response:

You've not said anything about your environments for Zeppelin and the CLI: whether both are submitting to the same cluster, or whether your CLI is running in local mode.
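
One quick way to compare the two environments is to print the master and the relevant settings in both (a minimal sketch, assuming the usual spark session is in scope, as it is in Zeppelin and spark-shell):

  // Run this in both Zeppelin and spark-shell and compare the output.
  // "local[*]" vs "yarn" tells you whether the CLI is in local mode.
  println("master         = " + spark.sparkContext.master)
  println("fs.defaultFS   = " + spark.sparkContext.hadoopConfiguration.get("fs.defaultFS"))
  println("hadoop.tmp.dir = " + spark.sparkContext.hadoopConfiguration.get("hadoop.tmp.dir"))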

Nevertheless, the clue is in your stack trace:

No space available in any of the local directories.

Looking further into this, the error occurs inside FileUtil.copy(): before uploading, the S3A filesystem buffers the output to a local temporary directory (see S3AFileSystem.createTmpFileForWrite in the trace). That directory is configured by the property fs.s3a.buffer.dir, which defaults to ${hadoop.tmp.dir}/s3a and matches the /mnt/var/lib/hadoop/tmp/s3a path in your error, so that is what you should check: make sure it exists, is writable by the user running spark-submit, and has free space.
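
A minimal sketch of how you might inspect and redirect that buffer directory from spark-shell (the /tmp/s3a path is only an example; pick any local directory that exists, is writable, and has space):

  val hadoopConf = spark.sparkContext.hadoopConfiguration

  // null here means the default of ${hadoop.tmp.dir}/s3a is in effect
  println("fs.s3a.buffer.dir = " + hadoopConf.get("fs.s3a.buffer.dir"))
  println("hadoop.tmp.dir    = " + hadoopConf.get("hadoop.tmp.dir"))

  // redirect the S3A upload buffer; set this before the S3A filesystem is
  // first instantiated, because Hadoop caches FileSystem objects
  hadoopConf.set("fs.s3a.buffer.dir", "/tmp/s3a")

Equivalently, you can pass --conf spark.hadoop.fs.s3a.buffer.dir=/tmp/s3a to spark-submit.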
