I'm familiar with reading Event Hubs using Databricks streaming queries. We now have a use case of reading a Kafka endpoint from a third-party provider.
I'm exploring Kafka and created a test topic via https://api.cloudkarafka.com/, which gave me a topic, a server, and a userid and password. I successfully managed to send and consume events from a Python notebook, and I'm now trying to extend this to reading the topic with Spark. When I check the documentation, it doesn't mention userid and password as configuration options.
I tried the code below on Azure Databricks to print the messages to the console, but when I execute it, the query seems to be stuck in the "Stream Initializing" status and never really starts the job. I have a feeling I'm missing some options, but since this is my first time using Kafka, I'd appreciate assistance from anyone who already has this knowledge.
from pyspark.sql.functions import *
from pyspark.sql.types import *

KAFKA_TOPIC_NAME = "xxxxxxxxxxx"
KAFKA_BOOTSTRAP_SERVER = "xxxxxxxxxxxxxxxxxxxxxxxx"

sampleDataframe = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
    .option("subscribe", KAFKA_TOPIC_NAME)
    .option("startingOffsets", "latest")
    .option("security.protocol", "SASL_SSL")
    .option("sasl.mechanisms", "SCRAM-SHA-256")
    .option("sasl.username", "xxxxxxxx")
    .option("sasl.password", "xxxxxxxxxxx")
    .load()
)

base_df = sampleDataframe.selectExpr("CAST(value as STRING)", "timestamp")
base_df.printSchema()

base_df.select("*") \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()
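One way to see why the stream never leaves initialization is to keep a handle on the running query and inspect its status and any exception. A minimal debugging sketch against the code above; only the query variable is new here, the sink options are the same as in the question:

# Debugging aid only, not the fix: capture the StreamingQuery handle so its
# state can be inspected instead of the .start() result being discarded.
query = (
    base_df.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime='5 seconds')
    .start()
)

query.status          # dict with a human-readable 'message' (shows whether sources are still initializing)
query.lastProgress    # None until the first micro-batch completes
query.exception()     # surfaces the underlying error if the query has actually failed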
EDIT: I managed to consume messages using the simple Python code below, but my requirement is to use Databricks and Spark.
import sys
import os
from confluent_kafka import Consumer, KafkaException, KafkaError

if __name__ == '__main__':
    # topic, server, username and password are defined earlier (values redacted)
    topics = topic.split(",")

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': server,
        'group.id': "%s-consumer" % username,
        'session.timeout.ms': 6000,
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': username,
        'sasl.password': password
    }

    c = Consumer(**conf)
    c.subscribe(topics)

    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                # Error or event
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    # Error
                    raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())
    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    # Close down consumer to commit final offsets.
    c.close()
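For reference, the librdkafka settings in this consumer map onto Spark's Kafka source options, which must be prefixed with kafka. and carry the credentials inside kafka.sasl.jaas.config rather than as separate username/password options. A minimal sketch of the mapping, reusing the (redacted) variables from the consumer above; this is essentially what the working code in EDIT 2 below does, and the kafkashaded prefix is Databricks' shaded Kafka client:

# Sketch only: translation of the confluent_kafka settings above into
# Spark structured streaming options (cf. the working code in EDIT 2).
jaas = (
    "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required "
    "username='{}' password='{}';".format(username, password)
)

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", server)          # 'bootstrap.servers'
    .option("kafka.security.protocol", "SASL_SSL")      # 'security.protocol'
    .option("kafka.sasl.mechanism", "SCRAM-SHA-256")    # 'sasl.mechanisms' (singular here)
    .option("kafka.sasl.jaas.config", jaas)             # 'sasl.username' / 'sasl.password'
    .option("subscribe", topic)
    .load()
)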
EDIT 2: So I finally managed to read my Kafka topic with the code below.
%python
streamingInputDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", host) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username='{}' password='{}';".format(userid, password)) \
    .option("kafka.ssl.endpoint.identification.algorithm", "https") \
    .option("kafka.sasl.mechanism", sasl_mech) \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .option("subscribe", topic) \
    .load()

query = (
    streamingInputDF
    .writeStream
    .format("console")                      # print batches to the console (for testing only)
    .queryName("raw")                       # name of the streaming query
    .outputMode("append")                   # append = add new events only
    .trigger(processingTime='5 seconds')
    .start()
)
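The console sink prints the raw Kafka columns (binary key/value, topic, offset, timestamp). To get at the payload you typically cast value to a string and, if the messages are JSON, parse it with from_json. A minimal sketch, assuming a hypothetical payload schema with id and ts fields that would have to match the actual messages:

# Sketch only: payload_schema is hypothetical and must match the real message format.
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

payload_schema = StructType([
    StructField("id", StringType()),
    StructField("ts", TimestampType()),
])

parsed = (
    streamingInputDF
    .selectExpr("CAST(value AS STRING) AS json", "timestamp")
    .select(from_json(col("json"), payload_schema).alias("data"), "timestamp")
    .select("data.*", "timestamp")
)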
However, when I tried to write the dataframe to ADLS Gen1, I got a java.lang.NullPointerException error.
file_path = '/mnt/test/'

streamingInputDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("json") \
    .option("path", file_path) \
    .outputMode("append") \
    .option("checkpointLocation", file_path) \
    .start()
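For what it's worth, the working version in the answer below also keeps the checkpoint location separate from the output path, which is generally recommended for file sinks. A minimal sketch reusing the /mnt/test mount from the question; the sub-folder names are just placeholders:

# Sketch only: separate data and checkpoint locations; folder names are placeholders.
out_path = '/mnt/test/data/'
checkpoint_path = '/mnt/test/_checkpoint/'

(streamingInputDF
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("json")
    .option("path", out_path)
    .option("checkpointLocation", checkpoint_path)
    .outputMode("append")
    .start())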
CodePudding user response:
So I finally managed to get this working. The issue was that instead of putting a folder as the path, I needed to put an actual file name. For testing purposes I used a fully qualified name, but in production I would attach a date and time stamp to each file being written (see the sketch after the code below).
%python
userid = "qxxx"
password = "xxxxxxxxxxxxxxxxxx"
host = "rocket-xxxxxxxxxxxxxx.com:9094"
topic = "qxxxxxxxxxxxxxxx"
sasl_mech = "SCRAM-SHA-256"

streamingInputDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", host) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username='{}' password='{}';".format(userid, password)) \
    .option("kafka.ssl.endpoint.identification.algorithm", "https") \
    .option("kafka.sasl.mechanism", sasl_mech) \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .option("subscribe", topic) \
    .load()

file_path = '/mnt/test/test.json'
checkpoint = '/mnt/test/check.txt'

streamingInputDF.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("json") \
    .option("path", file_path) \
    .outputMode("append") \
    .option("checkpointLocation", checkpoint) \
    .start()
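As mentioned above, in production the plan is to attach a date and time stamp to each file being written. A minimal sketch of one way to do that when the query starts; the naming pattern is just an illustration:

# Sketch only: the timestamped file name pattern below is illustrative.
from datetime import datetime, timezone

run_ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
file_path = '/mnt/test/test_{}.json'.format(run_ts)   # e.g. /mnt/test/test_20240101_120000.json
checkpoint = '/mnt/test/check.txt'

streamingInputDF.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("json") \
    .option("path", file_path) \
    .outputMode("append") \
    .option("checkpointLocation", checkpoint) \
    .start()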