Apologies for the potentially basic question, but I have exhausted every avenue searching for a solution to this. I am fairly new to PySpark. I have managed to flatten the first level of my json file, and I have a dataframe that looks like the following:
There is one row, and each column represents a day between 2019-01-01 and 2020-01-01:

| 2019-01-01 | 2019-01-02 |...
| ---------- | ---------- |...
| {USD: 1, JPY: 109.61, ...} | {USD: 1, JPY: 109.62, ...} |...
The schema is as follows (with each day between 2019 and 2020 as a column header):
root
|-- 2019-01-01: struct (nullable = true)
| |-- AED: double (nullable = true)
| |-- AFN: double (nullable = true)
| |-- ALL: double (nullable = true)
| |-- ARS: double (nullable = true)
| |-- AUD: double (nullable = true)
| |-- BAM: double (nullable = true)
| |-- BBD: double (nullable = true)
| |-- BDT: double (nullable = true)
| |-- BGN: double (nullable = true)
| |-- BHD: double (nullable = true)
| |-- BIF: double (nullable = true)
etc.
I would like each value in each column to be in a separate row, and to pivot the table so that the dates are rows and each country code is a column heading, as follows:
| Date       | USD | JPY    |
| ---------- | --- | ------ |
| 2019-01-01 | 1   | 109.61 |
| 2019-01-02 | 1   | 109.62 |
etc.
Every solution I have found so far requires that I specify which column to pivot/unnest.
CodePudding user response:
The solution here flattens the json in plain Python before loading it into a PySpark DataFrame.
Set-Up
Here is a sample of the json.
json_input = {'2019-01-01': {'AED': 1.0,
                             'AFN': 1.0,
                             'ALL': 1.0,
                             'ARS': 1.0,
                             'AUD': 1.0,
                             'BAM': 1.0,
                             'BBD': 1.0,
                             'BDT': 1.0,
                             'BGN': 1.0,
                             'BHD': 1.0,
                             'BIF': 1.0},
              '2019-01-02': {'AED': 2.0,
                             'AFN': 2.0,
                             'ALL': 2.0,
                             'ARS': 2.0,
                             'AUD': 2.0,
                             'BAM': 2.0,
                             'BBD': 2.0,
                             'BDT': 2.0,
                             'BGN': 2.0,
                             'BHD': 2.0,
                             'BIF': 2.0}}
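If the rates are in a json file rather than an in-memory dict, a minimal sketch to read them into this shape (the file name here is a placeholder):

import json

# Load the raw rates json from disk; 'rates.json' is a hypothetical path.
with open('rates.json') as f:
    json_input = json.load(f)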
Logic
Next I reformat the json so that the "Date" key sits at the same level as the country codes.
json_reformat = []
for date, country_data in json_input.items():
    # Copy the inner dict so the original json_input is not mutated.
    json_flat = dict(country_data)
    json_flat["Date"] = date
    json_reformat.append(json_flat)
output
[{'AED': 1.0,
  'AFN': 1.0,
  'ALL': 1.0,
  'ARS': 1.0,
  'AUD': 1.0,
  'BAM': 1.0,
  'BBD': 1.0,
  'BDT': 1.0,
  'BGN': 1.0,
  'BHD': 1.0,
  'BIF': 1.0,
  'Date': '2019-01-01'},
 {'AED': 2.0,
  'AFN': 2.0,
  'ALL': 2.0,
  'ARS': 2.0,
  'AUD': 2.0,
  'BAM': 2.0,
  'BBD': 2.0,
  'BDT': 2.0,
  'BGN': 2.0,
  'BHD': 2.0,
  'BIF': 2.0,
  'Date': '2019-01-02'}]
Then load the newly formatted records into a PySpark DataFrame:
import json

# Serialize each record to a JSON string so spark.read.json parses it reliably.
df = spark.read.json(sc.parallelize([json.dumps(row) for row in json_reformat]))
df.show()
output
+---+---+---+---+---+---+---+---+---+---+---+----------+
|AED|AFN|ALL|ARS|AUD|BAM|BBD|BDT|BGN|BHD|BIF|      Date|
+---+---+---+---+---+---+---+---+---+---+---+----------+
|1.0|1.0|1.0|1.0|1.0|1.0|1.0|1.0|1.0|1.0|1.0|2019-01-01|
|2.0|2.0|2.0|2.0|2.0|2.0|2.0|2.0|2.0|2.0|2.0|2019-01-02|
+---+---+---+---+---+---+---+---+---+---+---+----------+
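As an aside, if you would rather avoid round-tripping through plain Python, a Spark-native sketch can unpivot the original struct-typed DataFrame directly. This assumes df is the one-row DataFrame from the question and that every date column has the same struct schema:

from pyspark.sql import functions as F

# Tag each date column's struct with its column name, gather the tagged
# structs into an array, explode to one row per date, then expand the
# nested struct so each currency becomes its own column.
dated = F.array(*[
    F.struct(F.lit(c).alias('Date'), F.col(c).alias('rates'))
    for c in df.columns
])
result = df.select(F.explode(dated).alias('row')).select('row.Date', 'row.rates.*')
result.show()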
CodePudding user response:
Assuming all date columns are in the same order and have no missing values, you can write for loops:
rows = []
cols = ['Date']

# Build the header: take the currency code from each "CODE:value" string.
for i in data.select('2019-01-01').collect():
    cols.append(i[0].split(':')[0])

# Build one row per date column: the date itself, then each value after the colon.
for item in data.columns:
    row_items = (item,)
    for i in data.select(item).collect():
        row_items = row_items + (i[0].split(':')[1],)
    rows.append(row_items)

result = spark.createDataFrame(rows, cols)
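Optionally, to get the Date column typed as a real date and the rows in chronological order, something like this should work:

from pyspark.sql import functions as F

# Cast the 'yyyy-MM-dd' strings to DateType and sort chronologically.
result = result.withColumn('Date', F.to_date('Date')).orderBy('Date')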