I have a JSON file which contains a dictionary in the following format:
{"a1":{"b1":["c1","c2"], "b2":["c4","c3"]}, "a2":{"b3":["c1","c4"]}}
Is it possible to convert this dictionary into a PySpark dataframe as the following?
| col1 | col2 | col3 |
|------|------|------|
| a1   | b1   | c1   |
| a1   | b1   | c2   |
| a1   | b2   | c4   |
| a1   | b2   | c3   |
| a2   | b3   | c1   |
| a2   | b3   | c4   |
I have seen the standard way of converting JSON to a PySpark DataFrame (example in this link), but was wondering about nested dictionaries that contain lists as well.
CodePudding user response:
Interesting problem! The main difficulty I found is that when reading from JSON, your schema will likely be of struct type, which makes this harder to solve, because a1 and a2 have different struct types.

My idea is to somehow convert the struct type to map type, stack the maps together, then apply a few explodes:
This is your df
+----------------------------------+
|data                              |
+----------------------------------+
|{{[c1, c2], [c4, c3]}, {[c1, c4]}}|
+----------------------------------+
root
|-- data: struct (nullable = true)
| |-- a1: struct (nullable = true)
| | |-- b1: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- b2: array (nullable = true)
| | | |-- element: string (containsNull = true)
| |-- a2: struct (nullable = true)
| | |-- b3: array (nullable = true)
| | | |-- element: string (containsNull = true)
Create a temporary df to handle the JSON's first level
first_level_df = df.select('data.*')
first_level_df.show()
first_level_cols = first_level_df.columns # ['a1', 'a2']
+--------------------+----------+
|                  a1|        a2|
+--------------------+----------+
|{[c1, c2], [c4, c3]}|{[c1, c4]}|
+--------------------+----------+
Some helper variables
map_cols = [F.from_json(F.to_json(c), T.MapType(T.StringType(), T.StringType())).alias(c) for c in first_level_cols]
# [Column<'entries AS a1'>, Column<'entries AS a2'>]
stack_cols = ', '.join([f"'{c}', {c}" for c in first_level_cols])
# 'a1', a1, 'a2', a2
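The `stack_cols` helper is just a plain string of label/value pairs for SQL's `stack()` function, so it can be sanity-checked without Spark:

```python
first_level_cols = ['a1', 'a2']

# Each column contributes a "'label', column" pair; stack(2, ...) then
# turns the 2 pairs into 2 rows of (label, value)
stack_cols = ', '.join([f"'{c}', {c}" for c in first_level_cols])
print(stack_cols)                    # 'a1', a1, 'a2', a2
print(f'stack(2, {stack_cols})')     # stack(2, 'a1', a1, 'a2', a2)
```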
Main transformation
(first_level_df
.select(map_cols)
.select(F.expr(f'stack(2, {stack_cols})').alias('AA', 'temp'))
.select('AA', F.explode('temp').alias('BB', 'temp'))
.select('AA', 'BB', F.explode(F.from_json('temp', T.ArrayType(T.StringType()))).alias('CC'))
.show(10, False)
)
+---+---+---+
|AA |BB |CC |
+---+---+---+
|a1 |b1 |c1 |
|a1 |b1 |c2 |
|a1 |b2 |c4 |
|a1 |b2 |c3 |
|a2 |b3 |c1 |
|a2 |b3 |c4 |
+---+---+---+
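As a sanity check, or as a simpler alternative when the JSON fits in driver memory, the same flattening can be done in plain Python and the flat rows handed to `spark.createDataFrame`, using the col1/col2/col3 names from the question:

```python
import json

raw = '{"a1":{"b1":["c1","c2"], "b2":["c4","c3"]}, "a2":{"b3":["c1","c4"]}}'
data = json.loads(raw)

# Flatten the two nested levels into (outer key, inner key, list element) rows
rows = [(a, b, c)
        for a, inner in data.items()
        for b, vals in inner.items()
        for c in vals]
# rows == [('a1', 'b1', 'c1'), ('a1', 'b1', 'c2'), ('a1', 'b2', 'c4'),
#          ('a1', 'b2', 'c3'), ('a2', 'b3', 'c1'), ('a2', 'b3', 'c4')]

# With a SparkSession in scope, this becomes the requested DataFrame:
# spark.createDataFrame(rows, ['col1', 'col2', 'col3'])
```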