I am trying to extract the keys and values of a column in which some rows contain JSON and other rows contain a plain string or None. I want each JSON key and its value stacked into new columns (one output row per key/value pair).

The JSON can be nested, and I don't know its schema beforehand (how many keys there are or how deeply it is nested).

The data set has hundreds of millions of rows.

Sample data:
+----+----+------+---------------------------------------------------------+
|col1|col2|col3  |col4                                                     |
+----+----+------+---------------------------------------------------------+
|1   |A   |hello1|time1                                                    |
|2   |B   |hello2|None                                                     |
|3   |C   |hello3|{'world1': 'how are you?','world2':{'help me!':'please'}}|
|4   |D   |hello4|{'world3':'ola!'}                                        |
+----+----+------+---------------------------------------------------------+
Expected dataframe:
+----+----+------+------------+---------------------+
|col1|col2|col3  |new_col_keys|new_col_values       |
+----+----+------+------------+---------------------+
|1   |A   |hello1|time1       |Null                 |
|2   |B   |hello2|Null        |Null                 |
|3   |C   |hello3|world1      |how are you?         |
|3   |C   |hello3|world2      |{'help me!':'please'}|
|4   |D   |hello4|world3      |ola!                 |
+----+----+------+------------+---------------------+
Here I am adding separate new columns for the keys and the values and dropping the original col4.
NOTE: All columns in the sample data are of StringType.
CodePudding user response:
You may try the following:

1. Import the functions and types modules from pyspark.sql.
2. Keep the non-JSON records by negating (~) a LIKE pattern match, since JSON dictionary/map values follow the pattern {key:value}. Rows where col4 is null have to be kept explicitly, because NULL LIKE '{%}' evaluates to NULL and the negation would silently drop them. Store the result in a dataframe df_simple.
3. Keep the JSON dictionary records using the same LIKE pattern and store the result in a dataframe df_maps.
4. Cast the records to MapType with string keys and string values using from_json.
5. Use select to retrieve the desired columns and explode to turn each key/value pair of the transformed MapType column col4 into a new row.
6. Rename the exploded columns (key -> new_col_keys and value -> new_col_values) using withColumnRenamed.
7. Use a union to combine df_simple and df_maps into the final data set output_df. (The select in the union is optional, but it ensures that the right columns are lined up even if the dataframes change in the future.)
NB. The code below is annotated with comments matching these steps.
# Step 1
from pyspark.sql import functions as F
from pyspark.sql import types as T
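# For testing: one way to recreate the sample data from the question
# (this assumes an active SparkSession named `spark`; skip it if you
# already have `df`)
df = spark.createDataFrame(
    [
        ("1", "A", "hello1", "time1"),
        ("2", "B", "hello2", None),
        ("3", "C", "hello3", "{'world1': 'how are you?','world2':{'help me!':'please'}}"),
        ("4", "D", "hello4", "{'world3':'ola!'}"),
    ],
    ["col1", "col2", "col3", "col4"],
)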
df_simple = df.where(F.col("col4").isNull() | ~F.col("col4").like("{%}")) # Step 2: the isNull check keeps null rows, which NULL LIKE would otherwise drop
df_simple.show() # only for debugging purposes
+----+----+------+-----+
|col1|col2|  col3| col4|
+----+----+------+-----+
|   1|   A|hello1|time1|
|   2|   B|hello2| null|
+----+----+------+-----+
df_maps = (
# Step 3
df.where(F.col("col4").like("{%}"))
# Step 4
.withColumn("col4",F.from_json(
F.col("col4"),
T.MapType(T.StringType(),T.StringType())
))
# Step 5
.select(
F.col("col1"),
F.col("col2"),
F.col("col3"),
F.explode("col4")
)
# Step 6
.withColumnRenamed("key","new_col_keys")
.withColumnRenamed("value","new_col_values")
)
df_maps.show(truncate=False) # only for debugging purposes
+----+----+------+------------+---------------------+
|col1|col2|col3  |new_col_keys|new_col_values       |
+----+----+------+------------+---------------------+
|3   |C   |hello3|world1      |how are you?         |
|3   |C   |hello3|world2      |{"help me!":"please"}|
|4   |D   |hello4|world3      |ola!                 |
+----+----+------+------------+---------------------+
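A quick aside on why the Step 2/3 pre-filter matters: from_json does not raise an error on input it cannot parse; it simply returns null. Without the filter, rows like hello1/time1 would silently become null maps. A minimal check, assuming an active SparkSession named spark:
# from_json yields null (not an error) on unparseable input
spark.range(1).select(
    F.from_json(F.lit("time1"), T.MapType(T.StringType(), T.StringType())).alias("m")
).show()
# +----+
# |   m|
# +----+
# |null|
# +----+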
# Step 7
output_df = (
df_simple.selectExpr("col1","col2","col3","col4 as new_col_keys","NULL as new_col_values")
.union(
df_maps.select("col1","col2","col3","new_col_keys","new_col_values")
)
)
output_df.printSchema() # only for debugging
output_df.show(truncate=False) # only for debugging
root
|-- col1: string (nullable = true)
|-- col2: string (nullable = true)
|-- col3: string (nullable = true)
|-- new_col_keys: string (nullable = true)
|-- new_col_values: string (nullable = true)
+----+----+------+------------+---------------------+
|col1|col2|col3  |new_col_keys|new_col_values       |
+----+----+------+------------+---------------------+
|1   |A   |hello1|time1       |null                 |
|2   |B   |hello2|null        |null                 |
|3   |C   |hello3|world1      |how are you?         |
|3   |C   |hello3|world2      |{"help me!":"please"}|
|4   |D   |hello4|world3      |ola!                 |
+----+----+------+------------+---------------------+
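Since the question mentions hundreds of millions of rows, it is worth noting that the two where filters plus the union read df twice. As a sketch of an alternative (my own variation, not required for correctness): explode_outer keeps rows whose map is null, which lets everything happen in a single pass, relying on from_json returning null for non-JSON strings:
single_pass_df = (
    df.withColumn("m", F.from_json("col4", T.MapType(T.StringType(), T.StringType())))
    # explode_outer keeps rows whose map is null (non-JSON or null col4),
    # emitting a null key/value pair for them
    .select("col1", "col2", "col3", "col4", F.explode_outer("m"))
    # for non-JSON rows the exploded key is null, so fall back to the raw string
    .withColumn("new_col_keys", F.coalesce(F.col("key"), F.col("col4")))
    .withColumnRenamed("value", "new_col_values")
    .select("col1", "col2", "col3", "new_col_keys", "new_col_values")
)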
NB. If you would like to repeat this for nested JSON, you can repeat the operations from Steps 3 - 7. This time, however, you would filter on new_col_values rather than col4: the nested dictionaries end up in new_col_values, and col4 no longer exists in the transformed dataset. A sketch of that second pass is below.
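This is only an illustration of the idea; the column names nested_keys and nested_values are made up, and a final union would need the schemas aligned as in Step 7:
# second pass: treat nested JSON left in new_col_values the same way
df_simple2 = output_df.where(
    F.col("new_col_values").isNull() | ~F.col("new_col_values").like("{%}")
)
df_maps2 = (
    output_df.where(F.col("new_col_values").like("{%}"))
    .withColumn("new_col_values", F.from_json(
        F.col("new_col_values"),
        T.MapType(T.StringType(), T.StringType())
    ))
    # keep the parent key so you can tell which map each pair came from
    .select("col1", "col2", "col3", "new_col_keys", F.explode("new_col_values"))
    .withColumnRenamed("key", "nested_keys")
    .withColumnRenamed("value", "nested_values")
)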
Let me know if this works for you.