Can I send messages to a Kafka cluster via Azure Databricks as a batch job (close my connection once they are received)?


I want to send messages to Kafka via Azure Databricks once a day, and I want them delivered as a batch job.

I need to send them to a Kafka server, but we don't want to keep a cluster running all day just for this job.

I found the Databricks writeStream method (I can't make it work yet, but that is not the purpose of my question). It looks like I would need to be streaming day and night to use it.

Is there a way to use it as a batch job? Can I send the messages to the Kafka server and shut down my cluster once they are received?

df = spark \
        .readStream \
        .format("delta") \
        .option("numPartitions", 5) \
        .option("rowsPerSecond", 5) \
        .load('/mnt/sales/marketing/numbers/DELTA/')

(df.select("Sales", "value")
 .writeStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "rferferfeez.eu-west-1.aws.confluent.cloud:9092")
 .option("topic", "bingofr")
 .option("kafka.sasl.username", "jakich")
 .option("kafka.sasl.password", 'ozifjoijfziaihufzihufazhufhzuhfzuoehza')
 .option("checkpointLocation", "/mnt/sales/marketing/numbers/temp/")
 .option("spark.kafka.clusters.cluster.sasl.token.mechanism", "cluster-buyit")
 .option("request.timeout.ms",30) \
 .option("includeHeaders", "true") \
 .start()
)

kafkashaded.org.apache.kafka.common.errors.TimeoutException: Topic bingofr not present in metadata after 60000 ms.


It is worth noting that we also have Event Hubs. Would I be better off sending messages to our event hub and implementing a triggered function that writes to Kafka?

CodePudding user response:

Normally Kafka runs as a continuous service/capability, at least everywhere I have worked.

I would consider a cloud service on Azure such as Event Hubs, which exposes a Kafka-compatible API and is used on a per-message basis: always on, pay per message.

Otherwise, you will need a batch job that starts Kafka, runs your processing, and then stops Kafka. You don't say whether everything runs on Databricks, though.
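
To illustrate that route, here is a minimal sketch of a plain batch write (df.write instead of writeStream, so the job runs once and the cluster can stop) to an Event Hubs namespace through its Kafka-compatible endpoint. The namespace, topic, secret scope and column names below are placeholders, not values from your setup:

# Sketch only: "my-namespace", "sales-topic" and the secret scope/key are placeholders.
# Event Hubs' Kafka endpoint listens on port 9093 with SASL_SSL/PLAIN and expects
# the literal username "$ConnectionString" with the connection string as the password.
conn_str = dbutils.secrets.get(scope="kafka-secrets", key="eventhubs-connection-string")

df = spark.read.format("delta").load("/mnt/sales/marketing/numbers/DELTA/")  # batch read, not readStream

(df.selectExpr("CAST(Sales AS STRING) AS key", "CAST(value AS STRING) AS value")
 .write                                   # batch write: returns when done, no checkpoint needed
 .format("kafka")
 .option("kafka.bootstrap.servers", "my-namespace.servicebus.windows.net:9093")
 .option("topic", "sales-topic")
 .option("kafka.security.protocol", "SASL_SSL")
 .option("kafka.sasl.mechanism", "PLAIN")
 .option("kafka.sasl.jaas.config",
         'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required '
         'username="$ConnectionString" password="{}";'.format(conn_str))
 .save())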

CodePudding user response:

Just want to elaborate on @Alex Ott's comment, as it seems to work.

By adding ".trigger(availableNow=True)",you can

"periodically spin up a cluster, process everything that is available since the last period, and then shutdown the cluster. In some case, this may lead to significant cost savings."

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

(df.select("key", "value", "partition")
 .writeStream
 .format("kafka")
 .option("kafka.bootstrap.servers", host)
 .option("topic", topic)
 .option("kafka.security.protocol", "SASL_SSL")
 .option("kafka.sasl.mechanism", "PLAIN")  # matches the PlainLoginModule JAAS config below
 .option("kafka.sasl.jaas.config",
         'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{}" password="{}";'.format(userid, password))
 .option("checkpointLocation", "/mnt/Sales/Markerting/Whiteboards/temp/")
 .trigger(availableNow=True)
 .start())
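
One extra detail that helps with actually shutting things down (the snippet above just calls start()): keep a handle on the query and wait for it to finish, so the notebook exits once the backlog has been written and a scheduled job cluster can auto-terminate. A minimal sketch, with the SASL options omitted for brevity and host, topic and the checkpoint path as above:

query = (df.select("key", "value", "partition")
         .writeStream
         .format("kafka")
         .option("kafka.bootstrap.servers", host)
         .option("topic", topic)
         .option("checkpointLocation", "/mnt/Sales/Markerting/Whiteboards/temp/")
         .trigger(availableNow=True)
         .start())

# With availableNow=True the query stops on its own once everything currently
# available in the source has been written to Kafka, so this call returns
# instead of blocking forever and the cluster can be released afterwards.
query.awaitTermination()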