convert python dictionary into pyspark dataframe

Time:04-09

I have a JSON file that contains a dictionary in the following format:

{"a1":{"b1":["c1","c2"], "b2":["c4","c3"]}, "a2":{"b3":["c1","c4"]}}

Is it possible to convert this dictionary into a PySpark dataframe like the following?

| col1 | col2 | col3 |
----------------------
| a1   | b1   | c1   |
| a1   | b1   | c2   |
| a1   | b2   | c4   |
| a1   | b2   | c3   |
| a2   | b3   | c1   |
| a2   | b3   | c4   |

I have seen the standard way of converting JSON to a PySpark dataframe (example in this link), but was wondering about nested dictionaries that contain lists as well.

CodePudding user response:

Interesting problem! The main difficulty I see here is that when reading from JSON, your schema will likely come back as struct types, which makes this harder to solve, because a1 basically has a different type than a2.

My idea is to somehow convert the struct types into map types, stack them together, then apply a few explodes:

This is your df
+----------------------------------+
|data                              |
+----------------------------------+
|{{[c1, c2], [c4, c3]}, {[c1, c4]}}|
+----------------------------------+

root
 |-- data: struct (nullable = true)
 |    |-- a1: struct (nullable = true)
 |    |    |-- b1: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- b2: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- a2: struct (nullable = true)
 |    |    |-- b3: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
Create a temporary df to handle the JSON's first level
from pyspark.sql import functions as F, types as T

first_level_df = df.select('data.*')
first_level_df.show()
first_level_cols = first_level_df.columns  # ['a1', 'a2']

+--------------------+----------+
|                  a1|        a2|
+--------------------+----------+
|{[c1, c2], [c4, c3]}|{[c1, c4]}|
+--------------------+----------+
Some helper variables
map_cols = [F.from_json(F.to_json(c), T.MapType(T.StringType(), T.StringType())).alias(c) for c in first_level_cols]
# [Column<'entries AS a1'>, Column<'entries AS a2'>]

stack_cols = ', '.join([f"'{c}', {c}" for c in first_level_cols])
# 'a1', a1, 'a2', a2
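
Note what the to_json / from_json round trip in map_cols does to the values: the keys survive, but each inner array comes back as a JSON-encoded *string* inside the map, which is why a second from_json is needed in the final step. A plain-Python sketch of that round trip:

```python
import json

# A struct like a1, as a Python dict
struct_a1 = {"b1": ["c1", "c2"], "b2": ["c4", "c3"]}

# to_json then from_json as map<string, string>: keys are kept,
# but each value is left as a JSON array string
as_map = {k: json.dumps(v) for k, v in json.loads(json.dumps(struct_a1)).items()}
print(as_map["b1"])  # '["c1", "c2"]' -> still a string, hence the later from_json(ArrayType)
```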
Main transformation
(first_level_df
    .select(map_cols)  # each column is now map<string, string>
    .select(F.expr(f'stack({len(first_level_cols)}, {stack_cols})').alias('AA', 'temp'))  # one row per top-level key
    .select('AA', F.explode('temp').alias('BB', 'temp'))  # one row per map entry
    .select('AA', 'BB', F.explode(F.from_json('temp', T.ArrayType(T.StringType()))).alias('CC'))  # one row per array element
    .show(10, False)
)

+---+---+---+
|AA |BB |CC |
+---+---+---+
|a1 |b1 |c1 |
|a1 |b1 |c2 |
|a1 |b2 |c4 |
|a1 |b2 |c3 |
|a2 |b3 |c1 |
|a2 |b3 |c4 |
+---+---+---+
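
As a side note: if the dictionary is small enough to fit on the driver, you could skip the struct gymnastics entirely and flatten it in plain Python before creating the dataframe. A minimal sketch, assuming an active SparkSession named spark:

```python
import json

raw = '{"a1":{"b1":["c1","c2"], "b2":["c4","c3"]}, "a2":{"b3":["c1","c4"]}}'
data = json.loads(raw)

# One (col1, col2, col3) tuple per leaf value
rows = [(a, b, c) for a, bs in data.items() for b, cs in bs.items() for c in cs]

# df = spark.createDataFrame(rows, ['col1', 'col2', 'col3'])  # assumes `spark` exists
```

This trades distributed parsing for simplicity, so it only makes sense when the JSON is a single small document rather than a large dataset.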