I have a DataFrame with a column of string type. The string represents the result of an API request that returns JSON.
df = spark.createDataFrame([
    ("[{original={ranking=1.0, input=top3}, response=[{to=Sam, position=guard}, {to=John, position=center}, {to=Andrew, position=forward}]}]", 1)],
    "col1:string, col2:int")
df.show()
This generates a DataFrame like:
+--------------------+----+
|                col1|col2|
+--------------------+----+
|[{original={ranki...|   1|
+--------------------+----+
In the output, I would like to keep col2 and add two columns derived from the response: col3 would capture the player name, indicated by to=, and col4 would capture the player's position, indicated by position=. The DataFrame would then have three rows, since there are three players. Example:
+----+------+-------+
|col2|  col3|   col4|
+----+------+-------+
|   1|   Sam|  guard|
|   1|  John| center|
|   1|Andrew|forward|
+----+------+-------+
I've read that I can leverage something like:
df.withColumn("col3",explode(from_json("col1")))
However, I'm not sure how to explode this, given that I want two columns instead of one and that from_json needs a schema.
Note, I can modify the response using json_dumps to return only the response piece of the string or...
[{to=Sam, position=guard}, {to=John, position=center}, {to=Andrew, position=forward}]
Answer:
If you simplify the output as mentioned, you can define a simple JSON schema, parse the JSON string into an array of structs with from_json, and then read each field.
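The original string uses key=value pairs rather than valid JSON, so it has to be simplified before from_json can parse it. Below is a minimal sketch of one way to do that in plain Python. The helper name response_to_json and the regular expression are assumptions for illustration, not part of the original answer, and the pattern assumes names and positions contain no commas or braces.

```python
import json
import re

# Hypothetical helper (not from the original answer): matches each
# "{to=<name>, position=<position>}" group in the raw string.
_PLAYER = re.compile(r"\{to=([^,}]+), position=([^,}]+)\}")


def response_to_json(raw: str) -> str:
    """Return the player entries found in `raw` as a JSON array string."""
    records = [
        {"to": to, "position": position}
        for to, position in _PLAYER.findall(raw)
    ]
    return json.dumps(records)


raw = (
    "[{original={ranking=1.0, input=top3}, response=[{to=Sam, position=guard}, "
    "{to=John, position=center}, {to=Andrew, position=forward}]}]"
)
simplified = response_to_json(raw)
print(simplified)
```

The resulting string is double-quoted JSON, which from_json can parse with an array-of-structs schema. The function could be applied before building the DataFrame, or possibly wrapped in a UDF with F.udf(response_to_json, T.StringType()) if the strings are already in Spark.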
Input
df = spark.createDataFrame([("[{'to': 'Sam', 'position': 'guard'},{'to': 'John', 'position': 'center'},{'to': 'Andrew', 'position': 'forward'}]",1)], "col1:string, col2:int")
# +-----------------------------------------------------------------------------------------------------------------+----+
# |col1                                                                                                             |col2|
# +-----------------------------------------------------------------------------------------------------------------+----+
# |[{'to': 'Sam', 'position': 'guard'},{'to': 'John', 'position': 'center'},{'to': 'Andrew', 'position': 'forward'}]|1   |
# +-----------------------------------------------------------------------------------------------------------------+----+
And this is the transformation
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Schema of the JSON string: an array of {to, position} structs
schema = T.ArrayType(T.StructType([
    T.StructField('to', T.StringType()),
    T.StructField('position', T.StringType())
]))

(df
    # Parse the string and emit one row per array element
    .withColumn('temp', F.explode(F.from_json('col1', schema=schema)))
    # Pull the struct fields out into their own columns
    .select(
        F.col('col2'),
        F.col('temp.to').alias('col3'),
        F.col('temp.position').alias('col4'),
    )
    .show()
)
# Output
# +----+------+-------+
# |col2|  col3|   col4|
# +----+------+-------+
# |   1|   Sam|  guard|
# |   1|  John| center|
# |   1|Andrew|forward|
# +----+------+-------+