I created the following dataset of customers' cart actions. It tells me when a customer adds an item, removes an item, or undoes an action. For example, a customer can do ITEM_ADDED, ITEM_ADDED, ITEM_ADDED, ITEM_UNDO, ITEM_UNDO, ITEM_UNDO and end up with an empty cart.
event_id | customer_id | event | event_type | event_ts | item_id | next_event_type
---|---|---|---|---|---|---
246984 | 993922 | {"item_id":1000,"customer_id":993922,"timestamp":5260,"type":"ITEM_ADDED"} | ITEM_ADDED | 5260 | 1000 | ITEM_ADDED
246984 | 993922 | {"item_id":1001,"customer_id":993922,"timestamp":5260,"type":"ITEM_ADDED"} | ITEM_ADDED | 5355 | 1001 | ITEM_ADDED
246984 | 993922 | {"item_id":1002,"customer_id":993922,"timestamp":5260,"type":"ITEM_ADDED"} | ITEM_ADDED | 5460 | 1002 | ITEM_UNDO
246984 | 993922 | {"after_id":0,"before_id":1002,"customer_id":993922,"timestamp":5500,"type":"ITEM_UNDO"} | ITEM_UNDO | 5500 | NULL | ITEM_UNDO
246984 | 993922 | {"after_id":0,"before_id":1001,"customer_id":993922,"timestamp":5510,"type":"ITEM_UNDO"} | ITEM_UNDO | 5510 | NULL | ITEM_UNDO
246984 | 993922 | {"after_id":0,"before_id":1000,"customer_id":993922,"timestamp":5515,"type":"ITEM_UNDO"} | ITEM_UNDO | 5515 | NULL | ITEM_ADDED
246984 | 993922 | {"item_id":2000,"customer_id":993922,"timestamp":6000,"type":"ITEM_ADDED"} | ITEM_ADDED | 6000 | 2000 | ITEM_REMOVED
246984 | 993922 | {"item_id":2000,"customer_id":993922,"timestamp":6010,"type":"ITEM_REMOVED"} | ITEM_REMOVED | 6010 | 2000 | ITEM_ADDED
246984 | 993922 | {"item_id":7777,"customer_id":993922,"timestamp":9999,"type":"ITEM_ADDED"} | ITEM_ADDED | 6700 | 7777 | ITEM_ADDED
246984 | 993922 | {"item_id":9999,"customer_id":993922,"timestamp":9999,"type":"ITEM_ADDED"} | ITEM_ADDED | 9999 | 9999 | NULL
The result of my query should give me [7777, 9999]
Here is the Spark code I used to create the dataset. I know I can do F.collect_list(F.col("item_id")).over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)), but this does not take removes and undos into account.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window().partitionBy("customer_id").orderBy(F.asc("event_timestamp"))

cumulative_customer_items_purchased_df = (
    spark
    .table("testing")
    .where(
        (F.col("event_type").isin(item_events))  # item_events: list of event types, defined elsewhere
        & (F.col("customer_id") == 993922)
    )
    .select(*columns)  # columns: list of column names, defined elsewhere
    .withColumn(
        "prev_event_type",
        F.lag(F.col("event_type")).over(w),
    )
    .withColumn(
        "next_event_type",
        F.lead(F.col("event_type")).over(w),
    )
)
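For reference, here is roughly what the cumulative collect_list mentioned above would look like; since collect_list only ever appends (nulls are skipped), ITEM_REMOVED and ITEM_UNDO rows never shrink the accumulated cart. The window w and DataFrame name are reused from the snippet above:

# A minimal sketch of the naive cumulative cart: it only appends item_ids,
# so removes and undos are never applied.
w_cum = w.rowsBetween(Window.unboundedPreceding, Window.currentRow)
naive_df = cumulative_customer_items_purchased_df.withColumn(
    "cart_so_far", F.collect_list("item_id").over(w_cum)
)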
CodePudding user response:
I assume that the `event` column in your input DataFrame is a JSON string and looks like this:
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [
        (246984, 993922, "{'item_id':1000,'customer_id':993922,'timestamp':5260,'type':'ITEM_ADDED'}", 5260),
        (246984, 993922, "{'item_id':1001,'customer_id':993922,'timestamp':5260,'type':'ITEM_ADDED'}", 5355),
        (246984, 993922, "{'item_id':1002,'customer_id':993922,'timestamp':5260,'type':'ITEM_ADDED'}", 5460),
        (246984, 993922, "{'after_id':0,'before_id':1002,'customer_id':993922,'timestamp':5500,'type':'ITEM_UNDO'}", 5500),
        (246984, 993922, "{'after_id':0,'before_id':1001,'customer_id':993922,'timestamp':5510,'type':'ITEM_UNDO'}", 5510),
        (246984, 993922, "{'after_id':0,'before_id':1000,'customer_id':993922,'timestamp':5515,'type':'ITEM_UNDO'}", 5515),
        (246984, 993922, "{'item_id':2000,'customer_id':993922,'timestamp':6000,'type':'ITEM_ADDED'}", 6000),
        (246984, 993922, "{'item_id':2000,'customer_id':993922,'timestamp':6010,'type':'ITEM_REMOVED'}", 6010),
        (246984, 993922, "{'after_id':2000,'before_id':0,'customer_id':993922,'timestamp':6010,'type':'ITEM_UNDO'}", 6010),
        (246984, 993922, "{'item_id':7777,'customer_id':993922,'timestamp':9999,'type':'ITEM_ADDED'}", 6700),
        (246984, 993922, "{'item_id':9999,'customer_id':993922,'timestamp':9999,'type':'ITEM_ADDED'}", 9999),
    ],
    ["event_id", "customer_id", "event", "event_timestamp"],
).select(
    "*",
    F.json_tuple("event", "item_id", "after_id", "before_id", "type")
     .alias("item_id", "after_id", "before_id", "event_type"),
)
+--------+-----------+--------------------+---------------+-------+--------+---------+------------+
|event_id|customer_id|               event|event_timestamp|item_id|after_id|before_id|  event_type|
+--------+-----------+--------------------+---------------+-------+--------+---------+------------+
|  246984|     993922|{'item_id':1000,'...|           5260|   1000|    null|     null|  ITEM_ADDED|
|  246984|     993922|{'item_id':1001,'...|           5355|   1001|    null|     null|  ITEM_ADDED|
|  246984|     993922|{'item_id':1002,'...|           5460|   1002|    null|     null|  ITEM_ADDED|
|  246984|     993922|{'after_id':0,'be...|           5500|   null|       0|     1002|   ITEM_UNDO|
|  246984|     993922|{'after_id':0,'be...|           5510|   null|       0|     1001|   ITEM_UNDO|
|  246984|     993922|{'after_id':0,'be...|           5515|   null|       0|     1000|   ITEM_UNDO|
|  246984|     993922|{'item_id':2000,'...|           6000|   2000|    null|     null|  ITEM_ADDED|
|  246984|     993922|{'item_id':2000,'...|           6010|   2000|    null|     null|ITEM_REMOVED|
|  246984|     993922|{'after_id':2000,...|           6010|   null|    2000|        0|   ITEM_UNDO|
|  246984|     993922|{'item_id':7777,'...|           6700|   7777|    null|     null|  ITEM_ADDED|
|  246984|     993922|{'item_id':9999,'...|           9999|   9999|    null|     null|  ITEM_ADDED|
+--------+-----------+--------------------+---------------+-------+--------+---------+------------+
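(Side note: `json_tuple` returns all extracted fields as strings. If you would rather have typed columns, an equivalent parse with `from_json` and an explicit schema might look like the sketch below; the schema is my assumption based on the sample events, and fields missing from a given event simply come back as null. Spark's JSON reader accepts the single-quoted payloads by default via `allowSingleQuotes`.)

from pyspark.sql import functions as F
from pyspark.sql.types import LongType, StringType, StructField, StructType

# Assumed schema covering both ITEM_ADDED/ITEM_REMOVED and ITEM_UNDO payloads.
event_schema = StructType([
    StructField("item_id", LongType()),
    StructField("after_id", LongType()),
    StructField("before_id", LongType()),
    StructField("customer_id", LongType()),
    StructField("timestamp", LongType()),
    StructField("type", StringType()),
])

typed_df = df.withColumn("parsed", F.from_json("event", event_schema)).select(
    "event_id",
    "customer_id",
    "event_timestamp",
    "parsed.item_id",
    "parsed.after_id",
    "parsed.before_id",
    F.col("parsed.type").alias("event_type"),
)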
Note that I added one more `ITEM_UNDO` event to your sample dataset, to undo the `ITEM_REMOVED` event. I also assume that exactly one of `after_id` or `before_id` is set in `ITEM_UNDO` events, with `after_id` used for undoing `ITEM_REMOVED` events and `before_id` for undoing `ITEM_ADDED` events.
My approach is to add a column `qnt` that maps each item event to 1 (add and undo-of-remove events) or -1 (remove and undo-of-add events):
df2 = (
    df.withColumn(
        "item",
        F.expr("""
            case
                when event_type='ITEM_UNDO' and before_id!=0 then (-1 as qnt, before_id as item_id) -- undo an add
                when event_type='ITEM_UNDO' and after_id!=0 then (1 as qnt, after_id as item_id)    -- undo a remove
                when event_type='ITEM_ADDED' then (1 as qnt, item_id)
                when event_type='ITEM_REMOVED' then (-1 as qnt, item_id)
            end
        """),
    )
    .select("event_id", "customer_id", "event_timestamp", "item.qnt", "item.item_id")
)
+--------+-----------+---------------+---+-------+
|event_id|customer_id|event_timestamp|qnt|item_id|
+--------+-----------+---------------+---+-------+
|  246984|     993922|           5260|  1|   1000|
|  246984|     993922|           5355|  1|   1001|
|  246984|     993922|           5460|  1|   1002|
|  246984|     993922|           5500| -1|   1002|
|  246984|     993922|           5510| -1|   1001|
|  246984|     993922|           5515| -1|   1000|
|  246984|     993922|           6000|  1|   2000|
|  246984|     993922|           6010| -1|   2000|
|  246984|     993922|           6010|  1|   2000|
|  246984|     993922|           6700|  1|   7777|
|  246984|     993922|           9999|  1|   9999|
+--------+-----------+---------------+---+-------+
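(The parenthesized tuples in the `case` expression above are Spark SQL's struct constructor. If you prefer staying in the DataFrame API, a sketch of the equivalent `F.when` chain, under the same assumptions, could look like this; `df2_alt` is just an illustrative name:)

# Sketch: the same qnt/item_id mapping via F.when and F.struct
# instead of a SQL case expression.
item = (
    F.when(
        (F.col("event_type") == "ITEM_UNDO") & (F.col("before_id") != 0),
        F.struct(F.lit(-1).alias("qnt"), F.col("before_id").alias("item_id")),  # undo an add
    )
    .when(
        (F.col("event_type") == "ITEM_UNDO") & (F.col("after_id") != 0),
        F.struct(F.lit(1).alias("qnt"), F.col("after_id").alias("item_id")),  # undo a remove
    )
    .when(
        F.col("event_type") == "ITEM_ADDED",
        F.struct(F.lit(1).alias("qnt"), F.col("item_id")),
    )
    .when(
        F.col("event_type") == "ITEM_REMOVED",
        F.struct(F.lit(-1).alias("qnt"), F.col("item_id")),
    )
)
df2_alt = df.withColumn("item", item).select(
    "event_id", "customer_id", "event_timestamp", "item.qnt", "item.item_id"
)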
After we have the numeric change for each item in `qnt`, we can sum the results per item and filter out those with `sum(qnt) == 0`:
(
    df2
    .groupBy("customer_id", "item_id")
    .agg(F.sum("qnt").alias("qnt"), F.min("event_timestamp").alias("event_ts"))
    .where("qnt > 0")
    .groupBy("customer_id")
    .agg(F.sort_array(F.collect_list(F.struct("event_ts", "item_id"))).alias("items"))
    .withColumn("items", F.col("items.item_id"))
).show()
+-----------+------------------+
|customer_id|             items|
+-----------+------------------+
|     993922|[2000, 7777, 9999]|
+-----------+------------------+
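Item 2000 shows up here because of the extra `ITEM_UNDO` event I added; with your original events its `qnt` sums to 0 and the result would be the expected `[7777, 9999]`. As a usage note, if you want the array back in the driver as a plain Python list, something like this sketch works (`final_df` is a hypothetical name for the aggregated DataFrame above, assigned instead of calling `.show()`):

# Pull the per-customer items array back to the driver as a Python list.
items = final_df.where(F.col("customer_id") == 993922).first()["items"]
print(items)  # [2000, 7777, 9999] here; [7777, 9999] without the extra ITEM_UNDO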