Cumulative array aggregation in SparkSQL

Time:08-10

I have the following dataset

event_id user_id event event_type event_ts item_id next_event_type next_item_id
246984 993922 {"item_id":1000,"user_id":993922,"timestamp":5260,"type":"ITEM_PURCHASED"} ITEM_PURCHASED 5260 1000 ITEM_PURCHASED 1001
246984 993922 {"item_id":1001,"user_id":993922,"timestamp":5260,"type":"ITEM_PURCHASED"} ITEM_PURCHASED 5855 1001 ITEM_PURCHASED 1002

I want to cumulatively append the next item_id to an array. I know I can do this in a UDF, but the dataset is quite large and I want to avoid the performance hit.

event_id user_id event event_type event_ts item_id next_event_type next_item_id next_item_set
246984 993922 {"item_id":1000,"user_id":993922,"timestamp":5260,"type":"ITEM_PURCHASED"} ITEM_PURCHASED 5260 1000 ITEM_PURCHASED 1001 [1000, 1001]
246984 993922 {"item_id":1001,"user_id":993922,"timestamp":5260,"type":"ITEM_PURCHASED"} ITEM_PURCHASED 5855 1001 ITEM_PURCHASED 1002 [1000, 1001, 1002]

This is the query I have so far:

with a as (
select event_id
    , user_id
    , event
    , event_type
    , event_ts
    , item_id
    , lead(event_type) over (partition by user_id order by event_ts) as next_event_type
    , lead(item_id) over (partition by user_id order by event_ts) as next_item_id
from tableA
)
select *
, concat(lag(next_item_set) over (order by event_ts), array(next_item_id))  as cumulative_item_set
from a
;

CodePudding user response:

You could use collect_list (or collect_set, if you want duplicates dropped) and specify the window's frame from unbounded preceding to 1 following, so each row's array also picks up the next row's item_id. Try adding this to your select clause (SparkSQL):

collect_list(item_id) over (partition by user_id order by event_ts rows between unbounded preceding and 1 following) as next_item_set
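To see why this frame yields the desired next_item_set column, here is a plain-Python sketch (no Spark needed) of what the frame collects for one user. It assumes a third row with item_id 1002 exists, inferred from next_item_id in the question's second row; its event_ts of 6000 is made up for illustration.

```python
# One user's rows as (event_ts, item_id); the 6000 timestamp is a
# hypothetical value for the third row implied by next_item_id = 1002.
rows = [(5260, 1000), (5855, 1001), (6000, 1002)]

# "order by event_ts" within the partition.
items = [item for _, item in sorted(rows)]

# "rows between unbounded preceding and 1 following": for row i the frame
# spans rows 0 .. i+1, clamped at the end of the partition.
next_item_set = [items[: min(i + 2, len(items))] for i in range(len(items))]

print(next_item_set)
# -> [[1000, 1001], [1000, 1001, 1002], [1000, 1001, 1002]]
```

The first two entries match the next_item_set values shown in the desired output; the last row's frame is clamped because there is no row after it.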