Databricks streaming query from Kafka endpoint


I'm familiar with reading Event Hubs using Databricks streaming queries. We now have a use case for reading a Kafka endpoint from a third-party provider.

I'm exploring Kafka and created a test topic via https://api.cloudkarafka.com/. This gave me a topic, a server, and a user ID and password. I successfully managed to send and consume events from a Python notebook. I'm now trying to extend this to reading the topic with Spark, but when I check the documentation, it doesn't mention a user ID and password as configuration options.

I tried the code below on Azure Databricks to print the messages to the console, but when I execute it, it gets stuck in the "Stream Initializing" status and never really starts the job. I have a feeling I'm missing some options, but since this is my first time using Kafka, I'd appreciate assistance from anyone who already has this knowledge.

from pyspark.sql.functions import *
from pyspark.sql.types import *


KAFKA_TOPIC_NAME = "xxxxxxxxxxx"
KAFKA_BOOTSTRAP_SERVER = "xxxxxxxxxxxxxxxxxxxxxxxx"
sampleDataframe = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
        .option("subscribe", KAFKA_TOPIC_NAME)
        .option("startingOffsets", "latest")
        .option("security.protocol", "SASL_SSL")
        .option("sasl.mechanisms", "SCRAM-SHA-256")
        .option("sasl.username", "xxxxxxxx")
        .option("sasl.password", "xxxxxxxxxxx")
        .load()
    )

base_df = sampleDataframe.selectExpr("CAST(value as STRING)", "timestamp")
base_df.printSchema()

base_df.select("*") \
         .writeStream \
         .outputMode("append") \
         .format("console") \
         .trigger(processingTime='5 seconds') \
         .start() 

EDIT: I managed to consume messages with the simple Python code below, but my requirement is to use Databricks and Spark.

import sys
import os

from confluent_kafka import Consumer, KafkaException, KafkaError

if __name__ == '__main__':
    # Connection details from the CloudKarafka instance (placeholders)
    server = "xxxxxxxxxxxxxxxxxxxxxxxx"
    username = "xxxxxxxx"
    password = "xxxxxxxxxxx"
    topic = "xxxxxxxxxxx"
    topics = topic.split(",")

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': server,
        'group.id': "%s-consumer" % username,
        'session.timeout.ms': 6000,
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': username,
        'sasl.password': password
    }

    c = Consumer(**conf)
    c.subscribe(topics)
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                # Error or event
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    # Error
                    raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    # Close down consumer to commit final offsets.
    c.close()
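
For reference, publishing the test events mentioned earlier works the same way with the confluent_kafka Producer. A minimal sketch, reusing the same server/username/password/topic placeholders and SASL settings as the consumer above (the JSON payload is purely illustrative):

from confluent_kafka import Producer

# Producer configuration mirroring the consumer's SASL_SSL/SCRAM settings above
producer_conf = {
    'bootstrap.servers': server,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': username,
    'sasl.password': password
}

p = Producer(producer_conf)
p.produce(topic, value='{"test": "hello from the notebook"}')  # queue a message
p.flush()  # block until outstanding delivery reports are received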

EDIT 2: I finally managed to read my Kafka topic with the code below.

%python
streamingInputDF = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", host)  \
  .option("kafka.security.protocol", "SASL_SSL") \
  .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username='{}' password='{}';".format(userid, password)) \
  .option("kafka.ssl.endpoint.identification.algorithm", "https") \
  .option("kafka.sasl.mechanism", sasl_mech) \
  .option("startingOffsets", "earliest") \
  .option("failOnDataLoss", "false") \
  .option("subscribe", topic) \
  .load()
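
A note on the JAAS option above: the kafkashaded prefix on ScramLoginModule is specific to the Databricks runtime, which bundles a shaded Kafka client. On open-source Spark the same option would normally reference the unshaded class name; a sketch of that variant (verify against your own Spark distribution):

  # On non-Databricks Spark, the equivalent option line would typically read:
  #   .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='{}' password='{}';".format(userid, password)) \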

query = (
  streamingInputDF
    .writeStream
    .format("console")        # memory = store in-memory table (for testing only)
    .queryName("raw")        # raw = name of the in-memory table
    .outputMode("append")    # append = add new events#
    .trigger(processingTime='5 seconds')
    .start()
)
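
To confirm the console stream is actually making progress (rather than sitting in initialization as before), the StreamingQuery handle returned by start() can be inspected; a minimal sketch using standard PySpark streaming-query attributes:

print(query.status)        # whether a trigger is active / data is available
print(query.lastProgress)  # metrics of the most recent micro-batch (None until one completes)
# query.stop()             # stop the stream when finished testing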

However, when I tried to write the dataframe to ADLS Gen1, I got a java.lang.NullPointerException error.

file_path = '/mnt/test/'
streamingInputDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
    .writeStream \
    .format("json") \
    .option("path", file_path) \
    .outputMode("append") \
    .option("checkpointLocation", file_path) \
    .start()

Answer:

So I finally managed to get this working. The issue was that instead of giving a folder as the path, I needed to give the actual file name. For my testing purposes I used a fully qualified name; in production I would attach a date and time stamp to each file being written (see the sketch after the code below).

%python
userid = "qxxx"
password = "xxxxxxxxxxxxxxxxxx"
host = "rocket-xxxxxxxxxxxxxx.com:9094"
topic = "qxxxxxxxxxxxxxxx"
sasl_mech = "SCRAM-SHA-256"

streamingInputDF = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", host)  \
  .option("kafka.security.protocol", "SASL_SSL") \
  .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username='{}' password='{}';".format(userid, password)) \
  .option("kafka.ssl.endpoint.identification.algorithm", "https") \
  .option("kafka.sasl.mechanism", sasl_mech) \
  .option("startingOffsets", "earliest") \
  .option("failOnDataLoss", "false") \
  .option("subscribe", topic) \
  .load()

file_path = '/mnt/test/test.json'
checkpoint = '/mnt/test/check.txt'
streamingInputDF.selectExpr("CAST(value AS STRING)")\
    .writeStream \
    .format("json") \
    .option("path", file_path) \
    .outputMode("append") \
    .option("checkpointLocation", checkpoint) \
    .start()
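
A minimal sketch of the per-file date/time stamping mentioned above; the path pattern is an assumption for illustration, and the checkpoint location is kept fixed so the stream can resume from its last offsets:

from datetime import datetime

# Illustrative only: stamp each output file path with the run's date and time (assumed pattern)
run_ts = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
file_path = '/mnt/test/test_{}.json'.format(run_ts)
checkpoint = '/mnt/test/check.txt'   # keep the checkpoint location stable across runs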