Spark magic output committer settings not recognized


I'm trying to play around with different Spark output committer settings for S3, and wanted to try out the magic committer. So far I haven't managed to get my jobs to use the magic committer; they always seem to fall back on the file output committer.

The Spark job I'm running is a simple PySpark test job that runs a query, partitions the output by a column and writes Parquet to S3:

df = spark.sql("select * from some_table where some_condition")

df.write \
  .partitionBy("some_column") \
  .parquet("s3://some-bucket/some-folder", mode="overwrite")

The relevant spark settings are (taken from the Spark UI, job's environment tab):

spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a   org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.hadoop.fs.s3a.committer.magic.enabled true
spark.hadoop.fs.s3a.committer.name  magic
spark.hadoop.fs.s3a.committer.staging.tmp.path  tmp/staging
spark.hadoop.fs.s3a.committer.staging.unique-filenames  true
spark.sql.parquet.output.committer.class    org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass   org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
mapreduce.output.fileoutputformat.compress  false
mapreduce.output.fileoutputformat.compress.codec    org.apache.hadoop.io.compress.DefaultCodec
mapreduce.output.fileoutputformat.compress.type RECORD
mapreduce.outputcommitter.factory.scheme.s3a    org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
mapreduce.fileoutputcommitter.algorithm.version 1
mapreduce.fileoutputcommitter.task.cleanup.enabled  false

Hadoop properties:

fs.s3a.committer.magic.enabled  true
fs.s3a.committer.name   magic

(Let me know if any other settings are relevant)
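For context, here is roughly how a session with these settings could be built - just a sketch to show how the keys above are passed in; the values are the same ones listed above, and anything not listed there (like the app name) is a placeholder:

from pyspark.sql import SparkSession

# Sketch only: the committer settings from above, applied at session build time.
spark = (
    SparkSession.builder
    .appName("magic-committer-test")
    # Bind the S3A committer factory for s3a:// URLs in Hadoop's MR output layer.
    .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
    # Select the magic committer and enable it on the filesystem side.
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
    # Route Spark SQL writes through the cloud commit protocol and the
    # Parquet binding committer from spark-hadoop-cloud.
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    .config("spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    .getOrCreate()
)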

I'm basing the observation that the file committer is being used instead of the magic committer on a couple of things:

  1. Different log lines produced by the Spark job seem to indicate that the file output committer is being used:

"class":"org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter","file_line":"FileOutputCommitter.java:601","func":"commitTask","message":"Saved output of task 'attempt_2021...' to s3://some-bucket/some-folder/_temporary/0/task_2021..."
"class":"org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat","file_line":"ParquetFileFormat.scala:54","message":"Using user defined output committer for Parquet: org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"
"class":"org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter","file_line":"FileOutputCommitter.java:141","func":"<init>","message":"File Output Committer Algorithm version is 1"
"class":"org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter","file_line":"FileOutputCommitter.java:156","func":"<init>","message":"FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false"
  2. When setting the file committer's algorithm version to an invalid number, like so:

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version -7

an exception is raised from the file committer's constructor saying the value is invalid - indicating that the file committer was initialized instead of the magic committer (see the sketch after the next paragraph).

I'm not seeing any logs indicating usage of the magic committer, or any failure to initialize a committer which could explain falling back on the file committer.
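To make point (2) concrete, here is roughly how that canary is wired in - just a sketch, with the property set the same way as the other spark.hadoop.* settings (i.e. before the session starts) and placeholder table/bucket names:

from pyspark.sql import SparkSession

# Deliberately invalid algorithm version: if the classic FileOutputCommitter is
# constructed, its constructor validates this value and throws
# "Only 1 or 2 algorithm version is supported" (see the stack trace below).
# A run that actually uses the magic committer should never hit that check.
spark = (
    SparkSession.builder
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "-7")
    .getOrCreate()
)

df = spark.sql("select * from some_table where some_condition")
df.write \
  .partitionBy("some_column") \
  .parquet("s3://some-bucket/some-folder", mode="overwrite")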

Spark version is 3.1.2 using this spark-hadoop-cloud JAR. Let me know if there's any other officially published JAR I can try or if there are any other log indications that may be relevant.

Any thoughts?

===== EDIT:

Below is the stack trace I see when setting the file committer algorithm version to an invalid value. It seems that the call to org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.setupCommitter ends up calling org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory.createOutputCommitter, which in turn initializes the incorrect type org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter instead of the configured type org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter.

    Py4JJavaError: An error occurred while calling o259.parquet.
: java.io.IOException: Only 1 or 2 algorithm version is supported
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:143)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:117)
    at org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.createFileOutputCommitter(PathOutputCommitterFactory.java:134)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory.createOutputCommitter(FileOutputCommitterFactory.java:35)
    at org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.createCommitter(PathOutputCommitterFactory.java:201)
    at org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.setupCommitter(PathOutputCommitProtocol.scala:88)
    at org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.setupCommitter(PathOutputCommitProtocol.scala:49)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:177)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:874)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

CodePudding user response:

Mystery solved - the failure to initialize the magic committer was due to a mismatch between the committer factory scheme setting and the scheme of the actual destination URL. Consider this:

The committer factory configuration was set using the key: spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a - meaning that the setting is made for s3a protocol URLs.

While the URL passed to the write method was s3://some-bucket/some-folder - using the s3 scheme instead of s3a.

The PathOutputCommitterFactory Hadoop class looks up a config key of the form mapreduce.outputcommitter.factory.scheme.%s to decide which factory to use for the given output URL. If the scheme in the config key (in this case s3a) does not match the scheme of the destination URL (in this case s3), the committer factory setting is not picked up and the job falls back on the classic FileOutputCommitter.

Solution - make sure the outputcommitter.factory.scheme.<scheme> setting matches the scheme of the destination URL. I've successfully tested this using both s3 and s3a in the URL and the config key.
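In code terms, either of these combinations keeps the scheme consistent (a sketch; bucket and folder names are placeholders):

# Option 1: keep the factory key bound to s3a and write to an s3a:// URL.
#   spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a = org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
df.write \
  .partitionBy("some_column") \
  .parquet("s3a://some-bucket/some-folder", mode="overwrite")

# Option 2: keep writing to s3:// and bind the factory for the s3 scheme instead
# (set the same way as the other spark.hadoop.* settings).
#   spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3 = org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
df.write \
  .partitionBy("some_column") \
  .parquet("s3://some-bucket/some-folder", mode="overwrite")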

CodePudding user response:

This does sound like a binding problem, but I cannot immediately see where it is. At a glance you have all the right settings.

The easiest way to check that an S3A committer is being used is to look at the _SUCCESS file. If it is a piece of JSON then a new committer was used; the text inside will tell you more about the committer.

A 0-byte file means that the classic file output committer was still used.
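For example, something along these lines can be used to peek at the marker - just a sketch, assuming boto3 is available and using placeholder bucket/key names:

import json
import boto3

# Fetch the _SUCCESS marker written at the root of the output path.
s3 = boto3.client("s3")
obj = s3.get_object(Bucket="some-bucket", Key="some-folder/_SUCCESS")
body = obj["Body"].read()

if not body:
    print("0-byte _SUCCESS: the classic FileOutputCommitter was used")
else:
    summary = json.loads(body)
    # The S3A committers write a JSON manifest here; it normally includes a
    # "committer" field naming the committer that actually committed the job.
    print("committer:", summary.get("committer"))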
