I'm trying to load data from the ExactOnline API into a Spark DataFrame. The data comes out of the API in a very ugly format: a single JSON file containing multiple lines, each of which is a valid JSON object. One line of JSON looks as follows:
{
  "d": {
    "results": [
      {
        "__metadata": {
          "uri": "https://start.exactonline.nl/api_endpoint",
          "type": "Exact.Web.Api.Models.Account"
        },
        "Accountant": null,
        "AccountManager": null,
        "AccountManagerFullName": null,
        "AccountManagerHID": null,
        ...
      },
      {
        "__metadata": {
          "uri": "https://start.exactonline.nl/api_endpoint",
          "type": "Exact.Web.Api.Models.Account"
        },
        "Accountant": null,
        "AccountManager": null,
        "AccountManagerFullName": null,
        "AccountManagerHID": null,
        ...
      }
    ]
  }
}
What I need is for the keys of the dictionaries in the results list to become the DataFrame columns, and each dictionary in the results list to become a row. For the example I provided above, that would result in a DataFrame with the following columns:
__metadata|Accountant|AccountManager|AccountManagerFullName|AccountManagerHID
And two rows, one for each entry in the "results" list.
In Python on my local machine, I am easily able to achieve this by using the following code snippet:
import json
import pandas as pd

folder_path = "path_to_json_file"

def flatten(l):
    return [item for sublist in l for item in sublist]

with open(folder_path) as f:
    # Extract the relevant data from each line in the JSON file and create a
    # nested list, where the "inner" lists are lists of dicts
    # (1 line of JSON in my file = 1 inner list, so if my JSON file has 6
    # lines the nested list will have 6 lists with a number of dictionaries)
    data = [json.loads(line)['d']['results'] for line in f]

# Flatten the nested lists into one giant list
flat_data = flatten(data)

# Create a dataframe from that flat list.
df = pd.DataFrame(flat_data)
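For reference, here is a minimal, self-contained version of that snippet using io.StringIO in place of the real file, with two sample lines shaped like the ones above (the field values are just placeholders):

```python
import io
import json

import pandas as pd

# Two sample lines, one complete JSON document per line, mimicking the API output.
line = ('{"d": {"results": [{"__metadata": {"uri": "u", "type": "t"}, '
        '"Accountant": null, "AccountManager": null}]}}')
fake_file = io.StringIO(line + "\n" + line)

# Same logic as above: one inner list per line, then flatten.
data = [json.loads(l)["d"]["results"] for l in fake_file]
flat_data = [item for sublist in data for item in sublist]

df = pd.DataFrame(flat_data)
print(df.columns.tolist())  # ['__metadata', 'Accountant', 'AccountManager']
print(len(df))              # 2
```

This confirms the dict keys become columns and each entry in results becomes a row.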
However, I'm using a PySpark notebook in Azure Synapse, and the JSON files reside in our Data Lake, so I cannot use with open to open files. I am limited to using Spark functions. I have tried to achieve what I described above using explode and select:
from pyspark.sql import functions as sf
df = spark.read.json(path=path_to_json_file_in_data_lake)
df_subset = df.select('d.results')
df_exploded = df_subset.withColumn("results", sf.explode(sf.col('results')))
df_exploded has the right number of rows, but not the proper columns. I think I'm searching in the right direction but cannot wrap my head around it. Some assistance would be greatly appreciated.
CodePudding user response:
You can read JSON files directly in Spark with spark.read.json(), but use the multiLine option since a single JSON document is spread across multiple lines. Then use the inline SQL function to explode the array and create new columns from the struct fields inside it.
json_sdf = spark.read.option('multiLine', 'true'). \
json('./drive/MyDrive/samplejsonsparkread.json')
# root
# |-- d: struct (nullable = true)
# | |-- results: array (nullable = true)
# | | |-- element: struct (containsNull = true)
# | | | |-- AccountManager: string (nullable = true)
# | | | |-- AccountManagerFullName: string (nullable = true)
# | | | |-- AccountManagerHID: string (nullable = true)
# | | | |-- Accountant: string (nullable = true)
# | | | |-- __metadata: struct (nullable = true)
# | | | | |-- type: string (nullable = true)
# | | | | |-- uri: string (nullable = true)
# use `inline` sql function to explode and create new fields from array of structs
json_sdf. \
selectExpr('d.results as results'). \
selectExpr('inline(results)'). \
show(truncate=False)
# +--------------+----------------------+-----------------+----------+-------------------------------------------------------------------------+
# |AccountManager|AccountManagerFullName|AccountManagerHID|Accountant|__metadata                                                               |
# +--------------+----------------------+-----------------+----------+-------------------------------------------------------------------------+
# |null          |null                  |null             |null      |{Exact.Web.Api.Models.Account, https://start.exactonline.nl/api_endpoint}|
# |null          |null                  |null             |null      |{Exact.Web.Api.Models.Account, https://start.exactonline.nl/api_endpoint}|
# +--------------+----------------------+-----------------+----------+-------------------------------------------------------------------------+
# root
# |-- AccountManager: string (nullable = true)
# |-- AccountManagerFullName: string (nullable = true)
# |-- AccountManagerHID: string (nullable = true)
# |-- Accountant: string (nullable = true)
# |-- __metadata: struct (nullable = true)
# | |-- type: string (nullable = true)
# | |-- uri: string (nullable = true)
CodePudding user response:
I tried your code and it is working fine. You are just missing one last step:
df_exploded = df_subset.withColumn("results", sf.explode(sf.col('results')))
df_exploded.select("results.*").show()
+--------------+----------------------+-----------------+----------+--------------------+
|AccountManager|AccountManagerFullName|AccountManagerHID|Accountant|          __metadata|
+--------------+----------------------+-----------------+----------+--------------------+
|          null|                  null|             null|      null|[Exact.Web.Api.Mo...|
|          null|                  null|             null|      null|[Exact.Web.Api.Mo...|
+--------------+----------------------+-----------------+----------+--------------------+