Updating json column using window cumulative via pyspark

I've been trying to update a series of JSON blobs via PySpark, Spark SQL, and Pandas, but have been unsuccessful. Here is what the data looks like:

#+---+---------+------------------------------------------+
#|ID |Timestamp|Properties                                |
#+---+---------+------------------------------------------+
#|a  |7        |{"a1": 5, "a2": 8}                        |
#|b  |12       |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|
#|a  |8        |{"a2": 4}                                 |
#|a  |10       |{"a3": "z", "a4": "t"}                    |
#|a  |5        |{"a1": 3, "a2": 12, "a4": "r"}            |
#|b  |20       |{"b2": "k", "b3": 9}                      |
#|b  |14       |{"b8": "y", "b3": 2}                      |
#+---+---------+------------------------------------------+
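
For reference, the sample data can be reproduced with something like this (the Properties column is just a plain JSON string):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Properties is stored as a raw JSON string
df = spark.createDataFrame(
    [
        ("a", 7, '{"a1": 5, "a2": 8}'),
        ("b", 12, '{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}'),
        ("a", 8, '{"a2": 4}'),
        ("a", 10, '{"a3": "z", "a4": "t"}'),
        ("a", 5, '{"a1": 3, "a2": 12, "a4": "r"}'),
        ("b", 20, '{"b2": "k", "b3": 9}'),
        ("b", 14, '{"b8": "y", "b3": 2}'),
    ],
    ["ID", "Timestamp", "Properties"],
)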

I want a query that partitions the rows by the ID field and sorts them by the Timestamp field. After that, the Properties field would be cumulatively merged within each partition to create a new column, New_Props. So the output would be this:

#+---+---------+------------------------------------------+------------------------------------------+------+
#|ID |Timestamp|Properties                                |New_Props                                 |rownum|
#+---+---------+------------------------------------------+------------------------------------------+------+
#|a  |5        |{"a1": 3, "a2": 12, "a4": "r"}            |{"a1": 3, "a2": 12, "a4": "r"}            |1     |
#|a  |7        |{"a1": 5, "a2": 8}                        |{"a1": 5, "a2": 8, "a4": "r"}             |2     |
#|a  |8        |{"a2": 4}                                 |{"a1": 5, "a2": 4, "a4": "r"}             |3     |
#|a  |10       |{"a3": "z", "a4": "t"}                    |{"a1": 5, "a2": 4, "a3": "z", "a4": "t"}  |4     |
#|b  |12       |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|1     |
#|b  |14       |{"b8": "y", "b3": 2}                      |{"b1": 36, "b2": "u", "b3": 2, "b8": "y"} |2     |
#|b  |20       |{"b2": "k", "b3": 9}                      |{"b1": 36, "b2": "k", "b3": 9, "b8": "y"} |3     |
#+---+---------+------------------------------------------+------------------------------------------+------+

Formula: starting from rownum 2, take the New_Props value of the previous row (rownum 1) and update it with the value in the Properties column of the current row (rownum 2).

I tried using the LAG function, but I can't reference a column that I'm currently calculating inside the function itself.

To create the New_Props column I tried this CASE statement, but it did not work:

CASE
    WHEN rownum != 1 THEN concat(properties, LAG(next_props, 1) OVER (PARTITION BY contentid ORDER BY updateddatetime))
    ELSE next_props
END AS new_props

I've been trying different things for a while but I'm stuck. I can probably do it with a for loop and Python's dict.update() function (roughly sketched below), but I'm worried about efficiency. Any help is appreciated.
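
Just to illustrate what I mean, the driver-side fallback would be something like this (collect everything, then merge dicts per ID in Timestamp order, which is exactly what I'd like to avoid for large data):

import json
from collections import defaultdict

# Brute force: pull everything to the driver and merge per ID in Timestamp order
rows = sorted(df.collect(), key=lambda r: (r["ID"], r["Timestamp"]))
merged = defaultdict(dict)
out = []
for r in rows:
    merged[r["ID"]].update(json.loads(r["Properties"]))
    out.append((r["ID"], r["Timestamp"], r["Properties"], json.dumps(merged[r["ID"]])))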

CodePudding user response:

Here's one way using higher-order functions on array and map columns (the Python API for aggregate and map_filter requires Spark 3.1+):

  1. get the previous row's Properties for each row using lag, and convert both the previous and current Properties into map type (note that with map<string,string> all values become strings, which is why the output below shows e.g. "a1":"3")
  2. using the collect_list function over the window, get the cumulative array of previous rows' Properties
  3. prepend the current row's Properties to the resulting array and use aggregate to concatenate the inner maps with map_concat. From your example, the update operation consists of simply adding the new keys, so before concatenating we filter out the already existing keys using the map_filter function
  4. use to_json to get a JSON string from the aggregated map and drop the intermediate columns

from pyspark.sql import functions as F, Window

w = Window.partitionBy("ID").orderBy("Timestamp")

df1 = df.withColumn("rownum", F.row_number().over(w)) \
    .withColumn("prev_prop_map", F.from_json(F.lag("Properties").over(w), "map<string,string>")) \
    .withColumn("current_prop_map", F.from_json("Properties", "map<string,string>")) \
    .withColumn("cumulative_prev_props", F.collect_list("prev_prop_map").over(w)) \
    .withColumn(
        "New_Props",
        F.to_json(F.aggregate(
            # current map first, then previous maps from most recent to oldest
            F.concat(F.array("current_prop_map"), F.reverse(F.col("cumulative_prev_props"))),
            F.expr("cast(map() as map<string,string>)"),
            # when a key repeats, keep the value already present in the accumulator
            lambda acc, x: F.map_concat(
                acc,
                F.map_filter(x, lambda k, _: ~F.array_contains(F.map_keys(acc), k))
            )
        ))
    ).drop("prev_prop_map", "current_prop_map", "cumulative_prev_props")


df1.show(truncate=False)
#+---+---------+------------------------------------------+------+---------------------------------------+
#|ID |Timestamp|Properties                                |rownum|New_Props                              |
#+---+---------+------------------------------------------+------+---------------------------------------+
#|a  |5        |{"a1": 3, "a2": 12, "a4": "r"}            |1     |{"a1":"3","a2":"12","a4":"r"}          |
#|a  |7        |{"a1": 5, "a2": 8}                        |2     |{"a1":"5","a2":"8","a4":"r"}           |
#|a  |8        |{"a2": 4}                                 |3     |{"a2":"4","a1":"5","a4":"r"}           |
#|a  |10       |{"a3": "z", "a4": "t"}                    |4     |{"a3":"z","a4":"t","a2":"4","a1":"5"}  |
#|b  |12       |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|1     |{"b1":"36","b2":"u","b3":"17","b8":"c"}|
#|b  |14       |{"b8": "y", "b3": 2}                      |2     |{"b8":"y","b3":"2","b1":"36","b2":"u"} |
#|b  |20       |{"b2": "k", "b3": 9}                      |3     |{"b2":"k","b3":"9","b8":"y","b1":"36"} |
#+---+---------+------------------------------------------+------+---------------------------------------+

If you prefer using a SQL query, here's the equivalent Spark SQL:

WITH props AS (
    SELECT  *,
            row_number() over(partition by ID order by Timestamp) AS rownum,
            from_json(lag(Properties) over(partition by ID order by Timestamp), 'map<string,string>') AS prev_prop_map,
            from_json(Properties, 'map<string,string>') AS current_prop_map
    FROM    props_tb
),  cumulative_props AS (
    SELECT  *,
            collect_list(prev_prop_map) over(partition by ID order by Timestamp) AS cumulative_prev_props
    FROM    props 
)

SELECT  ID,
        Timestamp,
        Properties,
        to_json(aggregate(
            concat(array(current_prop_map), reverse(cumulative_prev_props)),
            cast(map() as map<string,string>),
            (acc, x) -> map_concat(acc, map_filter(x, (k, v) -> ! array_contains(map_keys(acc), k)))
        )) AS New_Props,
        rownum
FROM    cumulative_props
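
To actually run it, the DataFrame has to be exposed to SQL under the table name used in the query (props_tb); assuming the statement above is stored in a string sql_query:

# Register the source DataFrame under the name referenced in the query
df.createOrReplaceTempView("props_tb")

df1_sql = spark.sql(sql_query)  # sql_query = the WITH ... SELECT statement above
df1_sql.show(truncate=False)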