Extract json data inside array to rows in Spark


I'm working with Azure Databricks to parse over a large number of small (4kb) JSON files containing information from a remote monitoring station with the following schema:

root
 |-- Body: struct (nullable = true)
 |    |-- telemetry: struct (nullable = true)
 |    |    |-- temp: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- bankNumber: long (nullable = true)
 |    |    |    |    |-- measurementTime: long (nullable = true)
 |    |    |    |    |-- measurementType: string (nullable = true)
 |    |    |    |    |-- sensorNumber: long (nullable = true)
 |    |    |    |    |-- tempatureReading: double (nullable = true)
 |    |    |-- wind: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- measurementTime: long (nullable = true)
 |    |    |    |    |-- measurementType: string (nullable = true)
 |    |    |    |    |-- sensorNumber: long (nullable = true)
 |    |    |    |    |-- velocityReading: double (nullable = true)
 |-- EnqueuedTimeUtc: string (nullable = true)
 |-- SystemProperties: struct (nullable = true)
 |    |-- connectionAuthMethod: string (nullable = true)
 |    |-- connectionDeviceGenerationId: string (nullable = true)
 |    |-- connectionDeviceId: string (nullable = true)
 |    |-- contentEncoding: string (nullable = true)
 |    |-- contentType: string (nullable = true)
 |    |-- enqueuedTime: string (nullable = true)
 |    |-- messageId: string (nullable = true)

The schema was auto-generated by the initial read.json() and correctly matches my JSON files.

My issue is getting the data contained in the Body.telemetry.temp and Body.telemetry.wind arrays into the correct form. I've tried a couple of different approaches, but nothing has worked. I am trying to get it to look like this (columns):

|Measurement Type|Bank Number|Sensor Number|Temperature Measurement|Measurement Time|

I tried using a series of daisy chained functions of this form:

.withColumn(<alias>, explode("Body.telemetry.temp.measurementTime"))

At first this gave me the result I was looking for: it unpacked the data into rows. However, the resulting table still contained a copy of Body as a full string, and when I tried this with all the fields of the temp structs it kept running out of memory on Azure.

I tried playing around with .select() functions quite a bit but was never able to get more than one row working (using explode).

I've been googling around for a good number of hours and just can't seem to find anything that is 100% applicable or that actually works. The few solutions I did find that looked promising were in Scala, which I do not know (I'd like to stick to Python for the moment if possible).

CodePudding user response:

Per your printSchema output, Body, telemetry and element are structs, whereas temp is an array, so the array has to be referenced by its full path and exploded before its fields can be selected. Below are samples you can test out:

    from pyspark.sql import functions as F

    df = spark.read.json("./test_data.json")
    df.printSchema()
    # temp sits under Body.telemetry, so reference the full path,
    # and explode the array to get one row per element:
    df2 = df.select(F.explode("Body.telemetry.temp").alias("temp"))
    df2.select("temp.measurementTime").show()

    # -- or --
    df.withColumn("temp", F.explode(F.col("Body.telemetry.temp"))).select("temp.*").show()

Please post a sample JSON file and the progress you have made if you have any concerns with the code snippets.
