GCP Dataproc - Failed to construct kafka consumer, Failed to load SSL keystore dataproc.jks of type


I'm trying to run a Structured Streaming program on GCP Dataproc that reads data from Kafka and prints it to the console.

Access to Kafka uses SSL, and the truststore and keystore files are stored in a GCS bucket. I'm using the Google Cloud Storage API to download them from the bucket into the current working directory, and the truststore and keystore locations are then passed to the Kafka consumer/producer. However, I'm getting an error.

Command:

gcloud dataproc jobs submit pyspark /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb2-v2.py  --cluster dataproc-ss-poc  --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 --region us-central1

The code is shown below:

import os

from google.cloud import storage
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('StructuredStreaming_VersaSase').getOrCreate()

kafkaBrokers='<broker-ip>:9094'
topic = "versa-sase"
security_protocol="SSL"

# Google Storage API to access the keys in the buckets
client = storage.Client()
bucket = client.get_bucket('ssl-certs-karan')

blob_ssl_truststore = bucket.get_blob('cap12.jks')
ssl_truststore_location = '{}/{}'.format(os.getcwd(), blob_ssl_truststore.name) 
blob_ssl_truststore.download_to_filename(ssl_truststore_location)

ssl_truststore_password="<ssl_truststore_password>"

blob_ssl_keystore = bucket.get_blob('dataproc-versa-sase-p12-1.jks')
ssl_keystore_location = '{}/{}'.format(os.getcwd(), blob_ssl_keystore.name) 
blob_ssl_keystore.download_to_filename(ssl_keystore_location)


ssl_keystore_password="<ssl_keystore_password>"
consumerGroupId = "versa-sase-grp"
checkpoint = "gs://ss-checkpoint/"

print(" SPARK.SPARKCONTEXT -> ", spark.sparkContext)


df = spark.read.format('kafka')\
    .option("kafka.bootstrap.servers",kafkaBrokers)\
    .option("kafka.security.protocol","SSL") \
    .option("kafka.ssl.truststore.location",ssl_truststore_location) \
    .option("kafka.ssl.truststore.password",ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location)\
    .option("kafka.ssl.keystore.password", ssl_keystore_password)\
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId)\
    .option("startingOffsets", "earliest") \
    .load()

   print(" df -> ", df)
   query = df.selectExpr("CAST(value AS STRING)", "CAST(key AS STRING)", "topic", "timestamp") \
    .write \
    .format("console") \
    .option("numRows",100)\
    .option("checkpointLocation", checkpoint) \
    .option("outputMode", "complete")\
    .option("truncate", "false") \
    .save("output")

Error:

Traceback (most recent call last):
  File "/tmp/3e7304f8e27d4436a2f382280cebe7c5/StructuredStreaming_Kafka_GCP-Batch-feb2-v2.py", line 83, in <module>
    query = df.selectExpr("CAST(value AS STRING)", "CAST(key AS STRING)", "topic", "timestamp") \
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1109, in save
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
22/02/02 23:11:08 DEBUG org.apache.hadoop.ipc.Client: IPC Client (1416219052) connection to dataproc-ss-poc-m/10.128.0.78:8030 from root sending #171 org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB.allocate
22/02/02 23:11:08 DEBUG org.apache.hadoop.ipc.Client: IPC Client (1416219052) connection to dataproc-ss-poc-m/10.128.0.78:8030 from root got value #171
22/02/02 23:11:08 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine: Call: allocate took 2ms
py4j.protocol.Py4JJavaError: An error occurred while calling o84.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (dataproc-ss-poc-w-0.c.versa-kafka-poc.internal executor 1): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
    at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.createConsumer(KafkaDataConsumer.scala:124)
    at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.<init>(KafkaDataConsumer.scala:61)
    at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:206)
    at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:201)
    at org.apache.commons.pool2.BaseKeyedPooledObjectFactory.makeObject(BaseKeyedPooledObjectFactory.java:60)
    at org.apache.commons.pool2.impl.GenericKeyedObjectPool.create(GenericKeyedObjectPool.java:1041)
    at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:342)
    at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:265)
    at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool.borrowObject(InternalKafkaConsumerPool.scala:84)
    at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.retrieveConsumer(KafkaDataConsumer.scala:573)
    at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.getOrRetrieveConsumer(KafkaDataConsumer.scala:558)
    at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$getAvailableOffsetRange$1(KafkaDataConsumer.scala:359)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:618)
    at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.getAvailableOffsetRange(KafkaDataConsumer.scala:358)
    at org.apache.spark.sql.kafka010.KafkaSourceRDD.resolveRange(KafkaSourceRDD.scala:123)
    at org.apache.spark.sql.kafka010.KafkaSourceRDD.compute(KafkaSourceRDD.scala:75)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
...

Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/3e7304f8e27d4436a2f382280cebe7c5/dataproc-versa-sase-p12-1.jks of type JKS
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:377)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.<init>(DefaultSslEngineFactory.java:349)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:299)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:161)
    at org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:138)
    at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
    at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:74)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:737)
    ... 53 more
Caused by: java.nio.file.NoSuchFileException: /tmp/3e7304f8e27d4436a2f382280cebe7c5/dataproc-versa-sase-p12-1.jks
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
    at java.nio.file.Files.newByteChannel(Files.java:361)
    at java.nio.file.Files.newByteChannel(Files.java:407)
    at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
    at java.nio.file.Files.newInputStream(Files.java:152)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:370)

From my Mac, I'm using PKCS12 (.p12) files and am able to access the Kafka cluster in SSL mode. However, on Dataproc it seems the expected file format is JKS.

Here is the command I used to convert the .p12 file to JKS format:

keytool -importkeystore -srckeystore dataproc-versa-sase.p12 -srcstoretype pkcs12 -srcalias versa-sase-user -destkeystore dataproc-versa-sase-p12-1.jks -deststoretype jks -deststorepass <password> -destalias versa-sase-user
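
To sanity-check the converted file before uploading it to the bucket, a quick local listing can confirm the keystore is readable as JKS (a sketch; the filename and password are the ones used in the conversion above):

# List the entries in the converted keystore to confirm it loads as JKS
keytool -list -v -keystore dataproc-versa-sase-p12-1.jks -storetype jks -storepass <password>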

What needs to be done to fix this? It seems the JKS file is not accessible to the Spark program.

tia!

CodePudding user response:

I would add the following options if you want to use JKS:

.option("kafka.ssl.keystore.type", "JKS")
.option("kafka.ssl.truststore.type", "JKS")

This will also work with PKCS12, by the way:

.option("kafka.ssl.keystore.type", "PKCS12")
.option("kafka.ssl.truststore.type", "PKCS12")

CodePudding user response:

I faced the same kind of issue in the past, and in my case it was a JDK version issue: the JDK version that created the JKS certificate was newer than the Java version reading the certificate (in your case, the one on Dataproc).
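
A quick way to check for such a mismatch (a sketch, assuming you can SSH to the cluster master node; Dataproc names the master <cluster-name>-m, so dataproc-ss-poc-m here, and the zone placeholder needs to be filled in):

# Locally: the JDK that produced the JKS file, and whether it can list the keystore
java -version
keytool -list -keystore dataproc-versa-sase-p12-1.jks -storepass <password>

# On the Dataproc master: the JDK that will read the keystore
gcloud compute ssh dataproc-ss-poc-m --zone=<zone> --command="java -version"

If the local JDK is newer than the cluster's, re-running the keytool -importkeystore conversion with a JDK that matches the cluster may produce a keystore the cluster can load.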
