I have a GCP Dataproc cluster, and I'm trying to deploy a PySpark job that produces to a Kafka topic over SSL.
The PEM files are stored in the bucket gs://dataproc_kafka_code/code, and I reference them in the code shown below. However, the code cannot find the PEM files and fails with the following error:
%3|1638738651.097|SSL|rdkafka#producer-1| [thrd:app]: error:02001002:system library:fopen:No such file or directory: fopen('gs://dataproc_kafka_code/code/caroot.pem','r')
%3|1638738651.097|SSL|rdkafka#producer-1| [thrd:app]: error:2006D080:BIO routines:BIO_new_file:no such file
Traceback (most recent call last):
File "/tmp/my-job6/KafkaProducer.py", line 21, in <module>
producer = Producer(conf)
cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Failed to create producer: ssl.ca.location failed: error:0B084002:x509 certificate routines:X509_load_cert_crl_file:system lib"}
Code :
from confluent_kafka import Producer
kafkaBrokers='<host>:<port>'
# CA Root certificate ca.crt
caRootLocation='gs://dataproc_kafka_code/code/caroot.pem'
# user public (user.crt)
certLocation='gs://dataproc_kafka_code/code/my-bridge-user-crt.pem'
# user.key
keyLocation='gs://dataproc_kafka_code/code/user-with-certs.pem'
password='<password>'
conf = {'bootstrap.servers': kafkaBrokers,
        'security.protocol': 'SSL',
        'ssl.ca.location': caRootLocation,
        'ssl.certificate.location': certLocation,
        'ssl.key.location': keyLocation,
        'ssl.key.password': password
        }
topic = 'my-topic'
producer = Producer(conf)
for n in range(100):
    producer.produce(topic, key=str(n), value=" val -> " + str(n*(-1)) + " on dec 5 from dataproc ")
producer.flush()
What needs to be done to fix this? Also, is this the right way to give the code access to the SSL certs?
tia!
CodePudding user response:
The error
fopen:No such file or directory: fopen('gs://dataproc_kafka_code/code/caroot.pem','r')
suggests that the Producer library is trying to open the file on the local filesystem, where a gs:// path does not exist.
There are a couple of ways to fix this. Both download the keys and certificates to local files and then point the conf at those local paths:
- Download using storage client API https://googleapis.dev/python/storage/latest/client.html
- Or use gsutil (comes preinstalled in the VM) to download the files https://cloud.google.com/storage/docs/gsutil/commands/cp
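As a rough sketch of the first option, something like the following could copy the three PEM files to a local directory and build the conf from the local paths. It assumes the google-cloud-storage package is installed on the cluster and that the VM's default credentials can read the bucket. The helper names (split_gcs_uri, download_to_local, build_conf) are made up for illustration and are not part of any library.

```python
import os
from urllib.parse import urlparse

# Config keys from the question, mapped to the gs:// objects they point at.
GCS_CERTS = {
    "ssl.ca.location": "gs://dataproc_kafka_code/code/caroot.pem",
    "ssl.certificate.location": "gs://dataproc_kafka_code/code/my-bridge-user-crt.pem",
    "ssl.key.location": "gs://dataproc_kafka_code/code/user-with-certs.pem",
}


def split_gcs_uri(uri):
    """Split 'gs://bucket/path/to/file' into ('bucket', 'path/to/file')."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc or not parsed.path.strip("/"):
        raise ValueError(f"not a gs:// object URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def download_to_local(uri, dest_dir):
    """Download one GCS object into dest_dir and return the local file path."""
    # Imported here so the rest of the module loads without the package.
    # Assumption: google-cloud-storage is available on the cluster.
    from google.cloud import storage

    bucket_name, blob_name = split_gcs_uri(uri)
    local_path = os.path.join(dest_dir, os.path.basename(blob_name))
    client = storage.Client()  # picks up the environment's default credentials
    client.bucket(bucket_name).blob(blob_name).download_to_filename(local_path)
    return local_path


def build_conf(brokers, password, dest_dir, fetch=download_to_local):
    """Return a producer conf whose ssl.* locations are local file paths."""
    conf = {
        "bootstrap.servers": brokers,
        "security.protocol": "SSL",
        "ssl.key.password": password,
    }
    for key, uri in GCS_CERTS.items():
        # fetch copies the object locally and returns the path to use.
        conf[key] = fetch(uri, dest_dir)
    return conf
```

In the job this would be used roughly as producer = Producer(build_conf(kafkaBrokers, password, tempfile.mkdtemp())). The files are only downloaded on the machine that runs this code, so if producers are created on Spark executors, the download would have to happen there as well.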