I am trying to write data to Kafka with the writeStream method. I have been given the following properties by the source system:
topic = 'mytopic'
host = "myhost.us-west-1.aws.confluent.cloud:9092"
userid = 'myuser'
password = 'mypassword'
Cluster = 'cluster-numeric-test-03'
While I found many examples on Stack Overflow that use topic, host, userid and password, I can barely find any documentation on the cluster. I tried the following parameter, which I found in this part of the documentation: "spark.kafka.clusters.cluster.auth.bootstrap.servers" https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#configuration
I am therefore using the code below to connect:
(
df.select("key", "value","partition")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", host)
.option("topic", topic)
.trigger(availableNow=True)
.option("kafka.sasl.jaas.config",
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{}" password="{}";'.format(userid, password))
.option("checkpointLocation", "/mnt/Sales/Markerting/Whiteboards/temp/")
.option('spark.kafka.clusters.cluster.auth.bootstrap.servers',Cluster)
.start()
)
I am getting the following error:
kafkashaded.org.apache.kafka.common.errors.TimeoutException: Topic mytopic not present in metadata after 60000 ms.
CodePudding user response:
Actually, I was missing two options:
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
With these it works without adding the cluster details.
The working code is:
(
df.select("key", "value","partition")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", host)
.option("topic", topic)
.trigger(availableNow=True)
.option("kafka.sasl.jaas.config",
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{}" password="{}";'.format(userid, password))
.option("checkpointLocation", "/mnt/Sales/Markerting/Whiteboards/temp/")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.start()
)
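To make it harder to forget one of these settings again, the connection options can be collected in one place. Below is a minimal sketch, not an official API: the helper name confluent_sasl_options and its shaded flag are hypothetical. It assumes the "kafkashaded." prefix is only needed on runtimes that bundle a shaded Kafka client (as the class name in the code above suggests), and that plain open-source Spark uses the unprefixed class name. Note that the Kafka client's default security.protocol is PLAINTEXT, so it has to be set explicitly for a SASL_SSL endpoint.

```python
def confluent_sasl_options(host, userid, password, shaded=True):
    """Build the Kafka options needed for SASL_SSL with the PLAIN mechanism.

    Hypothetical helper for illustration; it only assembles a dict.
    """
    # Assumption: runtimes with a shaded Kafka client need the "kafkashaded."
    # prefix on the login module class; otherwise the plain name is used.
    prefix = "kafkashaded." if shaded else ""
    jaas = (
        f"{prefix}org.apache.kafka.common.security.plain.PlainLoginModule "
        f'required username="{userid}" password="{password}";'
    )
    return {
        "kafka.bootstrap.servers": host,
        # Without these two, the client falls back to PLAINTEXT.
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
    }


opts = confluent_sasl_options(
    "myhost.us-west-1.aws.confluent.cloud:9092", "myuser", "mypassword"
)
# Print the option names only, so the credentials do not end up in logs.
for name in opts:
    print(name)
```

The resulting dict can then be passed to the writer with .options(**opts) in place of the individual .option(...) calls for these four keys; topic, trigger and checkpointLocation stay as they are.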