Parsing a multiline JSON schema in Spark Streaming

Time:08-30

I have been trying to define the schema for an incoming Spark stream, but from_json always returns NULLs. Below is the incoming payload, which I get by casting the binary body values to a string (the Bodycast column):

    [
       {
          "Node":"mm=400;b=fjdhkjfhds5454",
          "El":"jdj5544",
          "DisplayName":"Tag1",
          "Value":{
             "Value":61.175327,
             "SourceTimestamp":"2025-01-23T20:49:51.885Z"
          }
       },
       {
          "Node":"mm=44;b=kjlkjl",
          "El":"fdsfdsfsfd",
          "DisplayName":"tag2",
          "Value":{
             "Value":23.957619,
             "SourceTimestamp":"2025-10-23T20:49:51.885Z"
          }
       },
       {
          "Node":"mm=44;b=gfgfdgfd",
          "El":"fdsfdsfs",
          "DisplayName":"tag3",
          "Value":{
             "Value":88.91336,
             "SourceTimestamp":"2012-07-23T20:50:52.555Z"
          }
       },
       {
          "Node":"mm=87788;b=dsadad",
          "El":"fdsfdsfsdf5454fd",
          "DisplayName":"tag4",
          "Value":{
             "Value":89.044495,
             "SourceTimestamp":"2012-08-23T20:49:51.885Z"
          }
       },
       {
          "Node":"mm=2;b=fdsfdsf",
          "EL":"fdfsdfdsfsd",
          "DisplayName":"tag9",
          "Value":{
             "Value":95.07896,
             "SourceTimestamp":"2022-11-23T20:49:51.885Z"
          }
       },
       {
          "Node":"mm=2;b=AdsadasdasdasdAA=",
          "EL":"54f54ds5f4ds5fds",
          "DisplayName":"tag5",
          "Value":{
             "Value":19.866293,
             "SourceTimestamp":"1998-08-23T20:49:51.885Z"
          }
       },
       {
          "Node":"mm=2;b=54f5sd4f5ds4fds",
          "EL":"fkjhfjdshfsjdhjfds1221",
          "DisplayName":"tag6",
          "Value":{
             "Value":611.9143,
             "SourceTimestamp":"2201-09-22T20:44:51.017Z"
          }
       }
    ]

I defined the schema below to use with the from_json function on the DataFrame:

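    from pyspark.sql import functions as F
    from pyspark.sql.functions import from_json
    from pyspark.sql.types import ArrayType, StringType, StructField, StructType, TimestampType

    structureSchema = StructType([
        StructField("Node", StringType()),
        StructField("El", StringType()),
        StructField("DisplayName", StringType()),
        StructField("value_array", ArrayType(
            StructType([
                StructField("Value", StringType()),
                StructField("SourceTimestamp", TimestampType())
            ])))])

    # display() is available in Databricks notebooks
    stream_messages.select(stream_messages.Bodycast).withColumn('json', from_json(F.col('Bodycast'), structureSchema)).display()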
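The NULLs are consistent with a shape mismatch between the payload and structureSchema. A quick way to see it without running Spark is to load a message with Python's standard json module and compare its keys with the schema, as in the minimal sketch below. The names `sample`, `schema_fields` and `missing_keys` are illustrative only, and the two records are trimmed from the payload above. It shows that the top level is a list rather than a single object, that there is no "value_array" key, that the nested "Value" is an object rather than an array, and that some records spell the key "EL" instead of "El".

```python
import json

# Two records copied from the payload in the question, trimmed for brevity.
sample = """
[
  {"Node": "mm=400;b=fjdhkjfhds5454", "El": "jdj5544", "DisplayName": "Tag1",
   "Value": {"Value": 61.175327, "SourceTimestamp": "2025-01-23T20:49:51.885Z"}},
  {"Node": "mm=2;b=fdsfdsf", "EL": "fdfsdfdsfsd", "DisplayName": "tag9",
   "Value": {"Value": 95.07896, "SourceTimestamp": "2022-11-23T20:49:51.885Z"}}
]
"""

# Top-level field names declared in structureSchema.
schema_fields = {"Node", "El", "DisplayName", "value_array"}


def missing_keys(record, expected):
    """Return the expected field names that this record does not contain."""
    return sorted(expected - set(record))


payload = json.loads(sample)

# The message is a JSON array, while structureSchema describes a single object.
print(type(payload).__name__)  # list

for record in payload:
    print(record["DisplayName"], "missing:", missing_keys(record, schema_fields))
    # The nested "Value" is an object (dict), not an array.
    print("  Value is a", type(record["Value"]).__name__)
```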

Can anyone tell me whether the schema I defined is wrong?

Thanks in advance

Answer:

I realized that the schema needs to be wrapped in ArrayType, because each message contains an array of multiple records. I then converted the array into the required tabular form.
