How to create a spark DataFrame from Nested JSON structure

Time:11-08

I'm trying to load data from the ExactOnline API into a Spark DataFrame. The data comes out of the API in a rather ugly format: one JSON file contains multiple lines, each of which is a valid JSON object. One line of JSON looks as follows:

{
  "d": {
    "results": [
      {
        "__metadata": {
          "uri": "https://start.exactonline.nl/api_endpoint",
          "type": "Exact.Web.Api.Models.Account"
        },
        "Accountant": null,
        "AccountManager": null,
        "AccountManagerFullName": null,
        "AccountManagerHID": null,
        ...
      },
      {
        "__metadata": {
          "uri": "https://start.exactonline.nl/api_endpoint",
          "type": "Exact.Web.Api.Models.Account"
        },
        "Accountant": null,
        "AccountManager": null,
        "AccountManagerFullName": null,
        "AccountManagerHID": null,
        ...
        
      }
    ]
  }
}

What I need is for the keys of the dictionaries in the results list to become the dataframe columns, and each dictionary in results to become a row. For the example above, that would result in a dataframe with the following columns:

__metadata|Accountant|AccountManager|AccountManagerFullName|AccountManagerHID

And two rows, one for each entry in the "results" list.

In Python on my local machine, I am easily able to achieve this by using the following code snippet:

import json
import pandas as pd

folder_path = "path_to_json_file"


def flatten(l):
    return [item for sublist in l for item in sublist]

with open(folder_path) as f:
    # Extract the relevant data from each line of the file, producing a
    # nested list where each inner list holds the dicts from that line's
    # "results" (1 line of JSON = 1 inner list, so a 6-line file yields
    # a nested list of 6 lists of dictionaries)
    data = [json.loads(line)['d']['results'] for line in f]
    # Flatten the nested lists into one flat list of dictionaries
    flat_data = flatten(data)

# Create a dataframe from that flat list.   
df = pd.DataFrame(flat_data)
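For a quick local check of that logic, a single line of the structure can be parsed in isolation, no file needed (a minimal sketch; the sample values below are assumed for illustration):

```python
import json

# One line mimicking the file's structure (values assumed for illustration)
line = json.dumps({
    "d": {
        "results": [
            {"__metadata": {"uri": "https://start.exactonline.nl/api_endpoint",
                            "type": "Exact.Web.Api.Models.Account"},
             "Accountant": None, "AccountManager": None},
            {"__metadata": {"uri": "https://start.exactonline.nl/api_endpoint",
                            "type": "Exact.Web.Api.Models.Account"},
             "Accountant": None, "AccountManager": None},
        ]
    }
})

# Each dict in d.results becomes one row; its keys become the columns
rows = json.loads(line)["d"]["results"]
columns = list(rows[0].keys())
```

Passing rows to pd.DataFrame gives one row per dictionary and one column per key, which is exactly the target shape.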

However, I'm using a PySpark notebook in Azure Synapse, and the JSON files reside in our Data Lake, so I cannot use with open to read them; I am limited to Spark functions. I have tried to achieve what I described above using explode and select:

from pyspark.sql import functions as sf
df = spark.read.json(path=path_to_json_file_in_data_lake)
df_subset = df.select('d.results')
df_exploded = df_subset.withColumn("results", sf.explode(sf.col('results')))

df_exploded has the right number of rows, but not the proper columns. I think I'm searching in the right direction but cannot wrap my head around it. Some assistance would be greatly appreciated.

CodePudding user response:

You can read JSON files directly in Spark with spark.read.json(), but use the multiLine option since a single JSON object is spread across multiple lines. Then use the inline SQL function to explode the array and create new columns from its struct fields.

json_sdf = spark.read.option('multiLine', 'true'). \
    json('./drive/MyDrive/samplejsonsparkread.json')

# root
#  |-- d: struct (nullable = true)
#  |    |-- results: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- AccountManager: string (nullable = true)
#  |    |    |    |-- AccountManagerFullName: string (nullable = true)
#  |    |    |    |-- AccountManagerHID: string (nullable = true)
#  |    |    |    |-- Accountant: string (nullable = true)
#  |    |    |    |-- __metadata: struct (nullable = true)
#  |    |    |    |    |-- type: string (nullable = true)
#  |    |    |    |    |-- uri: string (nullable = true)

# use `inline` sql function to explode and create new fields from array of structs
json_sdf. \
    selectExpr('d.results as results'). \
    selectExpr('inline(results)'). \
    show(truncate=False)

#  -------------- ---------------------- ----------------- ---------- ------------------------------------------------------------------------- 
# |AccountManager|AccountManagerFullName|AccountManagerHID|Accountant|__metadata                                                               |
#  -------------- ---------------------- ----------------- ---------- ------------------------------------------------------------------------- 
# |null          |null                  |null             |null      |{Exact.Web.Api.Models.Account, https://start.exactonline.nl/api_endpoint}|
# |null          |null                  |null             |null      |{Exact.Web.Api.Models.Account, https://start.exactonline.nl/api_endpoint}|
#  -------------- ---------------------- ----------------- ---------- ------------------------------------------------------------------------- 

# root
#  |-- AccountManager: string (nullable = true)
#  |-- AccountManagerFullName: string (nullable = true)
#  |-- AccountManagerHID: string (nullable = true)
#  |-- Accountant: string (nullable = true)
#  |-- __metadata: struct (nullable = true)
#  |    |-- type: string (nullable = true)
#  |    |-- uri: string (nullable = true)

CodePudding user response:

I tried your code and it works fine. It's just missing one last step:

df_exploded = df_subset.withColumn("results", sf.explode(sf.col('results')))

df_exploded.select("results.*").show()

+--------------+----------------------+-----------------+----------+--------------------+
|AccountManager|AccountManagerFullName|AccountManagerHID|Accountant|          __metadata|
+--------------+----------------------+-----------------+----------+--------------------+
|          null|                  null|             null|      null|[Exact.Web.Api.Mo...|
|          null|                  null|             null|      null|[Exact.Web.Api.Mo...|
+--------------+----------------------+-----------------+----------+--------------------+