How to (dynamically) join an array with a struct, to get a value from the struct for each element in the array


I am trying to parse/flatten JSON data containing an array and a struct. For every "id" in the "data_array" column, I need to get the "EstValue" from the "data_struct" column. The column name inside "data_struct" is the actual id (from "data_array"). I tried my best to use a dynamic join, but I get the error "Column is not iterable". Can't we use dynamic join conditions in PySpark, like we can in SQL? Is there a better way to achieve this?

JSON Input file:

{
  "data_array": [
    {
      "id": 1,
      "name": "ABC"
    },
    {
      "id": 2,
      "name": "DEF"
    }
  ],
  "data_struct": {
    "1": {
      "estimated": {
        "value": 123
      },
      "completed": {
        "value": 1234
      }
    },
    "2": {
      "estimated": {
        "value": 456
      },
      "completed": {
        "value": 4567
      }
    }
  }
}

Desired output:

Id  Name EstValue CompValue
1   ABC  123      1234
2   DEF  456      4567

My PySpark code:

from pyspark.sql.functions import * 
rawDF = spark.read.json([f"abfss://{pADLSContainer}@{pADLSGen2}.dfs.core.windows.net/{pADLSDirectory}/InputFile.json"], multiLine = "true")

idDF = rawDF.select(explode("data_array").alias("data_array")) \
    .select(col("data_array.id").alias("id")) 

idDF.show(n=2,vertical=True,truncate=150)


finalDF = idDF.join(rawDF, (idDF.id == rawDF.select(col("data_struct." + idDF.id))))

finalDF.show(n=2,vertical=True,truncate=150)

Error:

def __iter__(self): raise TypeError("Column is not iterable")

CodePudding user response:

Self-joins create problems. In this case, you can avoid the join entirely.

You can build arrays from both columns, zip them together, and use inline to extract the result into columns. The most difficult part is creating an array from the "data_struct" column. There may be a better way, but the only one I could think of is to first transform it into a map type; the sketch right below shows that step in isolation.
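
To see the struct-to-map step on its own, here is a minimal sketch (it assumes the rawDF defined under "Input:" below; the schema string simply mirrors the JSON above):

from pyspark.sql import functions as F

# "data_struct" has one field per id, so no fixed struct schema can enumerate them.
# Round-tripping the column through JSON turns those id fields into map entries:
as_map = F.from_json(F.to_json('data_struct'), 'map<string, struct<estimated:struct<value:long>,completed:struct<value:long>>>')
rawDF.select(F.map_keys(as_map).alias('ids')).show()
# +------+
# |   ids|
# +------+
# |[1, 2]|
# +------+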

Input:

s = """
{
  "data_array": [
    {
      "id": 1,
      "name": "ABC"
    },
    {
      "id": 2,
      "name": "DEF"
    }
  ],
  "data_struct": {
    "1": {
      "estimated": {
        "value": 123
      },
      "completed": {
        "value": 1234
      }
    },
    "2": {
      "estimated": {
        "value": 456
      },
      "completed": {
        "value": 4567
      }
    }
  }
}
"""
rawDF = spark.read.json(sc.parallelize([s]), multiLine = "true")

Script:

from pyspark.sql import functions as F

# arrays of ids and names taken from "data_array"
ids = F.transform('data_array', lambda x: x.id).alias('Id')
names = F.transform('data_array', lambda x: x['name']).alias('Name')
# "data_struct" has one field per id, so round-trip it through JSON into a map keyed by id
struct_map = F.from_json(F.to_json('data_struct'), 'map<string, struct<estimated:struct<value:long>,completed:struct<value:long>>>')
# look every id up in the map to build matching arrays of values
est_val = F.transform(ids, lambda x: struct_map[x].estimated.value).alias('EstValue')
comp_val = F.transform(ids, lambda x: struct_map[x].completed.value).alias('CompValue')
# zip the four arrays into one array of structs, then explode it into rows and columns
df = rawDF.withColumn('y', F.arrays_zip(ids, names, est_val, comp_val))
df = df.selectExpr('inline(y)')

df.show()
# +---+----+--------+---------+
# | Id|Name|EstValue|CompValue|
# +---+----+--------+---------+
# |  1| ABC|     123|     1234|
# |  2| DEF|     456|     4567|
# +---+----+--------+---------+
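
For reference: arrays_zip pairs the four arrays element-wise into a single array of structs, and inline is a SQL generator function that explodes that array into one row per element, with one column per struct field. If you prefer to stay in the DataFrame API, newer PySpark releases (3.4+, to my knowledge) also expose it as F.inline, so the last two lines could be collapsed into:

# assumes Spark 3.4+, where F.inline is available as a Python function
df = rawDF.select(F.inline(F.arrays_zip(ids, names, est_val, comp_val)))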