I'm using StructuredStreaming .. i have a pyspark dataframe which i need to write to Kafka.
Schema of the dataframe is shown below:
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = false)
| |-- end: timestamp (nullable = false)
|-- processedAlarmCnt: integer (nullable = false)
|-- totalAlarmCnt: integer (nullable = false)
I my current code, i'm converting the pyspark DataFrame to pandas, looping over each row, adding the data to a hashmap
def writeCountToKafka(df):
if df.count()>0:
hm = {}
df_pandas = df.toPandas()
for _, row in df_pandas.iterrows():
hm["window"] = [datetime.timestamp(row["window"]["start"]),datetime.timestamp(row["window"]["end"])]
hm["processedAlarmCnt"] = row["processedAlarmCnt"]
hm["totalAlarmCnt"] = row["totalAlarmCnt"]
# Python Kafka Producer
kafka_producer.send(topic_count, json.dumps(mymap).encode('utf-8'))
kafka_producer.flush()
Few questions:
How do i make this code more efficient - possibly not having to loop over every row to get the values, and store in hashmap ?
Does it make sense to use StructuredStreaming Kafka Producer instead of the python KafkaProducer (import - from kafka import KafkaProducer) ? With the StructuredStreaming kafka producer (i.e. , it requires a "value", seems i cannot cast the window(struct) as value... so not sure what should be put as "value" ?
What is the best way to design/code this ?
tia!
CodePudding user response:
You don't need pandas. Spark should be able to do everything you need to transform your data. Using loops over Dataframe rows is almost always a sign you've done something wrong
No, don't import KafkaProducer library; in fact, you don't need any other Python library installed to produce to Kafka. As written in the Spark Structured Streaming documentation, your dataframe needs to only contain a
value
column of type bytes or str (key / topic / timestamp columns are all optional).
You need to define a UDF function that accepts a Struct and serializes the three root columns into a single value
(as json string, or any other type)