PySpark: Construct an array based on the order of customer actions


I created the following dataset of customer cart actions. It tells me when a customer adds an item, removes an item, or undoes a previous action. For example, a customer can do ITEM_ADDED, ITEM_ADDED, ITEM_ADDED, ITEM_UNDO, ITEM_UNDO, ITEM_UNDO and end up with an empty cart.

event_id | customer_id | event | event_type | event_ts | item_id | next_event_type
246984 | 993922 | {"item_id":1000,"customer_id":993922,"timestamp":5260,"type":"ITEM_ADDED"} | ITEM_ADDED | 5260 | 1000 | ITEM_ADDED
246984 | 993922 | {"item_id":1001,"customer_id":993922,"timestamp":5260,"type":"ITEM_ADDED"} | ITEM_ADDED | 5355 | 1001 | ITEM_ADDED
246984 | 993922 | {"item_id":1002,"customer_id":993922,"timestamp":5260,"type":"ITEM_ADDED"} | ITEM_ADDED | 5460 | 1002 | ITEM_UNDO
246984 | 993922 | {"after_id":0,"before_id":1002,"customer_id":993922,"timestamp":5500,"type":"ITEM_UNDO"} | ITEM_UNDO | 5500 | NULL | ITEM_UNDO
246984 | 993922 | {"after_id":0,"before_id":1001,"customer_id":993922,"timestamp":5510,"type":"ITEM_UNDO"} | ITEM_UNDO | 5510 | NULL | ITEM_UNDO
246984 | 993922 | {"after_id":0,"before_id":1000,"customer_id":993922,"timestamp":5515,"type":"ITEM_UNDO"} | ITEM_UNDO | 5515 | NULL | ITEM_ADDED
246984 | 993922 | {"item_id":2000,"customer_id":993922,"timestamp":6000,"type":"ITEM_ADDED"} | ITEM_ADDED | 6000 | 2000 | ITEM_REMOVED
246984 | 993922 | {"item_id":2000,"customer_id":993922,"timestamp":6010,"type":"ITEM_REMOVED"} | ITEM_REMOVED | 6010 | 2000 | ITEM_ADDED
246984 | 993922 | {"item_id":7777,"customer_id":993922,"timestamp":9999,"type":"ITEM_ADDED"} | ITEM_ADDED | 6700 | 7777 | ITEM_ADDED
246984 | 993922 | {"item_id":9999,"customer_id":993922,"timestamp":9999,"type":"ITEM_ADDED"} | ITEM_ADDED | 9999 | 9999 | NULL

The result of my query should be [7777, 9999].

Here is the Spark code I used to create the dataset. I know I can do F.collect_list(F.col("item_id")).over(w.rowsBetween(Window.unboundedPreceding, end=0)), but that will not take the remove and undo events into account (a sketch of this naive version follows the code below).

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("customer_id").orderBy(F.asc("event_timestamp"))
cumulative_customer_items_purchased_df = (
    spark
    .table('testing')
    .where(
        (F.col("event_type").isin(item_events))
        &
        (F.col("customer_id") == 993922)
    )
    .select(*columns)
    .withColumn(
        "prev_event_type",
        F.lag(F.col("event_type")).over(w)
    )
    .withColumn(
        "next_event_type",
        F.lead(F.col("event_type")).over(w)
    )
)
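
For reference, the cumulative collect_list mentioned above would look roughly like this (a sketch built on the window w defined above; naive_df is just an illustrative name). It only ever accumulates added item_ids, so removed and undone items are never taken back out:

# Naive running cart: appends every non-null item_id seen so far,
# but never removes anything for ITEM_REMOVED / ITEM_UNDO events.
naive_df = cumulative_customer_items_purchased_df.withColumn(
    "cart_so_far",
    F.collect_list("item_id").over(
        w.rowsBetween(Window.unboundedPreceding, Window.currentRow)
    ),
)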

CodePudding user response:

I assume that the event column in your input DataFrame is a JSON string and that the data looks like this:

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [
        (246984,  993922,  "{'item_id':1000,'customer_id':993922,'timestamp':5260,'type':'ITEM_ADDED'}", 5260),
        (246984,  993922,  "{'item_id':1001,'customer_id':993922,'timestamp':5260,'type':'ITEM_ADDED'}", 5355),
        (246984,  993922,  "{'item_id':1002,'customer_id':993922,'timestamp':5260,'type':'ITEM_ADDED'}", 5460),
        (246984,  993922,  "{'after_id':0,'before_id':1002,'customer_id':993922,'timestamp':5500,'type':'ITEM_UNDO'}", 5500),
        (246984,  993922,  "{'after_id':0,'before_id':1001,'customer_id':993922,'timestamp':5510,'type':'ITEM_UNDO'}", 5510),
        (246984,  993922,  "{'after_id':0,'before_id':1000,'customer_id':993922,'timestamp':5515,'type':'ITEM_UNDO'}", 5515),
        (246984,  993922,  "{'item_id':2000,'customer_id':993922,'timestamp':6000,'type':'ITEM_ADDED'}", 6000),
        (246984,  993922,  "{'item_id':2000,'customer_id':993922,'timestamp':6010,'type':'ITEM_REMOVED'}", 6010),
        (246984,  993922,  "{'after_id':2000,'before_id':0,'customer_id':993922,'timestamp':6010,'type':'ITEM_UNDO'}", 6010),
        (246984,  993922,  "{'item_id':7777,'customer_id':993922,'timestamp':9999,'type':'ITEM_ADDED'}", 6700),
        (246984,  993922,  "{'item_id':9999,'customer_id':993922,'timestamp':9999,'type':'ITEM_ADDED'}", 9999),
    ],
    ["event_id", "customer_id", "event", "event_timestamp"]
).select("*", F.json_tuple("event", "item_id", "after_id", "before_id", "type").alias("item_id", "after_id", "before_id", "event_type"))
+--------+-----------+--------------------+---------------+-------+--------+---------+------------+
|event_id|customer_id|               event|event_timestamp|item_id|after_id|before_id|  event_type|
+--------+-----------+--------------------+---------------+-------+--------+---------+------------+
|  246984|     993922|{'item_id':1000,'...|           5260|   1000|    null|     null|  ITEM_ADDED|
|  246984|     993922|{'item_id':1001,'...|           5355|   1001|    null|     null|  ITEM_ADDED|
|  246984|     993922|{'item_id':1002,'...|           5460|   1002|    null|     null|  ITEM_ADDED|
|  246984|     993922|{'after_id':0,'be...|           5500|   null|       0|     1002|   ITEM_UNDO|
|  246984|     993922|{'after_id':0,'be...|           5510|   null|       0|     1001|   ITEM_UNDO|
|  246984|     993922|{'after_id':0,'be...|           5515|   null|       0|     1000|   ITEM_UNDO|
|  246984|     993922|{'item_id':2000,'...|           6000|   2000|    null|     null|  ITEM_ADDED|
|  246984|     993922|{'item_id':2000,'...|           6010|   2000|    null|     null|ITEM_REMOVED|
|  246984|     993922|{'after_id':2000,...|           6010|   null|    2000|        0|   ITEM_UNDO|
|  246984|     993922|{'item_id':7777,'...|           6700|   7777|    null|     null|  ITEM_ADDED|
|  246984|     993922|{'item_id':9999,'...|           9999|   9999|    null|     null|  ITEM_ADDED|
+--------+-----------+--------------------+---------------+-------+--------+---------+------------+

Note that I added one more ITEM_UNDO event in your sample dataset, to undo the ITEM_REMOVED event.

I also assume that either after_id or before_id is set in ITEM_UNDO events: after_id undoes an ITEM_REMOVED event, and before_id undoes an ITEM_ADDED event.

My approach is to add a qnt value that maps each item event to 1 (ITEM_ADDED, or an ITEM_UNDO of a removal) or -1 (ITEM_REMOVED, or an ITEM_UNDO of an addition), together with the affected item_id:

df2 = (
  df.withColumn(
    "item", 
    F.expr("""
      case 
        when event_type='ITEM_UNDO' and before_id!=0 then (-1 as qnt, before_id as item_id) -- undo add
        when event_type='ITEM_UNDO' and after_id!=0 then (1 as qnt, after_id as item_id) --undo remove
        when event_type='ITEM_ADDED' then (1 as qnt, item_id)
        when event_type='ITEM_REMOVED' then (-1 as qnt, item_id)
      end
    """))
  .select("event_id", "customer_id", "event_timestamp", "item.qnt", "item.item_id")
)

df2.show()
+--------+-----------+---------------+---+-------+
|event_id|customer_id|event_timestamp|qnt|item_id|
+--------+-----------+---------------+---+-------+
|  246984|     993922|           5260|  1|   1000|
|  246984|     993922|           5355|  1|   1001|
|  246984|     993922|           5460|  1|   1002|
|  246984|     993922|           5500| -1|   1002|
|  246984|     993922|           5510| -1|   1001|
|  246984|     993922|           5515| -1|   1000|
|  246984|     993922|           6000|  1|   2000|
|  246984|     993922|           6010| -1|   2000|
|  246984|     993922|           6010|  1|   2000|
|  246984|     993922|           6700|  1|   7777|
|  246984|     993922|           9999|  1|   9999|
+--------+-----------+---------------+---+-------+
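
If you prefer the DataFrame API over F.expr, the same mapping could be sketched with F.when and F.struct (df2_alt is just an illustrative name; it should produce the same columns as df2 above):

# Same qnt/item_id mapping as the SQL case expression, written with F.when.
item = (
    F.when(
        (F.col("event_type") == "ITEM_UNDO") & (F.col("before_id") != 0),
        F.struct(F.lit(-1).alias("qnt"), F.col("before_id").alias("item_id")),  # undo add
    )
    .when(
        (F.col("event_type") == "ITEM_UNDO") & (F.col("after_id") != 0),
        F.struct(F.lit(1).alias("qnt"), F.col("after_id").alias("item_id")),  # undo remove
    )
    .when(
        F.col("event_type") == "ITEM_ADDED",
        F.struct(F.lit(1).alias("qnt"), F.col("item_id")),
    )
    .when(
        F.col("event_type") == "ITEM_REMOVED",
        F.struct(F.lit(-1).alias("qnt"), F.col("item_id")),
    )
)

df2_alt = (
    df.withColumn("item", item)
    .select("event_id", "customer_id", "event_timestamp", "item.qnt", "item.item_id")
)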

After we have the numeric change for each item event in qnt, we can sum it per item, filter out items whose sum(qnt) is 0 (they are no longer in the cart), and collect the remaining item_ids. Sorting the (event_ts, item_id) structs orders the items by the timestamp of their first addition:

(
  df2
  .groupBy("customer_id", "item_id")
  .agg(F.sum("qnt").alias("qnt"), F.min("event_timestamp").alias("event_ts"))
  .where("qnt>0")
  .groupBy("customer_id")
  .agg(F.sort_array(F.collect_list(F.struct("event_ts", "item_id"))).alias("items"))
  .withColumn("items", F.col("items.item_id"))
).show()
+-----------+------------------+
|customer_id|             items|
+-----------+------------------+
|     993922|[2000, 7777, 9999]|
+-----------+------------------+
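
Note: with the original sample data (i.e. without the extra ITEM_UNDO that re-adds item 2000), item 2000 nets out to sum(qnt) = 0 and is filtered out, so the same query should return [7777, 9999], matching the expected result in the question.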