When writing to Event Hubs from PySpark, can topics be dynamic?


Here is my data:

+---+-----------+-----+
|key|animal_type|value|
+---+-----------+-----+
|123|        cat|meows|
|456|        dog|barks|
+---+-----------+-----+
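
For reference, a hypothetical snippet that rebuilds these sample rows as a static DataFrame; the actual df below is a streaming DataFrame with the same schema, read from some upstream source, which is why writeStream applies:

 # Hypothetical: recreate the sample rows as a static DataFrame.
 # The real df in this question is a streaming DataFrame with the
 # same columns (key, animal_type, value).
 df = spark.createDataFrame(
     [("123", "cat", "meows"), ("456", "dog", "barks")],
     ["key", "animal_type", "value"],
 )
 df.show()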

I am currently writing to Event Hubs from Databricks like so:

(df.select("key", "value")
 .writeStream
 .format("kafka")
 .option("kafka.bootstrap.servers", bootstrap_server)
 .option("topic", "cats")  # topic hardcoded to 'cats'
 .option("kafka.security.protocol", "SASL_SSL")
 .option("kafka.sasl.mechanism", "PLAIN")
 .option("kafka.sasl.jaas.config", connection_string)
 .option("kafka.request.timeout.ms", "3600000")
 .option("checkpointLocation", checkpoint_path)
 .start())

My challenge is that the dataframe contains records about both cats and dogs, so I don't want to hardcode the topic as 'cats' when sometimes it should be 'dogs'. Instead, I would like the topic for each record to be assigned dynamically, based on the value in the animal_type column.

Is this possible? Or do I need to have a single dataframe/write-config per topic?

CodePudding user response:

You need to alias the animal_type column to topic before writing, and remove the topic option. When no topic option is set, the Kafka sink reads the destination topic for each row from that row's topic column, so a single streaming query can fan out to multiple topics.
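
A minimal sketch of that approach, reusing the connection settings from the question (bootstrap_server, connection_string, and checkpoint_path as already defined). Since the topics in the question are named cats and dogs while the column holds cat and dog, the sketch pluralizes the value with concat; alias animal_type directly if your topics are named cat and dog:

 (df.selectExpr("key", "value", "concat(animal_type, 's') AS topic")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrap_server)
  # no .option("topic", ...) here: with it removed, the sink routes
  # each row to the topic named in its "topic" column
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config", connection_string)
  .option("kafka.request.timeout.ms", "3600000")
  .option("checkpointLocation", checkpoint_path)
  .start())

Rows with animal_type = 'cat' then land in cats and rows with animal_type = 'dog' in dogs, all from one streaming query with a single checkpoint.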

Refer to the Spark Structured Streaming + Kafka integration documentation for other ways to structure the dataframe when writing through the Kafka API.
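
One Event Hubs-specific caveat: through the Kafka endpoint, each Kafka topic maps to an individual event hub within the namespace, so make sure event hubs exist for every distinct topic value (here cats and dogs) before the stream starts.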
