Structured Spark Streaming: writeStream displays a dataframe of null values

Time:09-27

I have a real-time processing task using Structured Spark Streaming in Python. The first step was to ingest a CSV file into a Kafka topic, which is done.

The second step was to readStream from the Kafka topic:

    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest") \
        .load()

Then I defined a schema to cast the columns of my dataframe:

    from pyspark.sql.types import DoubleType, IntegerType, StructType, TimestampType

    # One field per column of the original CSV; the last .add() has no trailing backslash
    schema = StructType() \
        .add("DriverId", IntegerType(), True) \
        .add("time", TimestampType(), True) \
        .add("Longitude", DoubleType(), True) \
        .add("Latitude", DoubleType(), True) \
        .add("SPEED", DoubleType(), True) \
        .add("EngineSpeed", IntegerType(), True) \
        .add("MAF", IntegerType(), True) \
        .add("FuelType", IntegerType(), True)
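The schema above can only fill its columns if each Kafka message value is a JSON object whose keys match these field names; it is also safest to send numeric fields as JSON numbers rather than quoted strings. A minimal sketch of turning CSV rows into such messages, assuming the CSV file has a header row with the same column names (the sample row and the helper `csv_to_json_messages` are hypothetical, and the actual Kafka producer call is left out):

```python
import csv
import io
import json

# Hypothetical converters mirroring the Spark schema above: field name -> Python type.
# "time" is kept as a string; Spark's from_json would try to parse it as a timestamp.
CONVERTERS = {
    "DriverId": int,
    "time": str,
    "Longitude": float,
    "Latitude": float,
    "SPEED": float,
    "EngineSpeed": int,
    "MAF": int,
    "FuelType": int,
}


def csv_to_json_messages(csv_text):
    """Return one UTF-8 encoded JSON message (bytes) per CSV data row."""
    messages = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        # Convert each schema field so numbers become JSON numbers, not strings.
        record = {name: convert(row[name]) for name, convert in CONVERTERS.items()}
        messages.append(json.dumps(record).encode("utf-8"))
    return messages


# Hypothetical sample data using the same header names as the schema.
sample = (
    "DriverId,time,Longitude,Latitude,SPEED,EngineSpeed,MAF,FuelType\n"
    "1,2021-09-27 10:00:00,10.5,36.8,42.0,1500,12,1\n"
)
for message in csv_to_json_messages(sample):
    print(message)
```

Each resulting bytes object would be sent as one Kafka message value. The `time` string is passed through unchanged; whether Spark parses it into `TimestampType` depends on its format, which the JSON `timestampFormat` option of `from_json` can control.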

Next, I displayed the stream on the console, just to see if I was on the right track:

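    query1 = df \
        .writeStream \
        .format("console") \
        .outputMode("append") \
        .option("truncate", False) \
        .start()

    # awaitTermination() blocks and returns None, so call it on the query handle instead of chaining it
    query1.awaitTermination()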

But the result was a dataframe with null values.

So I went back to the schema and cast all columns to StringType. The result was:


Something like JSON!

**My question is:** how can I cast my dataframe correctly, and how can I display each value under its own column instead of as a JSON string?

Answer:

  • When Spark reads data from Kafka, it creates a dataframe with a fixed set of columns: key and value (these correspond to the key and value you send to Kafka), plus metadata columns such as topic, partition, offset and timestamp

  • The initial data type of key and value is BinaryType (raw bytes)

  • You must convert them to StringType

    df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    
  • Then, assuming your value column is a JSON string, you can convert it to a struct with the schema you have specified and expand that struct into one column per field

    from pyspark.sql import functions as F

    df = df.select(F.from_json(df.value, schema).alias("value"))
    df = df.selectExpr("value.*")  # one column per schema field
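To build intuition for why columns can come out null, here is a rough, stdlib-only Python model of the three steps above (decode the bytes, parse the JSON against a schema, flatten into columns). It is not Spark's implementation; `SCHEMA`, `parse_value` and the sample payloads are hypothetical, and `None` stands in for a null column:

```python
import json

# Simplified stand-in for the Spark schema: field name -> expected Python type.
# "time" stays a string here; Spark would try to parse it as a TimestampType.
SCHEMA = {
    "DriverId": int,
    "time": str,
    "Longitude": float,
    "Latitude": float,
    "SPEED": float,
    "EngineSpeed": int,
    "MAF": int,
    "FuelType": int,
}


def parse_value(raw_value):
    """Rough model of CAST(value AS STRING) + from_json + selectExpr("value.*")."""
    text = raw_value.decode("utf-8")  # ~ CAST(value AS STRING)
    try:
        obj = json.loads(text)  # ~ from_json
    except json.JSONDecodeError:
        obj = None
    if not isinstance(obj, dict):
        # Payload is not a JSON object: every column comes out null (None).
        return {name: None for name in SCHEMA}
    row = {}  # ~ selectExpr("value.*"): one entry per schema field
    for name, expected in SCHEMA.items():
        value = obj.get(name)  # missing key -> None
        if isinstance(value, bool):
            value = None  # booleans never match these column types
        elif expected is float and isinstance(value, int):
            value = float(value)  # an integer literal is fine for a double column
        if value is not None and not isinstance(value, expected):
            value = None  # wrong type -> None (simplified, see note below)
        row[name] = value
    return row


good = (b'{"DriverId": 1, "time": "2021-09-27 10:00:00", "Longitude": 10.5, '
        b'"Latitude": 36.8, "SPEED": 42, "EngineSpeed": 1500, "MAF": 12, "FuelType": 1}')
csv_line = b"1,2021-09-27 10:00:00,10.5,36.8,42.0,1500,12,1"  # CSV text, not JSON

print(parse_value(good))      # typed values under each column
print(parse_value(csv_line))  # every column is None
```

A JSON object yields one typed value per field, while a raw CSV line (for example, if the producer had sent CSV text instead of JSON) nulls every column, which is one possible cause of an all-null console output. In this sketch a type mismatch only nulls the offending field; Spark's own handling of malformed or mismatched records depends on the version and the parser mode, so treat this as intuition rather than a specification.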
    

More info is available here
