I have a dataframe as below:
+-----------------------------------------------------------------------------------------------+-----------------------+
|value |timestamp |
+-----------------------------------------------------------------------------------------------+-----------------------+
|{"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"[email protected]"}}|2023-01-03 11:02:11.975|
|{"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"[email protected]"}} |2023-01-03 11:02:11.976|
|{"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"[email protected]"}} |2023-01-03 11:02:11.976|
|{"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"[email protected]"}} |2023-01-03 11:02:11.976|
+-----------------------------------------------------------------------------------------------+-----------------------+
root
|-- value: string (nullable = true)
|-- timestamp: timestamp (nullable = true)
Expected result, using PySpark:
+----+----------+---------+-----------------+
|id  |first_name|last_name|email            |
+----+----------+---------+-----------------+
|1001|Sally     |Thomas   |[email protected]|
|1002|George    |Bailey   |[email protected]|
|1003|Edward    |Walker   |[email protected]|
|1004|Anne      |Kretchmar|[email protected]|
+----+----------+---------+-----------------+
Any help is appreciated.
Answer:
You can use the from_json function from pyspark.sql.functions to do this. It needs a schema describing the JSON, which you can build yourself. This is your dataframe:
df = spark.createDataFrame(
    [
        ("""{"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"[email protected]"}}""",
         "2023-01-03 11:02:11.975"),
        ("""{"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"[email protected]"}}""",
         "2023-01-03 11:02:11.976"),
        ("""{"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"[email protected]"}}""",
         "2023-01-03 11:02:11.976"),
        ("""{"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"[email protected]"}}""",
         "2023-01-03 11:02:11.976"),
    ],
    ["value", "timestamp"]
)
And a possible solution looks something like this:
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
from pyspark.sql.functions import from_json

# Schema of the JSON string in "value": a single top-level "after" struct
# holding the four record fields (the timestamp is a separate column, not part of the JSON).
schema = StructType([
    StructField("after", StructType([
        StructField("id", IntegerType(), True),
        StructField("first_name", StringType(), True),
        StructField("last_name", StringType(), True),
        StructField("email", StringType(), True),
    ]), True),
])

# Parse the JSON string, then expand the nested "after" struct into columns.
output = df.withColumn("value", from_json("value", schema)) \
    .select("value.after.*")
output.show()
+----+----------+---------+--------------------+
|  id|first_name|last_name|               email|
+----+----------+---------+--------------------+
|1001|     Sally|   Thomas|sally.thomas@acme...|
|1002|    George|   Bailey|   [email protected]|
|1003|    Edward|   Walker|   [email protected]|
|1004|      Anne|Kretchmar|   [email protected]|
+----+----------+---------+--------------------+
The final select statement flattens the nested after struct into separate columns. With the classes in pyspark.sql.types you can describe any schema, including whether each field is nullable (the third argument of StructField). More information about the available data types is in the PySpark documentation for pyspark.sql.types.
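For intuition about what the parse-then-flatten step computes, here is a plain-Python sketch of the per-row behaviour using only the standard json module. It is not Spark code: `parse_after` and the sample rows (including the example.com addresses) are made up for illustration. Like from_json, it yields None (Spark's null) for a field the schema declares but the JSON does not contain.

```python
import json

# Field names declared in the inner "after" struct of the schema above.
AFTER_FIELDS = ("id", "first_name", "last_name", "email")


def parse_after(value, fields=AFTER_FIELDS):
    """Hypothetical plain-Python analogue of
    from_json("value", schema) followed by select("value.after.*") for one row."""
    record = json.loads(value)
    after = record.get("after") or {}
    # A declared field that is missing from the JSON becomes None,
    # the way from_json yields null for it.
    return tuple(after.get(name) for name in fields)


# Made-up sample rows; the second one has no "email" key.
rows = [
    '{"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally@example.com"}}',
    '{"after":{"id":1002,"first_name":"George","last_name":"Bailey"}}',
]
for row in rows:
    print(parse_after(row))
# (1001, 'Sally', 'Thomas', 'sally@example.com')
# (1002, 'George', 'Bailey', None)
```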
Answer:
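You can use PySpark's from_json function to parse JSON strings. The function needs a schema to parse against, which can be given as a DDL-formatted string instead of a StructType. In your case, the result is a struct (after) nested inside another struct.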
from pyspark.sql import functions as func

# data_sdf is the question's dataframe, with the timestamp column named `ts`
data_sdf. \
    withColumn('parsed_json',
               func.from_json('value',
                              'after struct<id: bigint, first_name: string, last_name: string, email: string>'
                              )
               ). \
    withColumn('inner_struct', func.col('parsed_json.after')). \
    selectExpr('ts', 'inner_struct.*'). \
    show(truncate=False)
# +-----------------------+----+----------+---------+---------------------+
# |ts                     |id  |first_name|last_name|email                |
# +-----------------------+----+----------+---------+---------------------+
# |2023-01-03 11:02:11.975|1001|Sally     |Thomas   |[email protected]    |
# |2023-01-03 11:02:11.976|1002|George    |Bailey   |[email protected]    |
# |2023-01-03 11:02:11.976|1003|Edward    |Walker   |[email protected]    |
# |2023-01-03 11:02:11.976|1004|Anne      |Kretchmar|[email protected]    |
# +-----------------------+----+----------+---------+---------------------+
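The schema string passed to from_json above is DDL-formatted: a top-level field is written as `name type`, and a nested record as `struct<name: type, ...>`. As a plain-Python illustration of that format (the helper `struct_ddl` is hypothetical, not part of PySpark), the same string can be assembled from (name, type) pairs instead of being typed by hand:

```python
def struct_ddl(fields):
    """Hypothetical helper: render [(name, ddl_type), ...] as a DDL struct<...> type."""
    return "struct<" + ", ".join(f"{name}: {dtype}" for name, dtype in fields) + ">"


after_fields = [
    ("id", "bigint"),
    ("first_name", "string"),
    ("last_name", "string"),
    ("email", "string"),
]

# Top-level field "after" whose type is the nested struct.
schema_ddl = f"after {struct_ddl(after_fields)}"
print(schema_ddl)
# after struct<id: bigint, first_name: string, last_name: string, email: string>
```

The resulting string is identical to the literal used in the answer, so it could be passed to func.from_json in its place.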
The parsed data, before flattening, looks like the following:
# same parsing as above, without the final flattening step
parsed_sdf = data_sdf. \
    withColumn('parsed_json',
               func.from_json('value',
                              'after struct<id: bigint, first_name: string, last_name: string, email: string>'
                              )
               ). \
    withColumn('inner_struct', func.col('parsed_json.after'))

parsed_sdf.show(truncate=False)
parsed_sdf.printSchema()
# +-----------------------------------------------------------------------------------------------+-----------------------+----------------------------------------------+--------------------------------------------+
# |value |ts |parsed_json |inner_struct |
# +-----------------------------------------------------------------------------------------------+-----------------------+----------------------------------------------+--------------------------------------------+
# |{"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"[email protected]"}}|2023-01-03 11:02:11.975|{{1001, Sally, Thomas, [email protected]}}|{1001, Sally, Thomas, [email protected]}|
# |{"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"[email protected]"}} |2023-01-03 11:02:11.976|{{1002, George, Bailey, [email protected]}} |{1002, George, Bailey, [email protected]} |
# |{"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"[email protected]"}} |2023-01-03 11:02:11.976|{{1003, Edward, Walker, [email protected]}} |{1003, Edward, Walker, [email protected]} |
# |{"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"[email protected]"}} |2023-01-03 11:02:11.976|{{1004, Anne, Kretchmar, [email protected]}} |{1004, Anne, Kretchmar, [email protected]} |
# +-----------------------------------------------------------------------------------------------+-----------------------+----------------------------------------------+--------------------------------------------+
# root
# |-- value: string (nullable = true)
# |-- ts: string (nullable = true)
# |-- parsed_json: struct (nullable = true)
# | |-- after: struct (nullable = true)
# | | |-- id: long (nullable = true)
# | | |-- first_name: string (nullable = true)
# | | |-- last_name: string (nullable = true)
# | | |-- email: string (nullable = true)
# |-- inner_struct: struct (nullable = true)
# | |-- id: long (nullable = true)
# | |-- first_name: string (nullable = true)
# | |-- last_name: string (nullable = true)
# | |-- email: string (nullable = true)
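The schema prints id as `long` because the DDL type `bigint` corresponds to Spark's LongType (64-bit signed), whereas the first answer's IntegerType (32-bit signed) prints as `integer`. The ids in the question fit easily in 32 bits, so either works. The plain-Python sketch below (hypothetical helper names, not Spark API) records those name mappings for the types used in this thread and shows the range check that decides when 64 bits are needed.

```python
# DDL name used in a schema string -> (pyspark.sql.types class, name printSchema() shows).
# Hand-written for the types in this thread; not an exhaustive or official table.
TYPE_NAMES = {
    "int": ("IntegerType", "integer"),  # 32-bit signed
    "bigint": ("LongType", "long"),     # 64-bit signed
    "string": ("StringType", "string"),
}

INT32_MIN, INT32_MAX = -2**31, 2**31 - 1


def smallest_int_ddl(values):
    """Return 'int' if every value fits in a 32-bit signed integer, else 'bigint'."""
    return "int" if all(INT32_MIN <= v <= INT32_MAX for v in values) else "bigint"


ids = [1001, 1002, 1003, 1004]
print(smallest_int_ddl(ids), TYPE_NAMES[smallest_int_ddl(ids)])
# int ('IntegerType', 'integer')
print(smallest_int_ddl([3_000_000_000]))
# bigint
```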