Create a DataFrame from JSON coming from Kafka using Spark Structured Streaming in Python


I am new to Spark Structured Streaming and am working on a POC that needs to be implemented with Structured Streaming.

Input source: Kafka
Input format: JSON
Language: Python 3
Library: Spark 3.2

I am trying to parse the incoming JSON into a Spark DataFrame with a predefined structure.

So far I am able to fetch the JSON events and see the results on the console (though not in the expected format). It would be very helpful if you could nudge me in the right direction or suggest a solution.

Below is my code so far.

JSON from Kafka:

{"property1" : "hello","property2" : "world"}

structured_kafka.py


"""
 Run the script
    `$ bin/spark-submit structured_kafka.py \
    host1:port1,host2:port2 subscribe topic1,topic2`
"""
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType


if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("""
        Usage: structured_kafka.py <bootstrap-servers> <subscribe-type> <topics>
        """, file=sys.stderr)
        sys.exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    spark = SparkSession\
        .builder\
        .appName("StructuredKafkaWordCount")\
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    
    # Schema matching the incoming JSON events
    schema = StructType([
        StructField("property1", StringType(), True),
        StructField("property2", StringType(), True),
    ])


    # Read from Kafka and parse the binary `value` column as JSON
    lines = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))


    # select('*') keeps parsed_value as a single struct column
    df = lines.select('*')
  
    # Start the query that prints each micro-batch to the console
    query = df\
        .writeStream\
        .outputMode('append')\
        .format('console')\
        .start()

    query.awaitTermination()

Output:

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------+
|  parsed_value|
+--------------+
|{hello, world}|
+--------------+
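The braces indicate that the whole payload has landed in a single struct column. For reference (not in the original post), df.printSchema() at this point should show the nesting:

root
 |-- parsed_value: struct (nullable = true)
 |    |-- property1: string (nullable = true)
 |    |-- property2: string (nullable = true)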

Expected:

+---------+---------+
|property1|property2|
+---------+---------+
|    hello|    world|
+---------+---------+

If I could get the df in this format, I would be able to apply my use case.

Kindly suggest.

Note: I have looked at all the existing solutions; most of them are either in Scala, not for Structured Streaming, or not for Kafka as the source.

CodePudding user response:

After the line:

.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

add:

.select(col("parsed_value.property1"), col("parsed_value.property2"))

or, to expand every field of the struct at once:

.select(col("parsed_value.*"))