I've been trying to update a series of JSON blobs with PySpark, SparkSQL, and pandas, but have been unsuccessful. Here is what the data looks like:
#+---+---------+------------------------------------------+
#|ID |Timestamp|Properties                                |
#+---+---------+------------------------------------------+
#|a  |7        |{"a1": 5, "a2": 8}                        |
#|b  |12       |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|
#|a  |8        |{"a2": 4}                                 |
#|a  |10       |{"a3": "z", "a4": "t"}                    |
#|a  |5        |{"a1": 3, "a2": 12, "a4": "r"}            |
#|b  |20       |{"b2": "k", "b3": 9}                      |
#|b  |14       |{"b8": "y", "b3": 2}                      |
#+---+---------+------------------------------------------+
I want a query that will partition the rows by the ID field and sort them by the Timestamp field. After that, the Properties field would be cumulatively merged within each partition to create a new column New_Props. So the output would be this:
#+---+---------+------------------------------------------+------------------------------------------+------+
#|ID |Timestamp|Properties                                |New_Props                                 |rownum|
#+---+---------+------------------------------------------+------------------------------------------+------+
#|a  |5        |{"a1": 3, "a2": 12, "a4": "r"}            |{"a1": 3, "a2": 12, "a4": "r"}            |1     |
#|a  |7        |{"a1": 5, "a2": 8}                        |{"a1": 5, "a2": 8, "a4": "r"}             |2     |
#|a  |8        |{"a2": 4}                                 |{"a1": 5, "a2": 4, "a4": "r"}             |3     |
#|a  |10       |{"a3": "z", "a4": "t"}                    |{"a1": 5, "a2": 4, "a3": "z", "a4": "t"}  |4     |
#|b  |12       |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|1     |
#|b  |14       |{"b8": "y", "b3": 2}                      |{"b1": 36, "b2": "u", "b3": 2, "b8": "y"} |2     |
#|b  |20       |{"b2": "k", "b3": 9}                      |{"b1": 36, "b2": "k", "b3": 9, "b8": "y"} |3     |
#+---+---------+------------------------------------------+------------------------------------------+------+
Formula: Starting from rownum 2, take the New_Props value of the previous row (rownum 1) and update it with the value in the Properties column of the current row (rownum 2).
I tried using the LAG function, but I can't use a column that I'm currently calculating within the function itself.
To create the Next Props column I tried this CASE statement, but it did not work:
CASE
WHEN rownum != 1 THEN concat(properties, LAG(next_props, 1) OVER (PARTITION BY contentid ORDER BY updateddatetime))
ELSE next_props
END AS new_props
I've been trying different things for a while but I'm stuck. I can probably do it with a for loop and the Python dict.update() function, but I'm worried about efficiency. Any help is appreciated.
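For reference, the kind of pure-Python fallback I have in mind is roughly this (just a sketch: it pulls everything to the driver with collect(), which is exactly the efficiency concern, and assumes df is the DataFrame shown above):
import json
from collections import defaultdict

# Pull all rows to the driver and group them by ID
rows_by_id = defaultdict(list)
for row in df.collect():
    rows_by_id[row["ID"]].append(row)

# Within each ID, walk the rows in Timestamp order and cumulatively
# merge the JSON blobs with dict.update()
merged = []
for id_, rows in rows_by_id.items():
    acc = {}
    for rownum, row in enumerate(sorted(rows, key=lambda r: r["Timestamp"]), start=1):
        acc.update(json.loads(row["Properties"]))
        merged.append((id_, row["Timestamp"], row["Properties"], json.dumps(acc), rownum))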
CodePudding user response:
Here's one way using higher-order functions on array and map columns:
- Get the previous Properties for each row using lag, and convert both the previous and the current row's Properties into map type.
- Using the collect_list function over a window, get the cumulative array of previous-row Properties.
- Add the current row's Properties to the resulting array and use aggregate to concatenate the inner maps with map_concat. From your example, it seems that the update operation consists of simply adding the new keys, so before concatenating we filter out the already existing keys using the map_filter function.
- Use to_json to get a JSON string from the aggregated map, and drop the intermediary columns.
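For a self-contained run, the input DataFrame df used below can be created from your sample data like this (setup only, not part of the solution; note that the lambda-based aggregate and map_filter used below need Spark 3.1+):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data from the question; Properties stays a JSON string column
df = spark.createDataFrame(
    [("a", 7, '{"a1": 5, "a2": 8}'),
     ("b", 12, '{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}'),
     ("a", 8, '{"a2": 4}'),
     ("a", 10, '{"a3": "z", "a4": "t"}'),
     ("a", 5, '{"a1": 3, "a2": 12, "a4": "r"}'),
     ("b", 20, '{"b2": "k", "b3": 9}'),
     ("b", 14, '{"b8": "y", "b3": 2}')],
    ["ID", "Timestamp", "Properties"]
)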
from pyspark.sql import functions as F, Window

w = Window.partitionBy("ID").orderBy("Timestamp")

df1 = df.withColumn("rownum", F.row_number().over(w)) \
    .withColumn("prev_prop_map", F.from_json(F.lag("Properties").over(w), "map<string,string>")) \
    .withColumn("current_prop_map", F.from_json("Properties", "map<string,string>")) \
    .withColumn("cumulative_prev_props", F.collect_list("prev_prop_map").over(w)) \
    .withColumn(
        "New_Props",
        F.to_json(F.aggregate(
            # current row first, then previous rows from most recent to oldest,
            # so the most recent value of each key wins during the fold
            F.concat(F.array("current_prop_map"), F.reverse(F.col("cumulative_prev_props"))),
            F.expr("cast(map() as map<string,string>)"),
            lambda acc, x: F.map_concat(
                acc,
                # keep only the keys that are not already in the accumulator
                F.map_filter(x, lambda k, _: ~F.array_contains(F.map_keys(acc), k))
            )
        ))
    ).drop("prev_prop_map", "current_prop_map", "cumulative_prev_props")

df1.show(truncate=False)
#+---+---------+------------------------------------------+------+---------------------------------------+
#|ID |Timestamp|Properties                                |rownum|New_Props                              |
#+---+---------+------------------------------------------+------+---------------------------------------+
#|a  |5        |{"a1": 3, "a2": 12, "a4": "r"}            |1     |{"a1":"3","a2":"12","a4":"r"}          |
#|a  |7        |{"a1": 5, "a2": 8}                        |2     |{"a1":"5","a2":"8","a4":"r"}           |
#|a  |8        |{"a2": 4}                                 |3     |{"a2":"4","a1":"5","a4":"r"}           |
#|a  |10       |{"a3": "z", "a4": "t"}                    |4     |{"a3":"z","a4":"t","a2":"4","a1":"5"}  |
#|b  |12       |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|1     |{"b1":"36","b2":"u","b3":"17","b8":"c"}|
#|b  |14       |{"b8": "y", "b3": 2}                      |2     |{"b8":"y","b3":"2","b1":"36","b2":"u"} |
#|b  |20       |{"b2": "k", "b3": 9}                      |3     |{"b2":"k","b3":"9","b8":"y","b1":"36"} |
#+---+---------+------------------------------------------+------+---------------------------------------+
If you prefer using a SQL query, here's the equivalent SparkSQL:
WITH props AS (
SELECT *,
row_number() over(partition by ID order by Timestamp) AS rownum,
from_json(lag(Properties) over(partition by ID order by Timestamp), 'map<string,string>') AS prev_prop_map,
from_json(Properties, 'map<string,string>') AS current_prop_map
FROM props_tb
), cumulative_props AS (
SELECT *,
collect_list(prev_prop_map) over(partition by ID order by Timestamp) AS cumulative_prev_props
FROM props
)
SELECT ID,
       Timestamp,
       Properties,
       to_json(
         aggregate(
           concat(array(current_prop_map), reverse(cumulative_prev_props)),
           cast(map() as map<string,string>),
           (acc, x) -> map_concat(acc, map_filter(x, (k, v) -> ! array_contains(map_keys(acc), k)))
         )
       ) AS New_Props,
       rownum
FROM cumulative_props
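The query reads from props_tb, so the DataFrame has to be registered under that name before running it. A minimal sketch, assuming the statement above is stored in a variable called sql_query:
# Expose the DataFrame to SQL under the name used in the query above
df.createOrReplaceTempView("props_tb")

# sql_query is assumed to hold the WITH ... SELECT statement shown above
spark.sql(sql_query).show(truncate=False)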