Spark i/o with S3

Time:12-05

I was reading the following in https://blog.duyet.net/2021/04/spark-kubernetes-performance-tuning.html:

I/O with S3

It takes longer to append data to an existing dataset. In particular, all of the Spark jobs have finished, but your command has not finished. This is because the driver node is moving the output files of tasks from the job temporary directory to the final destination one by one, which is slow with cloud storage (e.g. S3).

Enable this optimization: spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2

  • I want to check whether the statement that the driver node moves the output files one by one is true. I have never heard that the Spark driver writes files to, or controls writing to, S3. Sure, S3 is not an HDFS cluster, and the Spark driver does necessarily do some work when reading from S3. My understanding is that the executors write the data to storage at rest, or to Kafka, even when running Spark on AWS. But presumably I am wrong, or not?

  • If it is true, does the same apply to ADLS2?

The comment "I have faced the same issue, and I found it was quicker to write the content to a temporary HDFS directory and then move the content to S3 with a command such as s3-dist-cp" is not what I am asking about.

Answer:

Whoever wrote that post does not fully understand the whole job commit problem, and the post is dangerously misleading. First, the terminology:

  1. Job: the whole execution of a query or RDD write.
  2. Task attempts: these perform the work in different processes, generating output locally. Task attempts (TAs) may fail, so this is done in isolation.
  3. Task commit: promotes the work of a task attempt to that of the job. This MUST be atomic, so that if a task fails midway through (or worse, is partitioned from the Spark driver but keeps going), another task attempt can be executed and committed.
  4. Job commit: takes all the work of committed tasks (and nothing from uncommitted/failed tasks) and promotes it to the final directory.
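As a toy model of this vocabulary (plain Python, no Spark; `run_job` and the `run_attempt` callback are hypothetical names), the driver below retries failed task attempts, commits exactly one attempt per task, and the job result contains only committed work:

```python
def run_job(num_tasks, run_attempt, max_attempts=3):
    """Toy driver loop. `run_attempt(task_id, attempt)` is a hypothetical
    callback that returns the attempt's output or raises RuntimeError."""
    committed = {}  # task id -> output of the single committed attempt
    for task_id in range(num_tasks):
        for attempt in range(max_attempts):
            try:
                output = run_attempt(task_id, attempt)  # isolated: nothing visible yet
            except RuntimeError:
                continue  # failed attempt: its output is simply never committed
            committed[task_id] = output  # "task commit": promote exactly one attempt
            break
        else:
            raise RuntimeError(f"task {task_id} failed {max_attempts} times; job aborted")
    # "Job commit": promote the work of committed tasks and nothing else.
    return [committed[t] for t in range(num_tasks)]


def flaky(task_id, attempt):
    # Hypothetical workload: the first attempt of task 1 loses its executor.
    if task_id == 1 and attempt == 0:
        raise RuntimeError("executor lost")
    return f"task{task_id}-attempt{attempt}"


if __name__ == "__main__":
    print(run_job(3, flaky))  # ['task0-attempt0', 'task1-attempt1', 'task2-attempt0']
```

Real Spark runs attempts in parallel on executors, and an attempt considered "failed" may still be running; this sequential loop hides that, and that is exactly where an atomic task commit matters.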

The v1 commit algorithm consists of:

  1. Task commit: rename the task attempt source tree to a job attempt directory (under _temporary/). This relies on directory rename being (1) atomic and (2) fast. Neither requirement is met on S3.
  2. Job commit: list all committed task directory trees, then rename their directories/files to the destination, one by one. This expects listing and rename to be fast.
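The two v1 steps can be simulated on a local filesystem, where directory rename really is atomic and cheap. This is a simplified sketch, not Hadoop's `FileOutputCommitter`: the directory layout only loosely follows its `_temporary/` convention, and all helper names are hypothetical.

```python
import os
import shutil
import tempfile


def attempt_dir(dest, job, task_id, attempt):
    # Simplified take on the FileOutputCommitter layout:
    # dest/_temporary/<job attempt>/_temporary/attempt_<task>_<n>
    return os.path.join(dest, "_temporary", job, "_temporary", f"attempt_{task_id}_{attempt}")


def run_task_attempt(dest, job, task_id, attempt, rows):
    """Task attempt: write output in isolation, under its own directory."""
    path = attempt_dir(dest, job, task_id, attempt)
    os.makedirs(path)
    with open(os.path.join(path, f"part-{task_id:05d}"), "w") as f:
        f.write("\n".join(rows))


def v1_commit_task(dest, job, task_id, attempt):
    """v1 task commit: ONE directory rename into the job attempt directory.
    Atomic and O(1) on HDFS or a local disk; on S3 a "rename" is a
    non-atomic copy + delete of every file."""
    os.rename(attempt_dir(dest, job, task_id, attempt),
              os.path.join(dest, "_temporary", job, f"task_{task_id}"))


def v1_commit_job(dest, job):
    """v1 job commit (run on the driver): list every committed task directory
    and move its files into the destination one by one, then write _SUCCESS."""
    job_dir = os.path.join(dest, "_temporary", job)
    for task_dir in sorted(os.listdir(job_dir)):
        if not task_dir.startswith("task_"):
            continue  # in-progress or failed attempts are never promoted
        for name in sorted(os.listdir(os.path.join(job_dir, task_dir))):
            os.replace(os.path.join(job_dir, task_dir, name), os.path.join(dest, name))
    shutil.rmtree(os.path.join(dest, "_temporary"))
    open(os.path.join(dest, "_SUCCESS"), "w").close()


if __name__ == "__main__":
    dest = tempfile.mkdtemp()
    run_task_attempt(dest, "job_0", 0, 0, ["a", "b"])
    run_task_attempt(dest, "job_0", 1, 0, ["partial"])  # this attempt "fails": never committed
    run_task_attempt(dest, "job_0", 1, 1, ["c"])        # the retry succeeds
    v1_commit_task(dest, "job_0", 0, 0)
    v1_commit_task(dest, "job_0", 1, 1)
    v1_commit_job(dest, "job_0")
    print(sorted(os.listdir(dest)))  # ['_SUCCESS', 'part-00000', 'part-00001']
```

On S3, each `os.rename`/`os.replace` above turns into a copy plus a delete per file, so `v1_commit_task` loses its atomicity and `v1_commit_job` becomes a slow serial loop executed by the driver, which is the slowness the blog post describes.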

The v2 commit algorithm is:

  1. Task commit: list all files in the task attempt source tree, and rename them one by one to the destination directory. This is not atomic, and does not meet Spark's requirements.
  2. Job commit: write a 0-byte _SUCCESS file.
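A matching sketch of v2 (same caveats: local filesystem, simplified layout, hypothetical helper names) makes the problem concrete. Task commit renames files straight into the destination one at a time, so a task that dies partway leaves some of its files visible even though the task never committed. The `fail_after` parameter is an invented knob that simulates the crash.

```python
import os
import shutil
import tempfile


def run_task_attempt(dest, task_id, attempt, files):
    """A task attempt writes its files in isolation (simplified layout)."""
    attempt_dir = os.path.join(dest, "_temporary", f"attempt_{task_id}_{attempt}")
    os.makedirs(attempt_dir)
    for name, data in files.items():
        with open(os.path.join(attempt_dir, name), "w") as f:
            f.write(data)
    return attempt_dir


def v2_commit_task(dest, attempt_dir, fail_after=None):
    """v2 task commit: list the attempt's files, then rename them one by one
    directly into the destination. Not atomic: readers see each file as
    soon as its rename completes."""
    for i, name in enumerate(sorted(os.listdir(attempt_dir))):
        if fail_after is not None and i == fail_after:
            raise RuntimeError("task attempt died mid-commit")  # simulated crash
        os.replace(os.path.join(attempt_dir, name), os.path.join(dest, name))


def v2_commit_job(dest):
    """v2 job commit: the data is already in place, so just clean up and
    write a 0-byte _SUCCESS marker."""
    shutil.rmtree(os.path.join(dest, "_temporary"), ignore_errors=True)
    open(os.path.join(dest, "_SUCCESS"), "w").close()


if __name__ == "__main__":
    dest = tempfile.mkdtemp()
    attempt = run_task_attempt(dest, 0, 0, {"part-00000-a": "1", "part-00000-b": "2"})
    try:
        v2_commit_task(dest, attempt, fail_after=1)
    except RuntimeError:
        pass
    # Half of an uncommitted task's output is already in the final directory.
    print(sorted(n for n in os.listdir(dest) if not n.startswith("_")))  # ['part-00000-a']
```

This happens on any filesystem, HDFS included, because the non-atomicity is in the algorithm rather than in the store.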

The blog author is correct in noting that v1 is slow in job commit. Its real issue is not performance, though; it is correctness, because task commit is not atomic on S3.

However, v2 is incorrect everywhere, even on HDFS, because the v2 task commit is non-atomic. That is why, even though it is faster, you shouldn't use it. Anywhere. Really.

For S3, then, if you want to write data in classic "directory tree" layouts:

  • ASF Spark/Hadoop releases: use the S3A committers built into recent hadoop-aws versions. Read the Hadoop docs to see how.
  • EMR: use the EMR S3 committer.

Both of these avoid renames by writing the files to the final destination as S3 multipart uploads, but not completing the uploads until job commit. This makes job commit fast, as it is nothing but listing/loading the single manifest file created by each task attempt (which lists its incomplete uploads), then POSTing the completions. There are no renames, and task commit is a PUT of a JSON file, which is fast and atomic.
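The zero-rename idea can be modelled with an in-memory fake store. `FakeS3` and its methods are hypothetical, loosely mirroring the real S3 CreateMultipartUpload / CompleteMultipartUpload / AbortMultipartUpload operations. Where the S3A and EMR committers actually keep their task manifests differs between implementations, so `job_dir` here is just an assumed location.

```python
import itertools
import json


class FakeS3:
    """Hypothetical stand-in for S3: data written via a multipart upload is
    invisible until the upload is completed."""

    def __init__(self):
        self.objects = {}   # key -> data (visible to readers)
        self.pending = {}   # upload id -> (key, data) (not visible)
        self._ids = itertools.count()

    def start_upload(self, key, data):
        upload_id = f"upload-{next(self._ids)}"
        self.pending[upload_id] = (key, data)
        return upload_id

    def complete_upload(self, upload_id):
        key, data = self.pending.pop(upload_id)
        self.objects[key] = data

    def abort_upload(self, upload_id):
        self.pending.pop(upload_id)

    def put(self, key, data):
        self.objects[key] = data  # a single PUT is atomic


def run_task_attempt(s3, dest, task_id, rows):
    # Write straight to the FINAL key, but leave the upload uncompleted.
    return [s3.start_upload(f"{dest}/part-{task_id:05d}", "\n".join(rows))]


def commit_task(s3, job_dir, task_id, upload_ids):
    # Task commit: one PUT of a small JSON manifest. Fast and atomic.
    s3.put(f"{job_dir}/task_{task_id}.json", json.dumps(upload_ids))


def commit_job(s3, job_dir, dest):
    # Job commit: list + load the manifests, then complete every upload. No renames.
    prefix = f"{job_dir}/"
    for key in sorted(k for k in s3.objects if k.startswith(prefix)):
        for upload_id in json.loads(s3.objects.pop(key)):
            s3.complete_upload(upload_id)
    s3.put(f"{dest}/_SUCCESS", "")


if __name__ == "__main__":
    s3, dest, job_dir = FakeS3(), "s3://bucket/out", "s3://bucket/.staging/job_0"
    committed_0 = run_task_attempt(s3, dest, 0, ["a"])
    for upload_id in run_task_attempt(s3, dest, 1, ["partial"]):
        s3.abort_upload(upload_id)  # failed attempt: its uploads are aborted
    committed_1 = run_task_attempt(s3, dest, 1, ["b"])
    commit_task(s3, job_dir, 0, committed_0)
    commit_task(s3, job_dir, 1, committed_1)
    commit_job(s3, job_dir, dest)
    print(sorted(s3.objects))
```

Nothing under the destination is visible until `commit_job` runs, and a failed attempt leaves nothing behind once its uploads are aborted.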

If true, does the same apply to ADLS2?

v1 works there, though because listing is slow and rename is not great, it is a bit slower than on HDFS. Under the load of a job commit it can be throttled, with the odd "quirky" failure where renames are reported as 503/throttled but have in fact taken place. This complicates recovery.

Hadoop 3.3.5 adds an Intermediate Manifest Committer for performance on Azure (ABFS) and Google GCS. It also commits work by writing a manifest in task commit. Job commit is a parallelised list/load of these manifests, followed by a parallelised rename. View it as a v3 commit algorithm.

  • GCS: task commit becomes atomic and fast (GCS directory rename is non-atomic and O(files), which is why v1 is unsafe there).
  • ABFS: listing is done in task commit, so job commit avoids it (saving time and IOPS); rename is parallelised yet rate-limited (for scale); and by recording the etags of source files, the committer can recover from throttle-related misreporting of rename failures (i.e. if the destination file exists and its etag equals the source etag, all is good).
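The manifest-committer flow described above can be sketched on a local filesystem as well. This is not the Hadoop implementation: the MD5-of-contents "etag", the manifest JSON shape, the `flaky_rename` helper and the use of a bounded thread pool as a crude rate limit are all assumptions made for illustration.

```python
import hashlib
import json
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor


def etag(path):
    # Hypothetical stand-in for a store-issued etag: MD5 of the file contents.
    with open(path, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()


def commit_task(attempt_dir, dest, manifest_path):
    """Task commit: list the attempt's files once, record (src, dst, etag)
    for each, and publish the manifest with a single atomic replace."""
    entries = [
        {"src": os.path.join(attempt_dir, n),
         "dst": os.path.join(dest, n),
         "etag": etag(os.path.join(attempt_dir, n))}
        for n in sorted(os.listdir(attempt_dir))
    ]
    tmp = manifest_path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(entries, f)
    os.replace(tmp, manifest_path)


def rename_with_recovery(entry, rename):
    try:
        rename(entry["src"], entry["dst"])
    except OSError:
        # A throttled rename may be reported as failed although it happened:
        # if the destination exists with the source's etag, treat it as done.
        if not (os.path.exists(entry["dst"]) and etag(entry["dst"]) == entry["etag"]):
            raise


def commit_job(manifest_paths, rename=os.rename, max_workers=4):
    """Job commit: load all manifests in parallel (no directory listing),
    then rename every file in parallel through a bounded pool."""
    def load(path):
        with open(path) as f:
            return json.load(f)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        entries = [e for manifest in pool.map(load, manifest_paths) for e in manifest]
        list(pool.map(lambda e: rename_with_recovery(e, rename), entries))


def flaky_rename(src, dst):
    # Hypothetical misbehaving store: performs the rename, then reports a 503.
    os.rename(src, dst)
    if dst.endswith("part-00001"):
        raise OSError("503: throttled")


if __name__ == "__main__":
    root = tempfile.mkdtemp()
    dest = os.path.join(root, "out")
    os.makedirs(dest)
    manifests = []
    for task_id in range(2):
        attempt_dir = os.path.join(root, f"attempt_{task_id}")
        os.makedirs(attempt_dir)
        with open(os.path.join(attempt_dir, f"part-{task_id:05d}"), "w") as f:
            f.write(f"rows of task {task_id}")
        manifests.append(os.path.join(root, f"task_{task_id}.json"))
        commit_task(attempt_dir, dest, manifests[-1])
    commit_job(manifests, rename=flaky_rename)  # the bogus 503 is absorbed
    print(sorted(os.listdir(dest)))  # ['part-00000', 'part-00001']
```

Writing `_SUCCESS` and cleaning up the attempt directories are omitted to keep the sketch short.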

Finally, there are cloud-first formats: Iceberg, Delta Lake, Hudi. These commit jobs atomically by writing a single manifest file somewhere; query planning becomes the work of listing/loading the chain of manifest files, and so identifying the data files to process. Everyone who works on the problem of Spark/Hive cloud performance broadly recognises these as the future. If you can use them, your life is better.
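The commit-by-manifest idea behind these formats can be shown with a toy log. Iceberg, Delta Lake and Hudi each have their own metadata layout and commit protocol; the `log/<version>.json` files, the `commit`/`plan` helpers, and the use of `os.link` as a put-if-absent primitive are assumptions for illustration only.

```python
import json
import os
import tempfile


def commit(table_dir, version, added_files):
    """Atomically publish table version `version` by creating
    log/<version>.json. The entry is fully written to a temp file first;
    os.link then fails with FileExistsError if another writer already
    claimed this version (an atomic put-if-absent of a complete file)."""
    log_dir = os.path.join(table_dir, "log")
    os.makedirs(log_dir, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=log_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump({"add": added_files}, f)
    try:
        os.link(tmp, os.path.join(log_dir, f"{version:05d}.json"))
    finally:
        os.remove(tmp)


def plan(table_dir):
    """Query planning: list and load the chain of log entries to find the
    data files that make up the table."""
    log_dir = os.path.join(table_dir, "log")
    files = []
    for name in sorted(n for n in os.listdir(log_dir) if n.endswith(".json")):
        with open(os.path.join(log_dir, name)) as f:
            files += json.load(f)["add"]
    return files


if __name__ == "__main__":
    table = tempfile.mkdtemp()
    commit(table, 0, ["data/a.parquet"])
    commit(table, 1, ["data/b.parquet"])
    try:
        commit(table, 1, ["data/c.parquet"])  # a concurrent writer loses the race
    except FileExistsError:
        pass
    print(plan(table))  # ['data/a.parquet', 'data/b.parquet']
```

Readers that trust only the log never see a half-committed job; the cost moves to query planning, which has to list and load the chain of log entries.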

Further reading:

The whole mechanism for committing work to persistent storage in the presence of failures is a fascinating piece of distributed computing. If you read the Zero Rename Committer paper, the final chapter actually discusses where things still went wrong in production. This is a better read in hindsight than it was at the time. Everyone should document their production problems.
