I want to send messages to Kafka once a day via Azure Databricks, as a batch job. I need to send them to a Kafka server, but we don't want to have a cluster running all day just for this job.
I saw the Databricks writeStream method (I can't make it work yet, but that is not the purpose of my question). It looks like I would need to be streaming day and night to make it run.
Is there a way to use it as a batch job? Can I send the messages to the Kafka server and shut down my cluster once they are received?
df = spark \
.readStream \
.format("delta") \
.option("numPartitions", 5) \
.option("rowsPerSecond", 5) \
.load('/mnt/sales/marketing/numbers/DELTA/')
(df.select("Sales", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "rferferfeez.eu-west-1.aws.confluent.cloud:9092")
.option("topic", "bingofr")
.option("kafka.sasl.username", "jakich")
.option("kafka.sasl.password", 'ozifjoijfziaihufzihufazhufhzuhfzuoehza')
.option("checkpointLocation", "/mnt/sales/marketing/numbers/temp/")
.option("spark.kafka.clusters.cluster.sasl.token.mechanism", "cluster-buyit")
.option("request.timeout.ms",30) \
.option("includeHeaders", "true") \
.start()
)
kafkashaded.org.apache.kafka.common.errors.TimeoutException: Topic bingofr not present in metadata after 60000 ms.
It is worth noting that we also have Event Hubs. Would I be better off sending messages to our Event Hub and implementing a triggered function that writes to Kafka?
CodePudding user response:
Normally Kafka is a continuous service/capability, at least everywhere I have worked.
I would consider a cloud service like Azure, where an Event Hub can be used on a per-message basis through its Kafka-compatible API: always on, pay per message.
Otherwise, you will need a batch job that starts Kafka, does your processing, and then stops Kafka. You don't state whether all of this runs on Databricks, though.
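As an illustration, here is a minimal sketch of producing to an Event Hub through its Kafka-compatible endpoint, assuming the confluent-kafka Python client; the namespace, topic and connection string are placeholders. Event Hubs exposes the Kafka protocol on port 9093 with SASL_SSL/PLAIN, where the username is the literal string "$ConnectionString" and the password is the connection string itself:

# Hypothetical sketch: one message per invocation, pay per message.
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "mynamespace.servicebus.windows.net:9093",  # Event Hubs Kafka endpoint (placeholder namespace)
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "$ConnectionString",  # literal value expected by Event Hubs
    "sasl.password": "Endpoint=sb://...",  # the Event Hub connection string (placeholder)
})

producer.produce("bingofr", value=b"daily payload")  # topic = Event Hub name
producer.flush()  # block until the broker acknowledges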
CodePudding user response:
Just want to elaborate on @Alex Ott's comment, as it seems to work.
By adding ".trigger(availableNow=True)", you can
"periodically spin up a cluster, process everything that is available since the last period, and then shutdown the cluster. In some case, this may lead to significant cost savings."
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
(
    df.select("key", "value", "partition")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", host)
      .option("topic", topic)
      .trigger(availableNow=True)  # process everything available, then stop
      .option("kafka.sasl.jaas.config",
              'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{}" password="{}";'.format(userid, password))
      .option("checkpointLocation", "/mnt/Sales/Markerting/Whiteboards/temp/")
      .option("kafka.security.protocol", "SASL_SSL")
      .start()
      .awaitTermination()  # returns once all available data is written, so the job can end
)