I'm trying to implement a continuous data generator from Databricks to an Event Hub. My idea was to generate some data in a .csv file, create a data frame from it, and then, in a loop, call a function that executes a query to stream that data to the Event Hub. I'm not sure whether this approach is sound, whether Spark can handle writing from the same data frame repeatedly, or whether I've understood correctly how queries work.
The code looks like this:
```python
def write_to_event_hub(
    df: DataFrame,
    topic: str,
    bootstrap_servers: str,
    config: str,
    checkpoint_path: str,
):
    return (
        df.writeStream.format("kafka")
        .option("topic", topic)
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.jaas.config", config)
        .option("checkpointLocation", checkpoint_path)
        .trigger(once=True)
        .start()
    )


while True:
    query = write_to_event_hub(
        streaming_df,
        topic,
        bootstrap_servers,
        sasl_jaas_config,
        "/checkpoint",
    )
    query.awaitTermination()
    print("Wrote once")
    time.sleep(5)
```
I also want to mention that this is how I read the data from the CSV file (it's stored in DBFS), and I have a schema defined for it:
```python
streaming_df = (
    spark.readStream.format("csv")
    .option("header", "true")
    .schema(location_schema)
    .load(f"{path}")
)
```
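For context, `location_schema` is just a regular `StructType`; it looks roughly like the sketch below, although the field names here are placeholders rather than my real columns:

```python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Placeholder schema - the real one matches the columns in my CSV file
location_schema = StructType([
    StructField("device_id", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("timestamp", TimestampType(), True),
])
```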
It looks like no data is written, even though I see the message "Wrote once" printed. Any ideas how to handle this? Thank you!
CodePudding user response:
The problem is that you're using `readStream` to get the CSV data, so the query will wait until new data is pushed to the directory with the CSV files. But really, you don't need `readStream`/`writeStream` at all - the Kafka connector works just fine in batch mode, so your code could be structured like this:
```python
df = read_csv_file()
while True:
    write_to_kafka(df)
    sleep(5)
```
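As a minimal sketch of what those two helpers could look like (function and variable names are illustrative - plug in your own topic, bootstrap servers, and SASL config). One thing to keep in mind is that the Kafka sink expects the payload in a `value` column, so the rows are serialized to JSON first:

```python
import time
from pyspark.sql import functions as F

def read_csv_file(path, schema):
    # Plain batch read - no readStream needed
    return (
        spark.read.format("csv")
        .option("header", "true")
        .schema(schema)
        .load(path)
    )

def write_to_kafka(df, topic, bootstrap_servers, sasl_jaas_config):
    # The Kafka sink requires a string/binary "value" column,
    # so pack each row into a JSON string
    payload = df.select(F.to_json(F.struct("*")).alias("value"))
    (
        payload.write.format("kafka")
        .option("topic", topic)
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.jaas.config", sasl_jaas_config)
        .save()
    )

df = read_csv_file(path, location_schema)
while True:
    write_to_kafka(df, topic, bootstrap_servers, sasl_jaas_config)
    print("Wrote batch")
    time.sleep(5)
```

Since each iteration is an independent batch write, there is no checkpoint location or trigger to manage - the loop itself controls how often data is pushed to the Event Hub.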