I have a dataset that contains fields such as userId, event, pageName, and timestamp, but it lacks a sessionId. I want to create a sessionId for each record based on the timestamp and a predefined value "finish" (which indicates after how many minutes of inactivity a session ends). Only events with the same userId can belong to the same session.
Suppose the "finish" value is 30 minutes (a difference of 1800 in timestamp) and the sample DataFrame is:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
df = spark.createDataFrame([
("blue", "view", 1610494094750, 11),
("green", "add to bag", 1510593114350, 21),
("red", "close", 1610493115350, 41),
("blue", "view", 1610494094350, 11),
("blue", "close", 1510593114312, 21),
("red", "view", 1610493114350, 41),
("red", "view", 1610593114350, 41),
("green", "purchase", 1610494094350, 31)
], ["item", "event", "timestamp", "userId"])
+-----+----------+-------------+------+
| item|     event|    timestamp|userId|
+-----+----------+-------------+------+
| blue|      view|1610494094750|    11|
|green|add to bag|1510593114350|    21|
|  red|     close|1610493115350|    41|
| blue|      view|1610494094350|    11|
| blue|     close|1510593114312|    21|
|  red|      view|1610493114350|    41|
|  red|      view|1610593114350|    41|
|green|  purchase|1610494094350|    31|
+-----+----------+-------------+------+
The end result should be something like this:
+--------+----------+-------------+------+---------+
|    item|     event|    timestamp|userId|sessionId|
+--------+----------+-------------+------+---------+
|    blue|     close|1510593114312|    21| session1|
|   green|add to bag|1510593114350|    21| session1|
|     red|      view|1610493114350|    41| session2|
|    blue|      view|1610494094350|    11| session3|
|     red|     close|1610493115350|    41| session2|
|   green|  purchase|1610494094350|    31| session4|
|    blue|      view|1610494094750|    11| session3|
|     red|      view|1610593114350|    41| session5|
+--------+----------+-------------+------+---------+
I'm trying to solve this problem using PySpark. Any advice is welcome.
Edit: I've edited the timestamp
CodePudding user response:
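I think this should do the job:
# Threshold from the question. Note: the sample timestamps have 13 digits, which
# looks like epoch milliseconds; in that case 30 minutes would be 30 * 60 * 1000.
finish = 1800
user_window = Window.partitionBy("userId").orderBy("timestamp")
# Previous timestamp for each userId (null on the user's first event)
df = df.withColumn("prev_timestamp", F.lag("timestamp").over(user_window))
# Flag rows that start a new session: the user's first event (the comparison is
# null, so otherwise() applies) or a gap larger than the threshold
df = df.withColumn(
    "new_session",
    F.when(F.col("timestamp") - F.col("prev_timestamp") <= finish, 0).otherwise(1),
)
# Running sum of the flags: a session number that is unique within each user
# (the same number can exist for different users)
df = df.withColumn("user_session", F.sum("new_session").over(user_window))
# Rank the (userId, user_session) pairs to get an id that is unique across users.
# A window without partitionBy moves all rows to a single partition.
df = df.withColumn(
    "session_id", F.dense_rank().over(Window.orderBy("userId", "user_session"))
)
df = df.drop("prev_timestamp", "new_session", "user_session")
df.show()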
+-----+----------+-------------+------+----------+
| item|     event|    timestamp|userId|session_id|
+-----+----------+-------------+------+----------+
| blue|      view|1610494094350|    11|         1|
| blue|      view|1610494094750|    11|         1|
| blue|     close|1510593114312|    21|         2|
|green|add to bag|1510593114350|    21|         2|
|green|  purchase|1610494094350|    31|         3|
|  red|      view|1610493114350|    41|         4|
|  red|     close|1610493115350|    41|         4|
|  red|      view|1610593114350|    41|         5|
+-----+----------+-------------+------+----------+
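Note that the ids above are numbered by userId, while the expected table in the question numbers sessions by when they start. If it helps to check the logic on a small sample without Spark, here is a rough plain-Python sketch of the same idea (sort per user, start a new session on a large gap, then number the sessions by first timestamp). The names `GAP_MS` and `assign_sessions` are made up for this example, and it assumes the timestamps are epoch milliseconds, which the question does not state explicitly.

```python
from itertools import groupby
from operator import itemgetter

# Assumption: timestamps are epoch milliseconds, so 30 minutes is 1,800,000.
GAP_MS = 30 * 60 * 1000

# Same sample rows as in the question: (item, event, timestamp, userId)
rows = [
    ("blue", "view", 1610494094750, 11),
    ("green", "add to bag", 1510593114350, 21),
    ("red", "close", 1610493115350, 41),
    ("blue", "view", 1610494094350, 11),
    ("blue", "close", 1510593114312, 21),
    ("red", "view", 1610493114350, 41),
    ("red", "view", 1610593114350, 41),
    ("green", "purchase", 1610494094350, 31),
]


def assign_sessions(rows, gap=GAP_MS):
    """Return the rows sorted by timestamp, each with a 'sessionN' label appended."""
    # Pass 1: per user, walk the events in time order and bump the per-user
    # session index on the first event or whenever the gap exceeds the threshold.
    keyed = []  # list of (row, (userId, per-user session index))
    by_user = sorted(rows, key=lambda r: (r[3], r[2]))
    for user_id, events in groupby(by_user, key=itemgetter(3)):
        prev_ts, idx = None, 0
        for row in events:
            if prev_ts is None or row[2] - prev_ts > gap:
                idx += 1
            keyed.append((row, (user_id, idx)))
            prev_ts = row[2]

    # Pass 2: number the sessions globally in order of first appearance.
    # sorted() is stable, so equal timestamps keep the userId order from pass 1.
    keyed.sort(key=lambda pair: pair[0][2])
    labels = {}
    result = []
    for row, key in keyed:
        labels.setdefault(key, f"session{len(labels) + 1}")
        result.append(row + (labels[key],))
    return result


for row in assign_sessions(rows):
    print(row)
```

On the sample rows this gives the same sessionId labels as the expected table in the question. It is only meant as a cross-check for small inputs; for real data the window-function version is the one to use.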