I am trying to parse/flatten a JSON data, containing an array and a struct. For every "Id" in "data_array" column, I need to get the "EstValue" from "data_struct" column. Column name in "data_struct" is the actual id (from "data_array"). Tried my best to use a dynamic join, but getting error "Column is not iterable". Can't we use dynamic join conditions in PySpark, like we can in SQL? Is there any better way for achieving this?
JSON Input file:
{
"data_array": [
{
"id": 1,
"name": "ABC"
},
{
"id": 2,
"name": "DEF"
}
],
"data_struct": {
"1": {
"estimated": {
"value": 123
},
"completed": {
"value": 1234
}
},
"2": {
"estimated": {
"value": 456
},
"completed": {
"value": 4567
}
}
}
}
Desired output:
Id Name EstValue CompValue
1 ABC 123 1234
2 DEF 456 4567
My PySpark code:
from pyspark.sql.functions import *
rawDF = spark.read.json([f"abfss://{pADLSContainer}@{pADLSGen2}.dfs.core.windows.net/{pADLSDirectory}/InputFile.json"], multiLine = "true")
idDF = rawDF.select(explode("data_array").alias("data_array")) \
.select(col("data_array.id").alias("id"))
idDF.show(n=2,vertical=True,truncate=150)
finalDF = idDF.join(rawDF, (idDF.id == rawDF.select(col("data_struct." idDF.Id))) )
finalDF.show(n=2,vertical=True,truncate=150)
Error:
def __iter__(self): raise TypeError("Column is not iterable")
CodePudding user response:
Self joins create problems. In this case, you can avoid the join.
You could make arrays from both columns, zip them together and use inline
to extract into columns. The most difficult part is creating array from "data_struct" column. Maybe there's a better way, but I only could think of first transforming it into map type.
Input:
s = """
{
"data_array": [
{
"id": 1,
"name": "ABC"
},
{
"id": 2,
"name": "DEF"
}
],
"data_struct": {
"1": {
"estimated": {
"value": 123
},
"completed": {
"value": 1234
}
},
"2": {
"estimated": {
"value": 456
},
"completed": {
"value": 4567
}
}
}
}
"""
rawDF = spark.read.json(sc.parallelize([s]), multiLine = "true")
Script:
id = F.transform('data_array', lambda x: x.id).alias('Id')
name = F.transform('data_array', lambda x: x['name']).alias('Name')
map = F.from_json(F.to_json("data_struct"), 'map<string, struct<estimated:struct<value:long>,completed:struct<value:long>>>')
est_val = F.transform(id, lambda x: map[x].estimated.value).alias('EstValue')
comp_val = F.transform(id, lambda x: map[x].completed.value).alias('CompValue')
df = rawDF.withColumn('y', F.arrays_zip(id, name, est_val, comp_val))
df = df.selectExpr("inline(y)")
df.show()
# --- ---- -------- ---------
# | Id|Name|EstValue|CompValue|
# --- ---- -------- ---------
# | 1| ABC| 123| 1234|
# | 2| DEF| 456| 4567|
# --- ---- -------- ---------