Converting values-oriented JSON in PySpark

For a project I'm working on, I need to read JSON output from an API into a Spark DataFrame for further processing into Data Lake storage. However, the JSON is not the regular format I'm used to working with: each record is an array of name/value pairs rather than a plain object. I would like to convert the JSON below into a Spark DataFrame so it can be used for Delta Lake processing. Does anyone know how to convert this into a DataFrame efficiently?

[
  [
    {
      "name": "Id", 
      "value": "1"
    }, 
    {
      "name": "Firstname", 
      "value": "Foo"
    }
  ],
  [
    {
      "name": "Id", 
      "value": "2"
    }, 
    {
      "name": "Firstname", 
      "value": "Foo"
    },
    {
      "name": "Lastname", 
      "value": "Bar"
    }
  ]
]

Sidenotes:

  • If a value is empty in the source system, the name/value pair is omitted entirely (note the missing Lastname in the first record above).
  • We need the work to distribute efficiently across Databricks nodes, so we prefer sticking with PySpark DataFrames rather than Pandas, also because we're working with Delta Lake files.

The preferred output is a PySpark DataFrame that looks as follows:

+---+---------+--------+
|Id |Firstname|Lastname|
+---+---------+--------+
|1  |Foo      |null    |
|2  |Foo      |Bar     |
+---+---------+--------+

I have tried reading the JSON and then processing it further into a new DataFrame, but this seems very inefficient, and it also can't handle columns that are missing from certain rows.

# Sample records; note that the second record here omits the "Lastname" pair.
text = [
    [{"name": "Id", "value": "1"}, {"name": "Firstname", "value": "Foo"}],
    [{"name": "Id", "value": "2"}, {"name": "Firstname", "value": "Foo"}],
]
df = spark.createDataFrame(text)

# This collects all rows to the driver, which defeats Spark's distribution.
for itemIndex, item in enumerate(df.collect()):
    print('New record')
    for columnIndex, column in enumerate(df.columns):
        print(item[columnIndex]['name'], ': ', item[columnIndex]['value'])
    print('\n')
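
For reference, the failure on missing columns is easy to reproduce: createDataFrame infers two columns from the first record, so a record carrying an extra Lastname pair has a mismatched length and is rejected. A minimal illustration (the exact error message varies by Spark version):

ragged = [
    [{"name": "Id", "value": "1"}, {"name": "Firstname", "value": "Foo"}],
    [{"name": "Id", "value": "2"}, {"name": "Firstname", "value": "Foo"},
     {"name": "Lastname", "value": "Bar"}],
]
# spark.createDataFrame(ragged)  # fails: row length does not match the inferred schema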

CodePudding user response:

You can use spark.read.json, but I would suggest first changing your JSON to another format:

[
  {
    "Id": "1",
    "Firstname": "Foo"
  },
  {
    "Id": "2",
    "Firstname": "Foo",
    "Lastname": "Bar"
  }
]

For that conversion you can use the following code:

import json

# json_string holds the original API response shown in the question.
original_json = json.loads(json_string)

# Collapse each record's name/value pairs into one flat dict per record.
newjson = []
for items in original_json:
    temp_dict = {item['name']: item['value'] for item in items}
    newjson.append(temp_dict)

newjson = json.dumps(newjson)
print(newjson)

# Write the reshaped JSON to a file so Spark can read it.
with open('your_json_file.json', 'w') as f:
    f.write(newjson)

df = spark.read.json("your_json_file.json")
df.printSchema()
df.show()
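
With the sample data, the inferred schema and output should look roughly like this (spark.read.json orders inferred columns alphabetically, and the record without a Lastname comes back as null):

root
 |-- Firstname: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- Lastname: string (nullable = true)

+---------+---+--------+
|Firstname| Id|Lastname|
+---------+---+--------+
|      Foo|  1|    null|
|      Foo|  2|     Bar|
+---------+---+--------+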

Also note that your original JSON is multiline; to read a multiline file you need to enable the multiline option:

df=spark.read.option("multiline","true") \
      .json("your_json_file.json")

CodePudding user response:

Not sure if your JSON fields' order is always the same, but this is an option for you. Note that I wrapped your JSON in two square brackets in order to make it one single column.

import json
from pyspark.sql import functions as F

# data is the JSON from the question, parsed into Python objects.
data = json.loads(json_string)

text = [[data]]  # note the two square brackets: one row, one column
df = spark.createDataFrame(text)

(df
    .select(F.explode('_1').alias('data'))
    .select(
        # Positional access: this assumes the pairs always arrive in the
        # order Id, Firstname, Lastname; a missing third element yields null.
        F.col('data')[0]['value'].alias('Id'),
        F.col('data')[1]['value'].alias('Firstname'),
        F.col('data')[2]['value'].alias('Lastname'),
    )
    .show(10, False)
)

+---+---------+--------+
|Id |Firstname|Lastname|
+---+---------+--------+
|1  |Foo      |null    |
|2  |Foo      |Bar     |
+---+---------+--------+
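
If the pair order is not guaranteed, a more robust variant is to build a map from the name/value structs and look each field up by key, so missing pairs come back as null regardless of position. A minimal sketch, assuming the parsed JSON is available as data as above (map_from_entries requires Spark 2.4+):

from pyspark.sql import functions as F

# Convert each record's dicts to (name, value) tuples so Spark sees
# an array<struct<name,value>> instead of an array of maps.
rows = [([(d['name'], d['value']) for d in record],) for record in data]
df = spark.createDataFrame(rows, 'pairs array<struct<name:string,value:string>>')

(df
    .withColumn('m', F.map_from_entries('pairs'))
    .select(
        # Lookup by key: pair order no longer matters, and a missing
        # key (e.g. Lastname in the first record) yields null.
        F.col('m')['Id'].alias('Id'),
        F.col('m')['Firstname'].alias('Firstname'),
        F.col('m')['Lastname'].alias('Lastname'),
    )
    .show(10, False)
)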