Home > other >  Nested JSON parsing pyspark
Nested JSON parsing pyspark

Time:10-04

I have a nested json body like below:

{ "MainTag": { "GroupId": "10C81", "IDArray": [ "ABC-XYZ-123" ], "IDStruct": { "DSA-ASA-211": null, "BSA-ASA-211": null, "ABC-XYZ-123": [ { "BagId": "42425fsdfs", "TravelerId": "1234567", "LegCorr": [ { "DelID": "SQH", "SegID": "PQR-UVW" }, { "DelID": "GFS", "SegID": "GHS-UVW" } ] } ] } } }

My desired output is shown in the attached image (a flat table with one row per leg, with columns GroupId, ID, BagId, TravelerId, DelID, SegID).

Note: The IDStruct tag will only have value for the keys present in array tag IDArray.

Can someone please help me achieve this using PySpark?

Below is the printSchema() output:

(printSchema output shown in the attached image)

CodePudding user response:

Basically you can do it using the function explode. First, I will create the structure for the data, then create a data frame and explode each of the nested arrays along the way.

# Input document as a Python dict.
# FIX: the original snippet had a stray backtick after the IDArray list
# (`"ABC-XYZ-123"],` `), which is a Python syntax error — removed here.
# The JSON nulls are written as the string "null"; use None instead if real
# SQL NULLs are wanted (the schema types these fields as StringType either way).
data = {
    "MainTag": {
        "GroupId": "10C81",
        "IDArray": ["ABC-XYZ-123"],
        "IDStruct": {
            "DSA-ASA-211": "null",
            "BSA-ASA-211": "null",
            "ABC-XYZ-123": [
                {
                    "BagId": "42425fsdfs",
                    "TravelerId": "1234567",
                    "LegCorr": [
                        {"DelID": "SQH", "SegID": "PQR-UVW"},
                        {"DelID": "GFS", "SegID": "GHS-UVW"},
                    ],
                }
            ],
        },
    }
}

from pyspark.sql.types import *

# Explicit schema mirroring the nested JSON. IDStruct's keys are modelled as
# fixed fields; per the question, only the keys listed in IDArray
# (here ABC-XYZ-123) carry a value.
schema_main = StructType([
    StructField('MainTag', StructType([
        StructField('GroupId', StringType()),
        StructField('IDArray', ArrayType(StringType())),
        StructField('IDStruct', StructType([
            StructField('DSA-ASA-211', StringType()),
            StructField('BSA-ASA-211', StringType()),
            StructField('ABC-XYZ-123', ArrayType(
                StructType([
                    StructField('BagId', StringType()),
                    StructField('TravelerId', StringType()),
                    StructField('LegCorr', ArrayType(
                        StructType([
                            StructField('DelID', StringType()),
                            StructField('SegID', StringType())
                        ])
                    ))
                ])
            ))
        ]))
    ]))
])

df = spark.createDataFrame(data=[data], schema=schema_main)

from pyspark.sql.functions import explode

# Peel off one nesting level per select: explode IDArray first, then the
# ABC-XYZ-123 array of bag structs, then each bag's LegCorr array, and
# finally project the flat leaf columns.
df.select("MainTag", "MainTag.GroupId", explode("MainTag.IDArray").alias("ID")) \
  .select("GroupId", "ID", explode("MainTag.IDStruct.ABC-XYZ-123").alias("ABC")) \
  .select("GroupId", "ID", "ABC.BagId", "ABC.TravelerId", explode("ABC.LegCorr").alias("Legs")) \
  .select("GroupId", "ID", "BagId", "TravelerId", "Legs.DelID", "Legs.SegID") \
  .show(truncate=False)

Here is the result:

 ------- ----------- ---------- ---------- ----- ------- 
|GroupId|ID         |BagId     |TravelerId|DelID|SegID  |
 ------- ----------- ---------- ---------- ----- ------- 
|10C81  |ABC-XYZ-123|42425fsdfs|1234567   |SQH  |PQR-UVW|
|10C81  |ABC-XYZ-123|42425fsdfs|1234567   |GFS  |GHS-UVW|
 ------- ----------- ---------- ---------- ----- ------- 

CodePudding user response:

It's convenient to use inline to expand arrays of struct. In the following code it's used 3 times.

This is your data:

from pyspark.sql import functions as F

# A single row whose only column, MainTag, holds the entire nested payload.
payload = {
    "GroupId": "10C81",
    "IDArray": ["ABC-XYZ-123"],
    "IDStruct": {
        "DSA-ASA-211": None,
        "BSA-ASA-211": None,
        "ABC-XYZ-123": [
            {
                "BagId": "42425fsdfs",
                "TravelerId": "1234567",
                "LegCorr": [
                    {"DelID": "SQH", "SegID": "PQR-UVW"},
                    {"DelID": "GFS", "SegID": "GHS-UVW"},
                ],
            }
        ],
    },
}

# Schema as a DDL string; hyphenated field names must be backtick-quoted.
ddl = "MainTag struct<GroupId:string, IDArray:array<string>, IDStruct:struct<`ABC-XYZ-123`:array<struct<BagId:string,LegCorr:array<struct<DelID:string,SegID:string>>,TravelerId:string>>,`DSA-ASA-211`:string,`BSA-ASA-211`:string>>"

df = spark.createDataFrame([(payload,)], ddl)
df.printSchema()
# root
#  |-- MainTag: struct (nullable = true)
#  |    |-- GroupId: string (nullable = true)
#  |    |-- IDArray: array (nullable = true)
#  |    |    |-- element: string (containsNull = true)
#  |    |-- IDStruct: struct (nullable = true)
#  |    |    |-- ABC-XYZ-123: array (nullable = true)
#  |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |-- BagId: string (nullable = true)
#  |    |    |    |    |-- LegCorr: array (nullable = true)
#  |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |-- DelID: string (nullable = true)
#  |    |    |    |    |    |    |-- SegID: string (nullable = true)
#  |    |    |    |    |-- TravelerId: string (nullable = true)
#  |    |    |-- DSA-ASA-211: string (nullable = true)
#  |    |    |-- BSA-ASA-211: string (nullable = true)

Script:

# Flatten one level at a time with SQL `inline`, which turns an
# array<struct> into one row per element with the struct's fields promoted
# to top-level columns. Wrapping the MainTag struct in array(...) lets
# inline unwrap it too.
df = df.selectExpr("inline(array(MainTag))")

# Keep every column except the one being expanded, then inline it.
keep = [c for c in df.columns if c != 'IDStruct']
df = df.selectExpr(*keep, "inline(IDStruct.`ABC-XYZ-123`)")

keep = [c for c in df.columns if c != 'LegCorr']
df = df.selectExpr(*keep, "inline(LegCorr)")

# IDArray is a plain array<string> (no struct), so explode — not inline —
# produces one row per element.
df = df.withColumn("IDArray", F.explode("IDArray"))

df.show()
#  ------- ----------- ---------- ---------- ----- ------- 
# |GroupId|    IDArray|     BagId|TravelerId|DelID|  SegID|
#  ------- ----------- ---------- ---------- ----- ------- 
# |  10C81|ABC-XYZ-123|42425fsdfs|   1234567|  SQH|PQR-UVW|
# |  10C81|ABC-XYZ-123|42425fsdfs|   1234567|  GFS|GHS-UVW|
#  ------- ----------- ---------- ---------- ----- ------- 
  • Related