One column in my table contains a JSON array stored as a string; each JSON object in the array represents one timestamp. Each record is identified by sheet, and its related data ("param_value") is that JSON array, which holds the parameter values for each timestamp. I want to transform it into rows keyed by 'sheet', 'equipment', and 'point'. I have already referred to this post, but I can't use '*' to select and expand the whole schema. I can't be sure of the schema in advance because this is an ETL job, and it appears that I need to use StructType to build the schema.
The table looks like:
sheet | equip | param_value |
---|---|---|
a1 | E1 | [{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3', 'status':'ok','log':'ok'}] |
a2 | E1 | [{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3', 'status':'ok','log':'ok'}] |
a3 | E1 | [{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3', 'status':'ok','log':'ok'}] |
The expected result:
sheet | equipment | point | status | log |
---|---|---|---|---|
a1 | E1 | 1 | no | no |
a1 | E1 | 2 | ok | no |
a1 | E1 | 3 | ok | ok |
a2 | E1 | 1 | no | no |
a2 | E1 | 2 | ok | no |
a2 | E1 | 3 | ok | ok |
a3 | E1 | 1 | no | no |
a3 | E1 | 2 | ok | no |
a3 | E1 | 3 | ok | ok |
CodePudding user response:
If param_value is a string, you need to parse it as JSON with from_json, explode the resulting array into rows, and then expand the struct keys into columns:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, StructField, StructType
# Reuse the active session if one exists (e.g. in a notebook), otherwise create one
spark = SparkSession.builder.getOrCreate()
data = [('a1', 'E1', "[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]"),
        ('a2', 'E1', "[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]"),
        ('a3', 'E1', "[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]")]
# Schema of param_value: an array of structs with three string fields
schema = ArrayType(StructType([StructField('point', StringType()),
                               StructField('status', StringType()),
                               StructField('log', StringType())]))
# Parse the JSON string, explode the array into one row per element,
# then expand the struct fields into top-level columns
df = (spark.createDataFrame(data, ['id', 'equip', 'param_value'])
      .withColumn('json_col', F.from_json(F.col('param_value'), schema))
      .select("id", "equip", F.explode("json_col").alias("json_col"))
      .select("id", "equip", F.col('json_col.*')))
df.show()
# +---+-----+-----+------+---+
# | id|equip|point|status|log|
# +---+-----+-----+------+---+
# | a1|   E1|    1|    no| no|
# | a1|   E1|    2|    ok| no|
# | a1|   E1|    3|    ok| ok|
# | a2|   E1|    1|    no| no|
# | a2|   E1|    2|    ok| no|
# | a2|   E1|    3|    ok| ok|
# | a3|   E1|    1|    no| no|
# | a3|   E1|    2|    ok| no|
# | a3|   E1|    3|    ok| ok|
# +---+-----+-----+------+---+
CodePudding user response:
To extract data from a JSON string you need to use from_json, for which you must provide the schema. If you define the schema as a DDL-formatted string, you can use it inside inline, which expands an array of structs into rows and columns in a single step.
Input dataframe:
df = spark.createDataFrame(
[('a1', 'E1', "[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]"),
('a2', 'E1', "[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]"),
('a3', 'E1', "[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]")],
['sheet', 'equip', 'param_value'])
Script:
schema = "'array<struct<point:string,status:string,log:string>>'"
df = df.selectExpr("sheet", "equip", f"inline(from_json(param_value, {schema}))")
df.show()
# +-----+-----+-----+------+---+
# |sheet|equip|point|status|log|
# +-----+-----+-----+------+---+
# |   a1|   E1|    1|    no| no|
# |   a1|   E1|    2|    ok| no|
# |   a1|   E1|    3|    ok| ok|
# |   a2|   E1|    1|    no| no|
# |   a2|   E1|    2|    ok| no|
# |   a2|   E1|    3|    ok| ok|
# |   a3|   E1|    1|    no| no|
# |   a3|   E1|    2|    ok| no|
# |   a3|   E1|    3|    ok| ok|
# +-----+-----+-----+------+---+
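Since the question mentions that the schema is not known ahead of time, one possible workaround is to derive the DDL string from a sample value instead of hard-coding it. The following is only a minimal sketch: ddl_from_sample is a hypothetical helper (not part of PySpark), it assumes the records are flat, that every field can be treated as a string, that field names are plain identifiers, and that the sample can be read as a Python literal because it uses single quotes.

```python
import ast


def ddl_from_sample(sample: str) -> str:
    """Hypothetical helper: build a DDL schema string for an array of
    flat structs from one sample param_value string."""
    # The sample rows use single-quoted keys, which json.loads rejects,
    # so the sample is read as a Python literal instead (an assumption
    # about the data, not a general JSON parser).
    records = ast.literal_eval(sample)
    # Collect field names in first-seen order across all sample records.
    fields = []
    for record in records:
        for key in record:
            if key not in fields:
                fields.append(key)
    # Every field is typed as string here (assumption).
    columns = ",".join(f"{name}:string" for name in fields)
    return f"array<struct<{columns}>>"


sample = "[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'}]"
schema = ddl_from_sample(sample)
print(schema)  # array<struct<point:string,status:string,log:string>>
```

The resulting string could then be wrapped in single quotes and used in the selectExpr call in place of the hard-coded schema. A sample taken from a single row will miss any field that only appears in other rows, so this is best treated as a starting point.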