Pivot spark dataframe (pyspark) when all column values are unique


Apologies for the potentially basic question, but I have exhausted all possibilities in searching for a solution to this. I am fairly new to PySpark. I have managed to flatten the first level of my JSON file, and I have a dataframe that looks like the following:

There is one row. Each column represents a day between 2019-01-01 and 2020-01-01:

| 2019-01-01             | 2019-01-02             | ...
| ---------------------- | ---------------------- | ...
| USD:1, JPY:109.61, ... | USD:1, JPY:109.62, ... | ...

The schema is as follows (with each day between 2019 and 2020 as a column header):

root
 |-- 2019-01-01: struct (nullable = true)
 |    |-- AED: double (nullable = true)
 |    |-- AFN: double (nullable = true)
 |    |-- ALL: double (nullable = true)
 |    |-- ARS: double (nullable = true)
 |    |-- AUD: double (nullable = true)
 |    |-- BAM: double (nullable = true)
 |    |-- BBD: double (nullable = true)
 |    |-- BDT: double (nullable = true)
 |    |-- BGN: double (nullable = true)
 |    |-- BHD: double (nullable = true)
 |    |-- BIF: double (nullable = true)

etc.

I would like each value in each column to be in a separate row, and to pivot the table so that the dates become rows and each currency code becomes a column heading, as follows:

| Date       | USD | JPY    |
| ---------- | --- | ------ |
| 2019-01-01 | 1   | 109.61 |
| 2019-01-02 | 1   | 109.62 |

etc.

Every solution I have found so far requires that I specify which column to pivot/unnest.

CodePudding user response:

This solution flattens the JSON in plain Python before loading it into a PySpark DataFrame.

Set-Up

Here is a sample of the json.

json_input = {'2019-01-01': {'AED': 1.0,
                'AFN': 1.0,
                'ALL': 1.0,
                'ARS': 1.0,
                'AUD': 1.0,
                'BAM': 1.0,
                'BBD': 1.0,
                'BDT': 1.0,
                'BGN': 1.0,
                'BHD': 1.0,
                'BIF': 1.0},
              '2019-01-02': {'AED': 2.0,
                'AFN': 2.0,
                'ALL': 2.0,
                'ARS': 2.0,
                'AUD': 2.0,
                'BAM': 2.0,
                'BBD': 2.0,
                'BDT': 2.0,
                'BGN': 2.0,
                'BHD': 2.0,
                'BIF': 2.0}}

Logic

Next, I reformat the JSON so that the "Date" key sits at the same level as the currency codes.

# Move each top-level date key down into its value dict as a "Date" field,
# producing a list of flat records (one per date).
json_reformat = []
for date, country_data in json_input.items():
    json_flat = country_data  # note: this aliases (and mutates) the original dict
    json_flat["Date"] = date
    json_reformat.append(json_flat)

output

[{'AED': 1.0,
  'AFN': 1.0,
  'ALL': 1.0,
  'ARS': 1.0,
  'AUD': 1.0,
  'BAM': 1.0,
  'BBD': 1.0,
  'BDT': 1.0,
  'BGN': 1.0,
  'BHD': 1.0,
  'BIF': 1.0,
  'Date': '2019-01-01'},
 {'AED': 2.0,
  'AFN': 2.0,
  'ALL': 2.0,
  'ARS': 2.0,
  'AUD': 2.0,
  'BAM': 2.0,
  'BBD': 2.0,
  'BDT': 2.0,
  'BGN': 2.0,
  'BHD': 2.0,
  'BIF': 2.0,
  'Date': '2019-01-02'}]
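
The reformatting loop above can also be written as a single list comprehension; a minimal sketch that additionally avoids mutating json_input:

# Copy each rates dict and attach its date as a "Date" field
json_reformat = [{**rates, "Date": date} for date, rates in json_input.items()]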

Then load this newly formatted object into a PySpark DataFrame

import json  # spark.read.json expects an RDD of JSON strings, so serialize each dict

df = spark.read.json(sc.parallelize([json.dumps(r) for r in json_reformat]))
df.show()

output

+---+---+---+---+---+---+---+---+---+---+---+----------+
|AED|AFN|ALL|ARS|AUD|BAM|BBD|BDT|BGN|BHD|BIF|      Date|
+---+---+---+---+---+---+---+---+---+---+---+----------+
|1.0|1.0|1.0|1.0|1.0|1.0|1.0|1.0|1.0|1.0|1.0|2019-01-01|
|2.0|2.0|2.0|2.0|2.0|2.0|2.0|2.0|2.0|2.0|2.0|2019-01-02|
+---+---+---+---+---+---+---+---+---+---+---+----------+
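
For completeness: if the JSON has already been read into a DataFrame with one struct column per date (as in the question's schema), a similar reshape can be done without leaving Spark. This is only a sketch, assuming every column name is a date and every column is a struct of rates:

from pyspark.sql import functions as F

# One single-row DataFrame per date column: a literal "Date" value plus the
# struct's fields (AED, AFN, ...) expanded into top-level columns.
parts = [df.select(F.lit(c).alias("Date"), f"`{c}`.*") for c in df.columns]

# Stack the per-date rows into a single DataFrame.
result = parts[0]
for part in parts[1:]:
    result = result.unionByName(part)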

CodePudding user response:

Assuming all date columns list their values in the same order and have no missing values, you can build the result with for loops:

row_items = ()
rows = []
cols = ['Date']

# Derive the column headings (currency codes) from the first date column
for i in data.select('2019-01-01').collect():
    cols.append(i[0].split(':')[0])

# Build one tuple per date column: (date, rate_1, rate_2, ...)
for item in data.columns:
    row_items += (item,)
    for i in data.select(item).collect():
        row_items += (i[0].split(':')[1],)
    rows.append(row_items)
    row_items = ()

result = spark.createDataFrame(rows, cols)
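
Note that this collect()s every column to the driver, so it is only practical for small dataframes; for anything larger, a Spark-native reshape such as the struct-expansion sketch after the first answer avoids pulling the data out of Spark.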