I have the following dataset:
event_id | user_id | event | event_type | event_ts | item_id | next_event_type | next_item_id |
---|---|---|---|---|---|---|---|
246984 | 993922 | {"item_id":1000,"user_id":993922,"timestamp":5260,"type":"ITEM_PURCHASED"} | ITEM_PURCHASED | 5260 | 1000 | ITEM_PURCHASED | 1001 |
246984 | 993922 | {"item_id":1001,"user_id":993922,"timestamp":5260,"type":"ITEM_PURCHASED"} | ITEM_PURCHASED | 5855 | 1001 | ITEM_PURCHASED | 1002 |
I want to cumulatively append the next item_id to an array per user, so the output looks like the table below. I know I could do this in a UDF, but the dataset is quite large and I want to avoid the performance hit.
event_id | user_id | event | event_type | event_ts | item_id | next_event_type | next_item_id | next_item_set |
---|---|---|---|---|---|---|---|---|
246984 | 993922 | {"item_id":1000,"user_id":993922,"timestamp":5260,"type":"ITEM_PURCHASED"} | ITEM_PURCHASED | 5260 | 1000 | ITEM_PURCHASED | 1001 | [1000, 1001] |
246984 | 993922 | {"item_id":1001,"user_id":993922,"timestamp":5260,"type":"ITEM_PURCHASED"} | ITEM_PURCHASED | 5855 | 1001 | ITEM_PURCHASED | 1002 | [1000, 1001, 1002] |
This is the query I have so far:

with a as (
    select event_id
         , user_id
         , event
         , event_type
         , event_ts
         , item_id
         , lead(event_type) over (partition by user_id order by event_ts) as next_event_type
         , lead(item_id) over (partition by user_id order by event_ts) as next_item_id
    from tableA
)
select *
     , concat(lag(next_item_set) over (order by event_ts), array(next_item_id)) as cumulative_item_set
from a
;
Answer:
You could use `collect_list` or `collect_set` and specify the window's frame from `unbounded preceding` to `1 following`. Try adding this to your select clause (Spark SQL):

collect_list(item_id) over (partition by user_id order by event_ts rows between unbounded preceding and 1 following) as next_item_set
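For completeness, here is a minimal sketch of how that expression could be plugged into the CTE from the question (it reuses the `tableA` name and column names from the question; treat it as a starting point rather than a drop-in replacement):

```sql
with a as (
    select event_id
         , user_id
         , event
         , event_type
         , event_ts
         , item_id
         , lead(event_type) over (partition by user_id order by event_ts) as next_event_type
         , lead(item_id)    over (partition by user_id order by event_ts) as next_item_id
    from tableA
)
select *
     -- collect every item_id from the start of the user's history through the
     -- next row, i.e. the cumulative set plus the upcoming item
     , collect_list(item_id) over (
           partition by user_id
           order by event_ts
           rows between unbounded preceding and 1 following
       ) as next_item_set
from a
;
```

Note that `collect_list` keeps duplicates and follows the frame's order, while `collect_set` deduplicates but does not guarantee element order, so `collect_list` is usually the safer choice when the item sequence matters. Either way the aggregation runs as a built-in window function inside the SQL engine, so you avoid the overhead of a UDF.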