I have this API response:
{
    '02/09/2021': {
        'ABC': {
            'emp': 'A1',
            'value': '12421'
        },
        'DEF': {
            'emp': 'D1',
            'value': '3345'
        },
        'GHI': {
            'emp': 'G2',
            'value': '260048836600'
        },
        'JKL': {
            'emp': 'J1',
            'value': '66654654'
        }
    }
}
and would like to normalize it into a table in this format:
CODE | EMP | VALUE | DATE
========================================
ABC | A1 | 12421 | 02/09/2021
DEF | D1 | 3345 | 02/09/2021
GHI | G2 | 260048836600 | 02/09/2021
JKL | J1 | 66654654 | 02/09/2021
I tried to use explode but couldn't get it to work. How can I get this result?
To reproduce it:
import json
api_response = {'02/09/2021':{'ABC':{'emp':'A1','value':'12421'},'DEF':{'emp':'D1','value':'3345'},'GHI':{'emp':'G2','value':'260048836600'},'JKL':{'emp':'J1','value':'66654654'}}}
rdd = spark.sparkContext.parallelize([json.dumps(api_response)])
input_df = spark.read.json(rdd)
CodePudding user response:
An easier way to parse your API response into a DataFrame is to transform it first: take the date key out of the top level and put it into the body of each record as a DATE field. To then turn the resulting struct columns into table rows, use stack:
import json
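def transform_api_content(api_response):
    # Move each top-level date key into its record as a DATE field,
    # then emit the record as one JSON string per date.
    for key, value in api_response.items():
        value['DATE'] = key  # note: this mutates the input dict in place
        yield json.dumps(value)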
api_response = {'02/09/2021':{'ABC':{'emp':'A1','value':'12421'},'DEF':{'emp':'D1','value':'3345'},'GHI':{'emp':'G2','value':'260048836600'},'JKL':{'emp':'J1','value':'66654654'}}}
input_df = spark.read.json(spark.sparkContext.parallelize(transform_api_content(api_response)))
# Hard-coded
stack = 'stack(4, "ABC", ABC.emp, ABC.value, "DEF", DEF.emp, DEF.value, "GHI", GHI.emp, GHI.value, "JKL", JKL.emp, JKL.value) AS (CODE, EMP, VALUE)'
output_df = input_df.selectExpr(stack, 'DATE')
output_df.show(truncate=False)
# Dynamic
def stack_columns(dataframe):
    struct_cols = [column for column, schema in dataframe.dtypes if schema.startswith('struct')]
    stack_expr = ['"{0}", {0}.emp, {0}.value'.format(column) for column in struct_cols]
    return 'stack({length}, {stack}) AS (CODE, EMP, VALUE)'.format(length=len(struct_cols), stack=','.join(stack_expr))
output_df = input_df.selectExpr(stack_columns(input_df), 'DATE')
output_df.show(truncate=False)
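To see what kind of string the dynamic version passes to selectExpr, the expression can be built from a plain list of column names without a Spark session. This is only a minimal sketch: build_stack_expr is a hypothetical name, and it assumes every struct column contains exactly the emp and value fields, as in the sample response.

```python
def build_stack_expr(struct_cols, fields=("emp", "value")):
    """Build a Spark SQL stack() expression string for the given struct columns.

    build_stack_expr is a hypothetical helper; it assumes every column in
    struct_cols is a struct that contains all of the given fields.
    """
    # One group per output row: the column name as a string literal,
    # followed by a reference to each of its fields.
    groups = [
        ", ".join(['"{0}"'.format(col)] + ["{0}.{1}".format(col, f) for f in fields])
        for col in struct_cols
    ]
    aliases = ", ".join(["CODE"] + [f.upper() for f in fields])
    return "stack({n}, {groups}) AS ({aliases})".format(
        n=len(struct_cols), groups=", ".join(groups), aliases=aliases
    )


expr = build_stack_expr(["ABC", "DEF"])
print(expr)
# stack(2, "ABC", ABC.emp, ABC.value, "DEF", DEF.emp, DEF.value) AS (CODE, EMP, VALUE)
```

The first argument to stack is the number of rows to produce, so it has to match the number of struct columns; each row here consumes three values (the code literal plus its two fields).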
Output (the same for both versions):
+----+---+------------+----------+
|CODE|EMP|VALUE       |DATE      |
+----+---+------------+----------+
|ABC |A1 |12421       |02/09/2021|
|DEF |D1 |3345        |02/09/2021|
|GHI |G2 |260048836600|02/09/2021|
|JKL |J1 |66654654    |02/09/2021|
+----+---+------------+----------+
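If the response is small enough to handle on the driver, another option is to flatten the nested dictionary into rows in plain Python before creating the DataFrame, which avoids stack altogether. This is a sketch of that alternative rather than part of the approach above; flatten_response is a hypothetical helper, and it assumes the date -> code -> {emp, value} shape shown in the question.

```python
def flatten_response(api_response):
    """Yield one (CODE, EMP, VALUE, DATE) tuple per code under each date.

    flatten_response is a hypothetical helper; it assumes every code maps
    to a dict with 'emp' and 'value' keys.
    """
    for date, codes in api_response.items():
        for code, fields in codes.items():
            yield (code, fields["emp"], fields["value"], date)


api_response = {
    "02/09/2021": {
        "ABC": {"emp": "A1", "value": "12421"},
        "DEF": {"emp": "D1", "value": "3345"},
        "GHI": {"emp": "G2", "value": "260048836600"},
        "JKL": {"emp": "J1", "value": "66654654"},
    }
}

rows = list(flatten_response(api_response))
for row in rows:
    print(row)
```

Assuming an active SparkSession named spark, the rows could then be turned into a DataFrame with spark.createDataFrame(rows, ['CODE', 'EMP', 'VALUE', 'DATE']). Unlike the stack approach, this does the reshaping on the driver, so it is only a reasonable fit when the response fits comfortably in memory.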