I have been trying to define the schema for an incoming Spark stream, but the parsed values always come back as NULL. Below is the incoming data, which I get by casting the binary body values to string (the Bodycast column):
[
  {
    "Node":"mm=400;b=fjdhkjfhds5454",
    "El":"jdj5544",
    "DisplayName":"Tag1",
    "Value":{
      "Value":61.175327,
      "SourceTimestamp":"2025-01-23T20:49:51.885Z"
    }
  },
  {
    "Node":"mm=44;b=kjlkjl",
    "El":"fdsfdsfsfd",
    "DisplayName":"tag2",
    "Value":{
      "Value":23.957619,
      "SourceTimestamp":"2025-10-23T20:49:51.885Z"
    }
  },
  {
    "Node":"mm=44;b=gfgfdgfd",
    "El":"fdsfdsfs",
    "DisplayName":"tag3",
    "Value":{
      "Value":88.91336,
      "SourceTimestamp":"2012-07-23T20:50:52.555Z"
    }
  },
  {
    "Node":"mm=87788;b=dsadad",
    "El":"fdsfdsfsdf5454fd",
    "DisplayName":"tag4",
    "Value":{
      "Value":89.044495,
      "SourceTimestamp":"2012-08-23T20:49:51.885Z"
    }
  },
  {
    "Node":"mm=2;b=fdsfdsf",
    "EL":"fdfsdfdsfsd",
    "DisplayName":"tag9",
    "Value":{
      "Value":95.07896,
      "SourceTimestamp":"2022-11-23T20:49:51.885Z"
    }
  },
  {
    "Node":"mm=2;b=AdsadasdasdasdAA=",
    "EL":"54f54ds5f4ds5fds",
    "DisplayName":"tag5",
    "Value":{
      "Value":19.866293,
      "SourceTimestamp":"1998-08-23T20:49:51.885Z"
    }
  },
  {
    "Node":"mm=2;b=54f5sd4f5ds4fds",
    "EL":"fkjhfjdshfsjdhjfds1221",
    "DisplayName":"tag6",
    "Value":{
      "Value":611.9143,
      "SourceTimestamp":"2201-09-22T20:44:51.017Z"
    }
  }
]
I defined the following schema to use with the from_json function on the DataFrame:
from pyspark.sql import functions as F
from pyspark.sql.functions import from_json
from pyspark.sql.types import ArrayType, StringType, StructField, StructType, TimestampType

structureSchema = StructType([
    StructField("Node", StringType()),
    StructField("El", StringType()),
    StructField("DisplayName", StringType()),
    StructField("value_array", ArrayType(
        StructType([
            StructField("Value", StringType()),
            StructField("SourceTimestamp", TimestampType())
        ])
    ))
])

stream_messages.select(stream_messages.Bodycast).withColumn('json', from_json(F.col('Bodycast'), structureSchema)).display()
Can anyone tell me whether the schema I defined is wrong?
Thanks in advance
Answer:
I realized that the schema has to be wrapped in ArrayType, because each message body contains a JSON array of several records rather than a single object. After parsing, I converted the arrays into the required tabular form.