structured streaming - writing dataframe into Kafka row by row, dataframe has a struct column


I'm using Structured Streaming. I have a PySpark dataframe that I need to write to Kafka.

Schema of the dataframe is shown below:

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = false)
 |    |-- end: timestamp (nullable = false)
 |-- processedAlarmCnt: integer (nullable = false)
 |-- totalAlarmCnt: integer (nullable = false)

In my current code, I convert the PySpark DataFrame to pandas, loop over each row, and add the data to a hashmap:

def writeCountToKafka(df):
    # json, datetime and kafka_producer (kafka-python KafkaProducer) are set up elsewhere
    if df.count() > 0:
        hm = {}
        df_pandas = df.toPandas()
        for _, row in df_pandas.iterrows():
            hm["window"] = [datetime.timestamp(row["window"]["start"]),
                            datetime.timestamp(row["window"]["end"])]
            hm["processedAlarmCnt"] = row["processedAlarmCnt"]
            hm["totalAlarmCnt"] = row["totalAlarmCnt"]

            # Python Kafka Producer
            kafka_producer.send(topic_count, json.dumps(hm).encode('utf-8'))
            kafka_producer.flush()

A few questions:

  1. How do I make this code more efficient, ideally without looping over every row to extract the values and store them in a hashmap?

  2. Does it make sense to use the Structured Streaming Kafka sink instead of the Python KafkaProducer (from kafka import KafkaProducer)? The Structured Streaming Kafka sink requires a "value" column, and it seems I cannot cast the window struct to a value, so I'm not sure what to put as "value".

What is the best way to design/code this?

tia!

CodePudding user response:

  1. You don't need pandas. Spark can do everything you need to transform your data. Looping over DataFrame rows is almost always a sign you've done something wrong.

  2. No, don't import the KafkaProducer library; in fact, you don't need any extra Python library installed to produce to Kafka. As described in the Spark Structured Streaming documentation, your dataframe only needs to contain a value column of type bytes or string (the key, topic, and timestamp columns are all optional).

You need to define a function that accepts the struct and serializes the three root columns into a single value (as a JSON string, or any other format), for example as sketched below.
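A minimal sketch of that approach, assuming the streaming dataframe from the question; kafka_servers, topic_count, and the checkpoint path are placeholders, not names from the original post. Spark's built-in to_json over a struct can handle the serialization, so a handwritten UDF isn't strictly required:

from pyspark.sql import functions as F

# Serialize each row (window struct + the two counters) into one JSON string
# and expose it as the "value" column the Kafka sink expects.
out = df.select(
    F.to_json(F.struct("window", "processedAlarmCnt", "totalAlarmCnt")).alias("value")
)

# Write the stream straight to Kafka -- no pandas, no row loop, no KafkaProducer.
query = (out.writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", kafka_servers)
            .option("topic", topic_count)
            .option("checkpointLocation", "/tmp/alarm_count_checkpoint")
            .start())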
