Pyspark - normalize a dataframe

Time: 09-21

I have this API response:

{
  '02/09/2021': {
    'ABC': {
      'emp': 'A1',
      'value': '12421'
    },
    'DEF': {
      'emp': 'D1',
      'value': '3345'
    },
    'GHI': {
      'emp': 'G2',
      'value': '260048836600'
    },
    'JKL': {
      'emp': 'J1',
      'value': '66654654'
    }
  }
}

and would like to normalize it into a table in this format:

CODE | EMP | VALUE        | DATE
========================================
ABC  | A1  | 12421        | 02/09/2021
DEF  | D1  | 3345         | 02/09/2021
GHI  | G2  | 260048836600 | 02/09/2021
JKL  | J1  | 66654654     | 02/09/2021

I tried using explode but couldn't make it work. How can I get this result?


To reproduce it:

import json

api_response = {'02/09/2021':{'ABC':{'emp':'A1','value':'12421'},'DEF':{'emp':'D1','value':'3345'},'GHI':{'emp':'G2','value':'260048836600'},'JKL':{'emp':'J1','value':'66654654'}}}

rdd = spark.sparkContext.parallelize([json.dumps(api_response)])
input_df = spark.read.json(rdd)

CodePudding user response:

The easiest way to parse your API response into a DataFrame is to first transform the result, removing each date key and putting it into the body of its record. Turning the resulting struct columns into rows then requires stack:
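As a quick illustration of what SQL's stack(n, ...) does, here is a pure-Python sketch of its semantics (no Spark required): it splits a flat list of values into n rows of equal width.

```python
# Pure-Python sketch of SQL stack(n, ...) semantics (illustration only):
# stack(2, 'a', 1, 'b', 2) turns one row into two rows: ('a', 1) and ('b', 2).
def stack(n, *cols):
    width = len(cols) // n  # values per output row
    return [tuple(cols[i * width:(i + 1) * width]) for i in range(n)]

rows = stack(2, 'ABC', 'A1', '12421', 'DEF', 'D1', '3345')
# rows == [('ABC', 'A1', '12421'), ('DEF', 'D1', '3345')]
```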

import json

def transform_api_content(api_response):
  for key, value in api_response.items():
    value['DATE'] = key
    yield json.dumps(value)


api_response = {'02/09/2021':{'ABC':{'emp':'A1','value':'12421'},'DEF':{'emp':'D1','value':'3345'},'GHI':{'emp':'G2','value':'260048836600'},'JKL':{'emp':'J1','value':'66654654'}}}
input_df = spark.read.json(spark.sparkContext.parallelize(transform_api_content(api_response)))

# Hard-coded
stack = 'stack(4, "ABC", ABC.emp, ABC.value, "DEF", DEF.emp, DEF.value, "GHI", GHI.emp, GHI.value, "JKL", JKL.emp, JKL.value) AS (CODE, EMP, VALUE)'
output_df = input_df.selectExpr(stack, 'DATE')
output_df.show(truncate=False)

# Dynamic
def stack_columns(dataframe):
  struct_cols = [column for column, schema in dataframe.dtypes if schema.startswith('struct')]
  stack_expr = ['"{0}", {0}.emp, {0}.value'.format(column) for column in struct_cols]
  return 'stack({length}, {stack}) AS (CODE, EMP, VALUE)'.format(length=len(struct_cols), stack=','.join(stack_expr))


output_df = input_df.selectExpr(stack_columns(input_df), 'DATE')
output_df.show(truncate=False)
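To see what the dynamic version builds, here is the same string construction run against a hand-written dtypes list (hypothetical values matching what Spark would infer for the example response; no Spark needed):

```python
# Hypothetical dtypes, as DataFrame.dtypes would return them for this response:
dtypes = [('ABC', 'struct<emp:string,value:string>'),
          ('DEF', 'struct<emp:string,value:string>'),
          ('DATE', 'string')]

# Same logic as stack_columns, applied to the plain list:
struct_cols = [column for column, schema in dtypes if schema.startswith('struct')]
stack_expr = ['"{0}", {0}.emp, {0}.value'.format(column) for column in struct_cols]
expr = 'stack({length}, {stack}) AS (CODE, EMP, VALUE)'.format(
    length=len(struct_cols), stack=','.join(stack_expr))
print(expr)
# stack(2, "ABC", ABC.emp, ABC.value,"DEF", DEF.emp, DEF.value) AS (CODE, EMP, VALUE)
```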

Output

+----+---+------------+----------+
|CODE|EMP|VALUE       |DATE      |
+----+---+------------+----------+
|ABC |A1 |12421       |02/09/2021|
|DEF |D1 |3345        |02/09/2021|
|GHI |G2 |260048836600|02/09/2021|
|JKL |J1 |66654654    |02/09/2021|
+----+---+------------+----------+
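For comparison, the same flattening can be done in plain Python before Spark ever sees the data; this sketch produces row tuples that you could pass to spark.createDataFrame with the schema ['CODE', 'EMP', 'VALUE', 'DATE']:

```python
# Plain-Python flattening of the nested response into row tuples (a sketch):
api_response = {'02/09/2021': {'ABC': {'emp': 'A1', 'value': '12421'},
                               'DEF': {'emp': 'D1', 'value': '3345'}}}

rows = [(code, body['emp'], body['value'], date)
        for date, codes in api_response.items()
        for code, body in codes.items()]
# rows == [('ABC', 'A1', '12421', '02/09/2021'),
#          ('DEF', 'D1', '3345', '02/09/2021')]
```

This trades Spark's schema inference for explicit control over column order, which can be simpler when the response is small.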