Home > Software design >  how to get json keys and values of a column stacked in different columns respectively Pyspark
how to get json keys and values of a column stacked in different columns respectively Pyspark

Time:10-17

I am trying to get keys and values of column which has some rows as json and others as string/None. I want to get each json key and it's value stacked into new different columns respectively. Json can be nested type and I don't know the schema of json rows beforehand (like how many keys are/ how nested it is).

Data set has 100s millions of rows.

Sample data:

 ----------- ---------- ------------------------------------------------------- 
|col1 |col2 | col3     |     col4                                              | 
 ----- ----- ---------- --------------------------------------------------------
|1    |A    | hello1   |   time1                                               |    
|2    |B    | hello2   |    None
|3    |C    | hello3   |  {'world1': 'how are you?','world2':{'help me!':'please'}}                
|4    |D    | hello4   |  {'world3':'ola!'}                                    |                 
 ----- ----- ---------- ------------------------------------------------------- 

Expected dataframe:

 -------- ------------------- ------------------------------- 
|col1 |col2 | col3     | new_col_keys   |   new_col_values   |
 -------- ------------------- ---------- -------------------- 
|1    |A    | hello1   |   time1                Null         |                        
|2    |B    | hello2   |   Null                 Null         |
|3    |C    | hello3   |  world1                how are you? |
|3    |C    | hello3   |  world2        {'help me!':'please'}|                  
|4    |D    | hello4   |  world3                ola!         |
 ----- ----- ---------- -------------------------------------               

Here I am adding new columns for keys and values differently and deleting the original one.

NOTE: All columns in Sample data are of StringType

CodePudding user response:

You may try the following :

  1. Using the type and function imports from pyspark.sql
  2. Filter out non-json type records using pattern matching (eg. LIKE) not (~) like as json dictionary/map type values have the pattern {key:value} and store in a dataframe df_simple
  3. Filter out json type dictionary records using pattern matching using LIKE and store results in a dataframe df_map.
  4. Cast records to MapType with string keys and string values using from_json
  5. Use select to retrieve desired columns and explode to get each key/value pair from the transformed MapType column col4, into a new row
  6. Rename exploded columns names (key->new_col_keys and value->new_col_values) using withColumnRenamed
  7. Use a union to combine df_simple and df_map into final data set output_df (While using the select in the union is optional, it ensures that the rigth columns are used even if the dataframe changes in the future)

NB. Sections related to steps in code are included as comments

# Step 1
from pyspark.sql import functions as F
from pyspark.sql import types as T
df_simple = df.where(~ F.col("col4").like("{%}")) #Step 2
df_simple.show() # only for debugging purposes
 ---- ---- ------ ----- 
|col1|col2|  col3| col4|
 ---- ---- ------ ----- 
|   1|   A|hello1|time1|
|   2|   B|hello2| null|
 ---- ---- ------ ----- 
df_maps = (
    # Step 3
    df.where(F.col("col4").like("{%}")) 
    # Step 4
      .withColumn("col4",F.from_json( 
          F.col("col4"),
          T.MapType(T.StringType(),T.StringType())
      ))
      # Step 5
      .select(
          F.col("col1"),
          F.col("col2"),
          F.col("col3"),
          F.explode("col4")
      )
      # Step 6
      .withColumnRenamed("key","new_col_keys")
      .withColumnRenamed("value","new_col_values")
)
df_maps.show(truncate=False) # only for debugging purposes
 ---- ---- ------ ------------ --------------------- 
|col1|col2|col3  |new_col_keys|new_col_values       |
 ---- ---- ------ ------------ --------------------- 
|3   |C   |hello3|world1      |how are you?         |
|3   |C   |hello3|world2      |{"help me!":"please"}|
|4   |D   |hello4|world3      |ola!                 |
 ---- ---- ------ ------------ --------------------- 
# Step 7
output_df = (
    df_simple.selectExpr("col1","col2","col3","col4 as new_col_keys","NULL as new_col_values")
             .union(
                 df_maps.select("col1","col2","col3","new_col_keys","new_col_values")
             )
)
output_df.printSchema() # only for debugging
output_df.show(truncate=False)  # only for debugging
root
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: string (nullable = true)
 |-- new_col_keys: string (nullable = true)
 |-- new_col_values: string (nullable = true)

 ---- ---- ------ ------------ --------------------- 
|col1|col2|col3  |new_col_keys|new_col_values       |
 ---- ---- ------ ------------ --------------------- 
|1   |A   |hello1|time1       |null                 |
|2   |B   |hello2|null        |null                 |
|3   |C   |hello3|world1      |how are you?         |
|3   |C   |hello3|world2      |{"help me!":"please"}|
|4   |D   |hello4|world3      |ola!                 |
 ---- ---- ------ ------------ --------------------- 

NB. If you would like to repeat this for nested columns. You could repeat the operations from Steps 3 - 7. However, this time instead of using col4, you would use new_col_keys since col4 would no longer exist in the transformed dataset.

Let me know if this works for you.

  • Related