Explode JSON column using PySpark

Time:01-04

I have a DataFrame as below:

+-----------------------------------------------------------------------------------------------+-----------------------+
|value                                                                                          |timestamp              |
+-----------------------------------------------------------------------------------------------+-----------------------+
|{"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"[email protected]"}}|2023-01-03 11:02:11.975|
|{"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"[email protected]"}}  |2023-01-03 11:02:11.976|
|{"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"[email protected]"}}       |2023-01-03 11:02:11.976|
|{"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"[email protected]"}} |2023-01-03 11:02:11.976|
+-----------------------------------------------------------------------------------------------+-----------------------+
root
 |-- value: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

Expected result using PySpark:

+------+------------+-----------+-----------------+
| id   | first_name | last_name | email           |
+------+------------+-----------+-----------------+
| 1001 | Sally      | Thomas    | [email protected] |
| 1002 | George     | Bailey    | [email protected] |
| 1003 | Edward     | Walker    | [email protected] |
| 1004 | Anne       | Kretchmar | [email protected] |
+------+------------+-----------+-----------------+

Any help is appreciated

CodePudding user response:

You can use the from_json function from pyspark.sql.functions to do this. It needs a schema describing the JSON, which you define yourself. This is your DataFrame:

df = spark.createDataFrame(
    [
        ("""{"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"[email protected]"}}""",
         "2023-01-03 11:02:11.975"),
        ("""{"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"[email protected]"}}""",
         "2023-01-03 11:02:11.976"),
        ("""{"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"[email protected]"}}""",
         "2023-01-03 11:02:11.976"),
        ("""{"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"[email protected]"}}""",
         "2023-01-03 11:02:11.976"),
    ],
    ["value", "timestamp"]
)

And a possible solution looks something like this:

from pyspark.sql.types import IntegerType, StringType, StructType, StructField
from pyspark.sql.functions import from_json

# Schema of the JSON held in the "value" column: one nested "after" object.
# The timestamp is a separate DataFrame column, not a JSON key, so it is not listed here.
schema = StructType([
    StructField("after", StructType([
        StructField("id", IntegerType(), True),
        StructField("first_name", StringType(), True),
        StructField("last_name", StringType(), True),
        StructField("email", StringType(), True),
    ]), True),
])

# Parse the JSON string into a struct, then expand the nested fields into columns.
output = df.withColumn("value", from_json("value", schema)) \
           .select("value.after.*")

output.show()
+----+----------+---------+--------------------+
|  id|first_name|last_name|               email|
+----+----------+---------+--------------------+
|1001|     Sally|   Thomas|sally.thomas@acme...|
|1002|    George|   Bailey|  [email protected]|
|1003|    Edward|   Walker|       [email protected]|
|1004|      Anne|Kretchmar|  [email protected]|
+----+----------+---------+--------------------+

The last select statement flattens the nested structure into separate fields.
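To make the flattening step concrete, here is a plain-Python sketch of what parsing and then selecting value.after.* does to a single row. It uses the standard json module rather than Spark, and the email value is a placeholder because the addresses in the question are obscured.

```python
import json

# One row of the "value" column; the email is a placeholder, not the real data.
raw_value = (
    '{"after":{"id":1001,"first_name":"Sally",'
    '"last_name":"Thomas","email":"sally@example.com"}}'
)

# Step 1: parse the JSON string (roughly what from_json does per row, given a schema).
parsed = json.loads(raw_value)

# Step 2: promote every field of the nested "after" object to a top-level
# column (roughly what select("value.after.*") does).
flat_row = dict(parsed["after"])

print(list(flat_row))                          # the resulting column names
print(flat_row["id"], flat_row["first_name"])  # two of the flattened values
```

The analogy is loose: Spark applies the declared schema and works column-wise, while this sketch only shows the shape of the transformation.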

Using the types in pyspark.sql.types you can build any kind of schema, specify whether each field is nullable, and so on. More information about the available data types can be found in the Spark SQL data types documentation.
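If defining a full schema is more than you need, a possible alternative is get_json_object from pyspark.sql.functions, which pulls out one value per JSONPath expression. The sketch below is an assumption-laden illustration, not part of the answer above: FIELDS, JSON_PATHS and extract_after_fields are hypothetical names, and the column is assumed to be called "value" as in the question.

```python
# Field name -> Spark type to cast to; get_json_object itself returns strings.
FIELDS = {
    "id": "int",
    "first_name": "string",
    "last_name": "string",
    "email": "string",
}

# JSONPath expression for each field nested under "after".
JSON_PATHS = {name: f"$.after.{name}" for name in FIELDS}


def extract_after_fields(df, json_col="value"):
    """Return one column per field of the nested "after" object.

    Hypothetical helper: df is assumed to have a string column holding the JSON.
    """
    # Imported here so the path mapping above can be inspected without Spark.
    from pyspark.sql.functions import col, get_json_object

    return df.select(
        *[
            get_json_object(col(json_col), JSON_PATHS[name])
            .cast(spark_type)
            .alias(name)
            for name, spark_type in FIELDS.items()
        ]
    )
```

With the DataFrame from the question, extract_after_fields(df).show() should give the same four columns. The trade-off is that each field is extracted separately, so from_json with a schema is usually the tidier choice when there are many fields.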

CodePudding user response:

You can use PySpark's from_json function to parse JSON strings. The function requires the schema of the data it is parsing. In your case, the result is a struct nested inside a struct.

from pyspark.sql import functions as func

# data_sdf is the input DataFrame, with the timestamp column named 'ts'
data_sdf. \
    withColumn('parsed_json',
               func.from_json('value',
                              'after struct<id: bigint, first_name: string, last_name: string, email: string>'
                              )
               ). \
    withColumn('inner_struct', func.col('parsed_json.after')). \
    selectExpr('ts', 'inner_struct.*'). \
    show(truncate=False)

# +-----------------------+----+----------+---------+---------------------+
# |ts                     |id  |first_name|last_name|email                |
# +-----------------------+----+----------+---------+---------------------+
# |2023-01-03 11:02:11.975|1001|Sally     |Thomas   |[email protected]|
# |2023-01-03 11:02:11.976|1002|George    |Bailey   |[email protected]   |
# |2023-01-03 11:02:11.976|1003|Edward    |Walker   |[email protected]        |
# |2023-01-03 11:02:11.976|1004|Anne      |Kretchmar|[email protected]   |
# +-----------------------+----+----------+---------+---------------------+

Before the final select, the parsed data looks like the following:

data_sdf. \
    withColumn('parsed_json', 
               func.from_json('value', 
                              'after struct<id: bigint, first_name: string, last_name: string, email: string>'
                              )
               ). \
    withColumn('inner_struct', func.col('parsed_json.after')). \
    show(truncate=False)

# +-----------------------------------------------------------------------------------------------+-----------------------+----------------------------------------------+--------------------------------------------+
# |value                                                                                          |ts                     |parsed_json                                   |inner_struct                                |
# +-----------------------------------------------------------------------------------------------+-----------------------+----------------------------------------------+--------------------------------------------+
# |{"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"[email protected]"}}|2023-01-03 11:02:11.975|{{1001, Sally, Thomas, [email protected]}}|{1001, Sally, Thomas, [email protected]}|
# |{"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"[email protected]"}}  |2023-01-03 11:02:11.976|{{1002, George, Bailey, [email protected]}}  |{1002, George, Bailey, [email protected]}  |
# |{"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"[email protected]"}}       |2023-01-03 11:02:11.976|{{1003, Edward, Walker, [email protected]}}       |{1003, Edward, Walker, [email protected]}       |
# |{"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"[email protected]"}} |2023-01-03 11:02:11.976|{{1004, Anne, Kretchmar, [email protected]}} |{1004, Anne, Kretchmar, [email protected]} |
# +-----------------------------------------------------------------------------------------------+-----------------------+----------------------------------------------+--------------------------------------------+

# root
#  |-- value: string (nullable = true)
#  |-- ts: string (nullable = true)
#  |-- parsed_json: struct (nullable = true)
#  |    |-- after: struct (nullable = true)
#  |    |    |-- id: long (nullable = true)
#  |    |    |-- first_name: string (nullable = true)
#  |    |    |-- last_name: string (nullable = true)
#  |    |    |-- email: string (nullable = true)
#  |-- inner_struct: struct (nullable = true)
#  |    |-- id: long (nullable = true)
#  |    |-- first_name: string (nullable = true)
#  |    |-- last_name: string (nullable = true)
#  |    |-- email: string (nullable = true)
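The schema string passed to from_json here is a DDL-style description of the JSON. As a rough sketch of how such a string could be assembled from a field mapping rather than typed by hand, assuming the same 'value' and 'ts' column names as this answer (struct_ddl, AFTER_FIELDS, AFTER_SCHEMA and parse_and_flatten are hypothetical names introduced for illustration):

```python
# Field name -> Spark SQL type, mirroring the schema string used in this answer.
AFTER_FIELDS = {
    "id": "bigint",
    "first_name": "string",
    "last_name": "string",
    "email": "string",
}


def struct_ddl(column_name, fields):
    """Build a DDL string describing one struct column, e.g. 'after struct<...>'."""
    body = ", ".join(f"{name}: {dtype}" for name, dtype in fields.items())
    return f"{column_name} struct<{body}>"


AFTER_SCHEMA = struct_ddl("after", AFTER_FIELDS)


def parse_and_flatten(data_sdf):
    """Parse the 'value' column and return 'ts' plus the flattened fields.

    Assumes data_sdf has a JSON string column 'value' and a column 'ts'.
    """
    # Imported here so the schema string can be built without Spark installed.
    from pyspark.sql import functions as func

    return (
        data_sdf
        .withColumn("parsed_json", func.from_json("value", AFTER_SCHEMA))
        .select("ts", "parsed_json.after.*")
    )
```

Selecting 'parsed_json.after.*' directly skips the intermediate inner_struct column; keeping that column, as the answer does, is handy when you want to inspect the parsed struct first.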