I am using Spark SQL to process the following dataset so that I can fit a marketing attribution model:
| user_ID    | timestamp        | activity | campaign          | event_name |
|------------|------------------|----------|-------------------|------------|
| akalsds124 | 2021-12-31 10:00 | click    | Holidays Campaign | NULL       |
| akalsds124 | 2022-03-01 16:32 | click    | Super Campaign    | NULL       |
| akalsds124 | 2022-04-27 20:55 | event    | NULL              | purchase   |
| akalsds124 | 2022-05-10 10:21 | event    | NULL              | purchase   |
| akalsds124 | 2022-06-25 09:22 | click    | IG 3 Campaign     | NULL       |
| akalsds124 | 2022-07-07 15:00 | event    | NULL              | purchase   |
| ijnbmshs33 | 2022-05-02 10:31 | click    | New Campaign      | NULL       |
| ijnbmshs33 | 2022-07-04 17:01 | click    | Mega Campaign     | NULL       |
A click activity is an ad click made by the user, and an event is an interaction inside the app (e.g., a purchase or a login).
To create the table above, you can use this code:
df = spark.createDataFrame(
    [('akalsds124', '2021-12-31 10:00', 'click', 'Holidays Campaign', 'NULL'),
     ('akalsds124', '2022-03-01 16:32', 'click', 'Super Campaign', 'NULL'),
     ('akalsds124', '2022-04-27 20:55', 'event', 'NULL', 'purchase'),
     ('akalsds124', '2022-05-10 10:21', 'event', 'NULL', 'purchase'),
     ('akalsds124', '2022-06-25 09:22', 'click', 'IG 3 Campaign', 'NULL'),
     ('akalsds124', '2022-07-07 15:00', 'event', 'NULL', 'purchase'),
     ('ijnbmshs33', '2022-05-02 10:31', 'click', 'New Campaign', 'NULL'),
     ('ijnbmshs33', '2022-07-04 17:01', 'click', 'Mega Campaign', 'NULL')],
    ['user_id', 'timestamp', 'activity', 'campaign', 'event_name']
)
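(Note that the 'NULL' values above are literal strings, not real SQL NULLs; I keep them as strings here, but they could be converted with a small cleanup like this, assuming the 'yyyy-MM-dd HH:mm' timestamp format shown above:
from pyspark.sql import functions as F

# Hypothetical cleanup: turn the 'NULL' placeholder into real nulls and cast
# the timestamp strings to actual timestamps.
df_clean = (
    df.replace("NULL", None, subset=["campaign", "event_name"])
      .withColumn("timestamp", F.to_timestamp("timestamp", "yyyy-MM-dd HH:mm"))
)
)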
I need to create a path with each user's campaign touchpoints inside a list. When a user purchases a product, a new path must be created for their subsequent touchpoints.
I also need a column named 'converted' with boolean results (1 if the path led to a purchase, 0 if it did not lead to a conversion), and another one (total_conversions) with the total number of purchases per path.
The expected output should be like this:
| user_ID    | path                                | converted | total_conversions |
|------------|-------------------------------------|-----------|-------------------|
| akalsds124 | [Holidays Campaign, Super Campaign] | 1         | 2                 |
| akalsds124 | [IG 3 Campaign]                     | 1         | 1                 |
| ijnbmshs33 | [New Campaign, Mega Campaign]       | 0         | 0                 |
Answer:
Starting from the dataset you created, here is what I did:
Data preparation
from pyspark.sql import functions as F, Window as W
# 1. Turn event_name into a binary purchase flag (1 = purchase, 0 = anything else)
df = df.withColumn(
    "event_name", F.when(F.col("event_name") == "purchase", 1).otherwise(0)
)
# 2. Look at the previous row's flag for each user, in chronological order
df = df.withColumn(
    "rnk", F.lag("event_name").over(W.partitionBy("user_id").orderBy("timestamp"))
)
# 3. Mark the first touchpoint after a purchase: it starts a new path
df = df.withColumn(
    "rnk", F.when((F.col("rnk") == 1) & (F.col("event_name") != 1), 1).otherwise(0)
)
# 4. A running sum of those markers gives each row its path index
df = df.withColumn(
    "rnk", F.sum("rnk").over(W.partitionBy("user_id").orderBy("timestamp"))
)
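At this point rnk is a per-user path index: it stays constant within a path and increments on the first touchpoint after a purchase. A quick sanity check on one user:
# For akalsds124 the index should stay at 0 through the first two purchases
# and move to 1 on the 2022-06-25 click that follows them.
df.filter(F.col("user_id") == "akalsds124").orderBy("timestamp").show(truncate=False)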
Aggregation
df = df.groupBy("user_id", "rnk").agg(
    F.collect_set("campaign").alias("path"),         # unique campaigns in the path
    F.max("event_name").alias("converted"),          # 1 if the path contains a purchase
    F.sum("event_name").alias("total_conversions"),  # number of purchases in the path
)
Result
df.show()
+----------+---+--------------------+---------+-----------------+
|   user_id|rnk|                path|converted|total_conversions|
+----------+---+--------------------+---------+-----------------+
|akalsds124|  0|[Super Campaign, ...|        1|                2|
|akalsds124|  1|[NULL, IG 3 Campa...|        1|                1|
|ijnbmshs33|  0|[Mega Campaign, N...|        0|                0|
+----------+---+--------------------+---------+-----------------+
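Note that collect_set returns the campaigns in arbitrary order and, since the sample data stores the placeholder string 'NULL' rather than a real null, that string also lands inside the paths (visible in the output above). Here is a minimal refinement sketch, assuming paths should hold only real campaigns in chronological order; df_prep is a hypothetical name for the DataFrame as it stands right after the data-preparation step (before the groupBy above), and the sort relies on the 'yyyy-MM-dd HH:mm' strings ordering chronologically:
result = (
    df_prep.groupBy("user_id", "rnk")
    .agg(
        # Pair each real campaign with its timestamp so the array can be
        # time-sorted; when() without otherwise() yields NULL for the
        # placeholder rows, and collect_list silently drops NULLs.
        F.sort_array(
            F.collect_list(
                F.when(F.col("campaign") != "NULL", F.struct("timestamp", "campaign"))
            )
        ).alias("tps"),
        F.max("event_name").alias("converted"),
        F.sum("event_name").alias("total_conversions"),
    )
    # Pull the campaign field out of the time-ordered array of structs.
    .withColumn("path", F.col("tps.campaign"))
    .select("user_id", "path", "converted", "total_conversions")
)
result.show(truncate=False)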