I have a nested JSON body like the one below:
{ "MainTag": { "GroupId": "10C81", "IDArray": [ "ABC-XYZ-123" ], "IDStruct": { "DSA-ASA-211": null, "BSA-ASA-211": null, "ABC-XYZ-123": [ { "BagId": "42425fsdfs", "TravelerId": "1234567", "LegCorr": [ { "DelID": "SQH", "SegID": "PQR-UVW" }, { "DelID": "GFS", "SegID": "GHS-UVW" } ] } ] } } }
Note: The IDStruct tag will only have value for the keys present in array tag IDArray.
Can someone please help me achieve this using PySpark?
Below is the printSchema() output:
CodePudding user response:
Basically, you can do it using the `explode` function.
First, I will define the schema for our data. Then I will create a DataFrame and explode each of the nested arrays in turn.
# Sample record mirroring the JSON body from the question.
# Keys of IDStruct that carry no payload are None (JSON null), not the
# string "null", so Spark stores a real NULL for them.
data = {
    "MainTag": {
        "GroupId": "10C81",
        "IDArray": ["ABC-XYZ-123"],
        "IDStruct": {
            "DSA-ASA-211": None,
            "BSA-ASA-211": None,
            "ABC-XYZ-123": [
                {
                    "BagId": "42425fsdfs",
                    "TravelerId": "1234567",
                    "LegCorr": [
                        {"DelID": "SQH", "SegID": "PQR-UVW"},
                        {"DelID": "GFS", "SegID": "GHS-UVW"},
                    ],
                },
            ],
        },
    },
}
from pyspark.sql.types import *
# Build the schema bottom-up so every nesting level has its own name.

# One element of the 'LegCorr' array.
leg_corr_type = StructType([
    StructField('DelID', StringType()),
    StructField('SegID', StringType()),
])

# One element of the 'ABC-XYZ-123' array.
bag_type = StructType([
    StructField('BagId', StringType()),
    StructField('TravelerId', StringType()),
    StructField('LegCorr', ArrayType(leg_corr_type)),
])

# The 'IDStruct' object: two string-typed keys and one array of bag structs.
id_struct_type = StructType([
    StructField('DSA-ASA-211', StringType()),
    StructField('BSA-ASA-211', StringType()),
    StructField('ABC-XYZ-123', ArrayType(bag_type)),
])

# The 'MainTag' object.
main_tag_type = StructType([
    StructField('GroupId', StringType()),
    StructField('IDArray', ArrayType(StringType())),
    StructField('IDStruct', id_struct_type),
])

# Top-level schema: a single 'MainTag' struct column.
schema_main = StructType([StructField('MainTag', main_tag_type)])
from pyspark.sql.functions import explode

# One-row DataFrame built from the sample record and the explicit schema.
df = spark.createDataFrame([data], schema_main)

# Step 1: one row per entry of MainTag.IDArray; MainTag is kept for step 2.
per_id = df.select(
    "MainTag",
    "MainTag.GroupId",
    explode("MainTag.IDArray").alias("ID"),
)

# Step 2: one row per struct in the 'ABC-XYZ-123' array.
per_bag = per_id.select(
    "GroupId",
    "ID",
    explode("MainTag.IDStruct.ABC-XYZ-123").alias("ABC"),
)

# Step 3: one row per entry of that struct's LegCorr array.
per_leg = per_bag.select(
    "GroupId",
    "ID",
    "ABC.BagId",
    "ABC.TravelerId",
    explode("ABC.LegCorr").alias("Legs"),
)

# Step 4: pull the leg fields up to top-level columns and print the result.
flattened = per_leg.select(
    "GroupId", "ID", "BagId", "TravelerId", "Legs.DelID", "Legs.SegID"
)
flattened.show(truncate=False)
Here is the result:
+-------+-----------+----------+----------+-----+-------+
|GroupId|ID         |BagId     |TravelerId|DelID|SegID  |
+-------+-----------+----------+----------+-----+-------+
|10C81  |ABC-XYZ-123|42425fsdfs|1234567   |SQH  |PQR-UVW|
|10C81  |ABC-XYZ-123|42425fsdfs|1234567   |GFS  |GHS-UVW|
+-------+-----------+----------+----------+-----+-------+
CodePudding user response:
It's convenient to use `inline` to expand arrays of structs. In the following code it is used three times.
This is your data:
from pyspark.sql import functions as F
# Sample payload: the contents of "MainTag" as a plain Python dict.
main_tag = {
    "GroupId": "10C81",
    "IDArray": ["ABC-XYZ-123"],
    "IDStruct": {
        "DSA-ASA-211": None,
        "BSA-ASA-211": None,
        "ABC-XYZ-123": [
            {
                "BagId": "42425fsdfs",
                "TravelerId": "1234567",
                "LegCorr": [
                    {"DelID": "SQH", "SegID": "PQR-UVW"},
                    {"DelID": "GFS", "SegID": "GHS-UVW"},
                ],
            },
        ],
    },
}

# DDL schema string; the hyphenated field names are backtick-quoted.
main_tag_ddl = "MainTag struct<GroupId:string, IDArray:array<string>, IDStruct:struct<`ABC-XYZ-123`:array<struct<BagId:string,LegCorr:array<struct<DelID:string,SegID:string>>,TravelerId:string>>,`DSA-ASA-211`:string,`BSA-ASA-211`:string>>"

# Single row whose only column is the MainTag struct.
df = spark.createDataFrame([(main_tag,)], main_tag_ddl)
df.printSchema()
# root
#  |-- MainTag: struct (nullable = true)
#  |    |-- GroupId: string (nullable = true)
#  |    |-- IDArray: array (nullable = true)
#  |    |    |-- element: string (containsNull = true)
#  |    |-- IDStruct: struct (nullable = true)
#  |    |    |-- ABC-XYZ-123: array (nullable = true)
#  |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |-- BagId: string (nullable = true)
#  |    |    |    |    |-- LegCorr: array (nullable = true)
#  |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |-- DelID: string (nullable = true)
#  |    |    |    |    |    |    |-- SegID: string (nullable = true)
#  |    |    |    |    |-- TravelerId: string (nullable = true)
#  |    |    |-- DSA-ASA-211: string (nullable = true)
#  |    |    |-- BSA-ASA-211: string (nullable = true)
Script:
def _swap_for_inline(frame, dropped, generator_expr):
    """Select every column of `frame` except `dropped`, plus `generator_expr`."""
    kept = [name for name in frame.columns if name != dropped]
    return frame.selectExpr(*kept, generator_expr)


# Wrap the MainTag struct in a one-element array so inline() can expand its
# fields into top-level columns.
df = df.selectExpr("inline(array(MainTag))")

# Replace IDStruct with the fields of the structs in its `ABC-XYZ-123` array.
df = _swap_for_inline(df, "IDStruct", "inline(IDStruct.`ABC-XYZ-123`)")

# Replace LegCorr with the fields of its struct elements.
df = _swap_for_inline(df, "LegCorr", "inline(LegCorr)")

# Finally, one row per entry of IDArray, keeping the column name.
df = df.withColumn("IDArray", F.explode("IDArray"))
df.show()
# +-------+-----------+----------+----------+-----+-------+
# |GroupId|    IDArray|     BagId|TravelerId|DelID|  SegID|
# +-------+-----------+----------+----------+-----+-------+
# |  10C81|ABC-XYZ-123|42425fsdfs|   1234567|  SQH|PQR-UVW|
# |  10C81|ABC-XYZ-123|42425fsdfs|   1234567|  GFS|GHS-UVW|
# +-------+-----------+----------+----------+-----+-------+