Spark SQL - How to create marketing campaign paths for each user until purchase

Time:08-24

I am using Spark SQL to process the following dataset, so it can fit a marketing attribution model:

| user_id    | timestamp        | activity | campaign          | event_name |
|------------|------------------|----------|-------------------|------------|
| akalsds124 | 2021-12-31 10:00 | click    | Holidays Campaign | NULL       |
| akalsds124 | 2022-03-01 16:32 | click    | Super Campaign    | NULL       |
| akalsds124 | 2022-04-27 20:55 | event    | NULL              | purchase   |
| akalsds124 | 2022-05-10 10:21 | event    | NULL              | purchase   |
| akalsds124 | 2022-06-25 09:22 | click    | IG 3 Campaign     | NULL       |
| akalsds124 | 2022-07-07 15:00 | event    | NULL              | purchase   |
| ijnbmshs33 | 2022-05-02 10:31 | click    | New Campaign      | NULL       |
| ijnbmshs33 | 2022-07-04 17:01 | click    | Mega Campaign     | NULL       |

A click activity is an ad click made by the user, and an event is an interaction inside the app (e.g. a purchase or a login).

To create the table above, you can use this code:

from pyspark.sql import SparkSession

# Reuses the active session if one exists (e.g. in a notebook or the pyspark shell).
spark = SparkSession.builder.getOrCreate()

# Note: 'NULL' is stored here as a literal string, not as a real null value.
df = spark.createDataFrame(
    [('akalsds124', '2021-12-31 10:00', 'click', 'Holidays Campaign', 'NULL'),
     ('akalsds124', '2022-03-01 16:32', 'click', 'Super Campaign', 'NULL'),
     ('akalsds124', '2022-04-27 20:55', 'event', 'NULL', 'purchase'),
     ('akalsds124', '2022-05-10 10:21', 'event', 'NULL', 'purchase'),
     ('akalsds124', '2022-06-25 09:22', 'click', 'IG 3 Campaign', 'NULL'),
     ('akalsds124', '2022-07-07 15:00', 'event', 'NULL', 'purchase'),
     ('ijnbmshs33', '2022-05-02 10:31', 'click', 'New Campaign', 'NULL'),
     ('ijnbmshs33', '2022-07-04 17:01', 'click', 'Mega Campaign', 'NULL')],
    ['user_id', 'timestamp', 'activity', 'campaign', 'event_name']
)

I need to build, for each user, a path: a list of the campaigns they clicked, in chronological order. When a user purchases a product, the current path ends and a new path must be started for their next touchpoints.

I also need a column named 'converted' holding a flag (1 if the path led to a purchase, 0 if it did not), and another column, 'total_conversions', with the total number of purchases per path.

The expected output should be like this:

| user_id    | path                                | converted | total_conversions |
|------------|-------------------------------------|-----------|-------------------|
| akalsds124 | [Holidays Campaign, Super Campaign] | 1         | 2                 |
| akalsds124 | [IG 3 Campaign]                     | 1         | 1                 |
| ijnbmshs33 | [New Campaign, Mega Campaign]       | 0         | 0                 |
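The splitting rule described above can be sketched in plain Python, without Spark, as a reference for checking any Spark result against the expected output. This is only an illustration under some assumptions: `build_paths` is a hypothetical helper name, `None` stands in for the NULL cells, and events other than purchases are simply ignored.

```python
from itertools import groupby
from operator import itemgetter

# Sample rows from the question: (user_id, timestamp, activity, campaign, event_name).
# None stands in for the NULL cells of the table.
rows = [
    ("akalsds124", "2021-12-31 10:00", "click", "Holidays Campaign", None),
    ("akalsds124", "2022-03-01 16:32", "click", "Super Campaign", None),
    ("akalsds124", "2022-04-27 20:55", "event", None, "purchase"),
    ("akalsds124", "2022-05-10 10:21", "event", None, "purchase"),
    ("akalsds124", "2022-06-25 09:22", "click", "IG 3 Campaign", None),
    ("akalsds124", "2022-07-07 15:00", "event", None, "purchase"),
    ("ijnbmshs33", "2022-05-02 10:31", "click", "New Campaign", None),
    ("ijnbmshs33", "2022-07-04 17:01", "click", "Mega Campaign", None),
]


def build_paths(rows):
    """Hypothetical helper: split each user's touchpoints into paths.

    Returns tuples of (user_id, path, converted, total_conversions).
    """
    result = []
    # Sort by user, then by timestamp (this string format sorts chronologically).
    ordered = sorted(rows, key=itemgetter(0, 1))
    for user_id, user_rows in groupby(ordered, key=itemgetter(0)):
        path, purchases = [], 0
        for _, _, activity, campaign, event_name in user_rows:
            if activity == "click":
                if purchases > 0:
                    # A click after at least one purchase closes the current path.
                    result.append((user_id, path, 1, purchases))
                    path, purchases = [], 0
                path.append(campaign)
            elif event_name == "purchase":
                # Consecutive purchases count toward the same path.
                purchases += 1
        if path or purchases:
            # Emit the last (possibly unconverted) path of the user.
            result.append((user_id, path, int(purchases > 0), purchases))
    return result


paths = build_paths(rows)
for row in paths:
    print(row)
```

On the sample data this yields the three rows of the expected output table, with the two consecutive purchases counted in the first path.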

CodePudding user response:

Starting from the dataset you created, here is what I did:

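Data preparation

from pyspark.sql import functions as F, Window as W

# Turn event_name into a 0/1 purchase flag.
df = df.withColumn(
    "event_name", F.when(F.col("event_name") == "purchase", 1).otherwise(0)
)

# Previous row's purchase flag within each user, ordered by timestamp.
df = df.withColumn(
    "rnk", F.lag("event_name").over(W.partitionBy("user_id").orderBy("timestamp"))
)

# Mark the first non-purchase row after a purchase: a new path starts there.
df = df.withColumn(
    "rnk", F.when((F.col("rnk") == 1) & (F.col("event_name") != 1), 1).otherwise(0)
)

# A running sum of the markers gives each row its path number per user.
df = df.withColumn(
    "rnk", F.sum("rnk").over(W.partitionBy("user_id").orderBy("timestamp"))
)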
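To make the three window steps easier to follow, here is a small plain-Python trace of the same idea for user akalsds124 only. It is a sketch that mimics the lag, the marker, and the running sum on a list; the variable names are illustrative and it does not use Spark.

```python
from itertools import accumulate

# Purchase flags for user akalsds124, in timestamp order (1 = purchase event).
is_purchase = [0, 0, 1, 1, 0, 1]

# Step 1: lag, i.e. the previous row's flag (None for the first row, like F.lag).
previous = [None] + is_purchase[:-1]

# Step 2: mark rows that directly follow a purchase and are not purchases themselves.
starts_new_path = [
    1 if prev == 1 and cur != 1 else 0
    for prev, cur in zip(previous, is_purchase)
]

# Step 3: a running sum of the markers gives the path number of each row.
rnk = list(accumulate(starts_new_path))

print(starts_new_path)  # [0, 0, 0, 0, 1, 0]
print(rnk)              # [0, 0, 0, 0, 1, 1]
```

The first four rows share path number 0 (two clicks and two purchases), while the last click and the purchase after it fall into path number 1.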

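Aggregation

# One output row per user and path number.
# Note: collect_set drops duplicate campaigns and does not guarantee order.
# Because the sample data stores the literal string 'NULL', it can also
# end up inside the path.
df = df.groupBy("user_id", "rnk").agg(
    F.collect_set("campaign").alias("path"),
    F.max("event_name").alias("converted"),
    F.sum("event_name").alias("total_conversions"),
)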

Result

df.show()

+----------+---+--------------------+---------+-----------------+
|   user_id|rnk|                path|converted|total_conversions|
+----------+---+--------------------+---------+-----------------+
|akalsds124|  0|[Super Campaign, ...|        1|                2|
|akalsds124|  1|[NULL, IG 3 Campa...|        1|                1|
|ijnbmshs33|  0|[Mega Campaign, N...|        0|                0|
+----------+---+--------------------+---------+-----------------+