How to writestream to specific kafka cluster in Azure databricks? " Topic mytopic not present i

Time:10-25

I am trying to write data to Kafka with the writeStream method. The source system gave me the following properties.

topic = 'mytopic'
host = "myhost.us-west-1.aws.confluent.cloud:9092"
userid = 'myuser'
password ='mypassword'  
Cluster = 'cluster-numeric-test-03'

While I found many examples that use topic, host, userid and password on Stack Overflow, I can barely find any documentation on the cluster. I tried the following parameter, which I found in this part of the documentation: "spark.kafka.clusters.cluster.auth.bootstrap.servers" https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#configuration

I am therefore using the code below to connect:

(
df.select("key", "value","partition")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", host)
.option("topic", topic)
.trigger(availableNow=True)
.option("kafka.sasl.jaas.config",
     'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{}" password="{}";'.format(userid, password)) 
.option("checkpointLocation", "/mnt/Sales/Markerting/Whiteboards/temp/")
.option('spark.kafka.clusters.cluster.auth.bootstrap.servers',Cluster)
.start()
        )

I am getting the following error:

kafkashaded.org.apache.kafka.common.errors.TimeoutException: Topic mytopic not present in metadata after 60000 ms.

Answer:

Actually, I was missing two options: the security protocol and the SASL mechanism.

.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")

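With these two options set, it works without adding the cluster details. Without them the Kafka client falls back to its default PLAINTEXT protocol, so it most likely never completes the handshake with the SASL_SSL listener and never receives the topic metadata, which would explain the timeout.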

The working code is:

    (
        df.select("key", "value", "partition")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", host)
        .option("topic", topic)
        .trigger(availableNow=True)
        .option("kafka.sasl.jaas.config",
                'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{}" password="{}";'.format(userid, password))
        .option("checkpointLocation", "/mnt/Sales/Markerting/Whiteboards/temp/")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .start()
    )
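
To make it harder to forget one of the security settings again, the options from the working snippet can be collected in one place. This is only a minimal sketch: build_kafka_options and its shaded flag are hypothetical names that are not part of Spark or Databricks, and the values are the placeholders from the question.

```python
def build_kafka_options(host, topic, userid, password, shaded=True):
    """Return Kafka sink options for a SASL_SSL / PLAIN cluster (hypothetical helper)."""
    # Databricks bundles a shaded Kafka client, hence the "kafkashaded." prefix.
    # Assumption: on open-source Spark the unshaded class name is needed (shaded=False).
    prefix = "kafkashaded." if shaded else ""
    jaas = (
        f"{prefix}org.apache.kafka.common.security.plain.PlainLoginModule "
        f'required username="{userid}" password="{password}";'
    )
    return {
        "kafka.bootstrap.servers": host,
        "topic": topic,
        # The two settings that were missing in the question:
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
    }


# Placeholder values taken from the question.
opts = build_kafka_options(
    "myhost.us-west-1.aws.confluent.cloud:9092", "mytopic", "myuser", "mypassword"
)

# Usage with the streaming DataFrame `df` from the question (not executed here):
# (
#     df.select("key", "value", "partition")
#     .writeStream
#     .format("kafka")
#     .options(**opts)
#     .option("checkpointLocation", "/mnt/Sales/Markerting/Whiteboards/temp/")
#     .trigger(availableNow=True)
#     .start()
# )
```

Passing the dictionary through options(**opts) keeps the connection and authentication settings together, so the same helper could be reused for a readStream on the same cluster.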