I have a problem with pySpark configuration when writing data inside a ceph bucket. With the following Python code snippet I can read data from the Ceph bucket but when I try to write inside the bucket, I get the following error:
22/07/22 10:00:58 DEBUG S3ErrorResponseHandler: Failed in parsing the error response :
org.apache.hadoop.shaded.com.ctc.wstx.exc.WstxEOFException: Unexpected EOF in prolog
at [row,col {unknown-source}]: [1,0]
at org.apache.hadoop.shaded.com.ctc.wstx.sr.StreamScanner.throwUnexpectedEOF(StreamScanner.java:701)
at org.apache.hadoop.shaded.com.ctc.wstx.sr.BasicStreamReader.handleEOF(BasicStreamReader.java:2217)
at org.apache.hadoop.shaded.com.ctc.wstx.sr.BasicStreamReader.nextFromProlog(BasicStreamReader.java:2123)
at org.apache.hadoop.shaded.com.ctc.wstx.sr.BasicStreamReader.next(BasicStreamReader.java:1179)
at com.amazonaws.services.s3.internal.S3ErrorResponseHandler.createException(S3ErrorResponseHandler.java:122)
at com.amazonaws.services.s3.internal.S3ErrorResponseHandler.handle(S3ErrorResponseHandler.java:71)
at com.amazonaws.services.s3.internal.S3ErrorResponseHandler.handle(S3ErrorResponseHandler.java:52)
[...]
22/07/22 10:00:58 DEBUG request: Received error response: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: null; S3 Extended Request ID: null; Proxy: null), S3 Extended Request ID: null
22/07/22 10:00:58 DEBUG AwsChunkedEncodingInputStream: AwsChunkedEncodingInputStream reset (will reset the wrapped stream because it is mark-supported).
Pyspark code (not working):
from pyspark.sql import SparkSession
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages com.amazonaws:aws-java-sdk-bundle:1.12.264,org.apache.spark:spark-sql-kafka-0-10_2.13:3.3.0,org.apache.hadoop:hadoop-aws:3.3.3 pyspark-shell"
spark = (
SparkSession.builder.appName("app") \
.config("spark.hadoop.fs.s3a.access.key", access_key) \
.config("spark.hadoop.fs.s3a.secret.key", secret_key) \
.config("spark.hadoop.fs.s3a.connection.timeout", "10000") \
.config("spark.hadoop.fs.s3a.endpoint", "http://HOST_NAME:88") \
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.endpoint.region", "default") \
.getOrCreate()
)
spark.sparkContext.setLogLevel("TRACE")
# This works
spark.read.csv("s3a://test-data/data.csv")
# This throws the provided error
df_to_write = spark.createDataFrame([{"a": "x", "b": "y", "c": "3"}])
df_to_write.write.csv("s3a://test-data/with_love.csv")
Also, referring to the same ceph bucket, I am able to read and write data to the bucket via boto3:
import boto3
from botocore.exceptions import ClientError
from botocore.client import Config
config = Config(connect_timeout=20, retries={'max_attempts': 0})
s3_client = boto3.client('s3', config=config,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name="defaut",
endpoint_url='http://HOST_NAME:88',
verify=False
)
response = s3_client.list_buckets()
# Read
print('Existing buckets:')
for bucket in response['Buckets']:
print(f' {bucket["Name"]}')
# Write
dummy_data = b'Dummy string'
s3_client.put_object(Body=dummy_data, Bucket='test-spark', Key='awesome_key')
Also s3cmd with the same configuration is working fine.
I think I'm missing some pyspark (hadoop-aws) configuration, could anyone help me in identifying the configuration problem? Thanks.
CodePudding user response:
After some research on the web, I was able to solve the problem using this hadoop-aws configuration:
fs.s3a.signing-algorithm: S3SignerType
I configured this property in pySpark with:
spark = (
SparkSession.builder.appName("app") \
.config("spark.hadoop.fs.s3a.access.key", access_key) \
.config("spark.hadoop.fs.s3a.secret.key", secret_key) \
.config("spark.hadoop.fs.s3a.connection.timeout", "10000") \
.config("spark.hadoop.fs.s3a.endpoint", "http://HOST_NAME:88") \
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.endpoint.region", "default") \
.config("spark.hadoop.fs.s3a.signing-algorithm", "S3SignerType") \
.getOrCreate()
)
From what I understand, the version of ceph I am using (16.2.3
), does not support the default signing algorithm used in Spark version v3.3.0
on hadoop version 3.3.2
.
For further details see this documentation.