Here is my data:
| key | animal_type | value |
|-----|-------------|-------|
| 123 | cat         | meows |
| 456 | dog         | barks |
I am currently writing to Event Hubs from Databricks like so:
```python
(df.select("key", "value")
   .writeStream
   .format("kafka")
   .option("kafka.bootstrap.servers", bootstrap_server)
   .option("topic", "cats")
   .option("kafka.security.protocol", "SASL_SSL")
   .option("kafka.sasl.mechanism", "PLAIN")
   .option("kafka.sasl.jaas.config", connection_string)
   .option("kafka.request.timeout.ms", "3600000")
   .option("checkpointLocation", checkpoint_path)
   .start())
```
My challenge is that the dataframe contains records about both cats and dogs, so I don't want to hardcode the topic as 'cats'; sometimes it needs to be 'dogs'. Instead, I would like the topic to be assigned dynamically per record, based on the value in the animal_type column.
Is this possible, or do I need a separate dataframe/write-config per topic?
CodePudding user response:
You need to alias the animal_type column to topic prior to writing, then remove the topic option. The Kafka sink routes each row to the topic named in its topic column, but only when the topic option is absent, since that option overrides the column.
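A minimal sketch of that change, reusing the `bootstrap_server`, `connection_string`, and `checkpoint_path` variables from the question:

```python
(df.selectExpr("key", "value", "animal_type AS topic")  # per-row topic column
   .writeStream
   .format("kafka")
   .option("kafka.bootstrap.servers", bootstrap_server)
   # no .option("topic", ...) here, so the sink uses each row's topic column
   .option("kafka.security.protocol", "SASL_SSL")
   .option("kafka.sasl.mechanism", "PLAIN")
   .option("kafka.sasl.jaas.config", connection_string)
   .option("kafka.request.timeout.ms", "3600000")
   .option("checkpointLocation", checkpoint_path)
   .start())
```

One caveat: in your sample data animal_type holds cat/dog, while the original topic was cats. If the Event Hub names are pluralized, map the column instead of just aliasing it, e.g. `selectExpr("key", "value", "concat(animal_type, 's') AS topic")`.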
Refer to the Spark Structured Streaming + Kafka integration documentation for other ways to structure the dataframe when writing through the Kafka API.