How to create a “sessionId” column using timestamps and userid in PySpark?

Time:09-17

I have a dataset containing fields such as userId, event, pageName, and timestamp, but no sessionId. I want to create a sessionId for each record based on the timestamp and a predefined value "finish" (the number of minutes of inactivity after which a session ends). Only records with the same userId can be in the same session.

If the "finish" value is 30 minutes (a difference of 1800 between timestamps), and a sample DataFrame is:

from pyspark.sql import functions as F 
from pyspark.sql.window import Window

df = spark.createDataFrame([
  ("blue", "view", 1610494094750, 11),
  ("green", "add to bag", 1510593114350, 21),
  ("red", "close", 1610493115350, 41),
  ("blue", "view", 1610494094350, 11),
  ("blue", "close", 1510593114312, 21),
  ("red", "view", 1610493114350, 41),
  ("red", "view", 1610593114350, 41),
  ("green", "purchase", 1610494094350, 31)
], ["item", "event", "timestamp", "userId"])


+-----+----------+-------------+------+
| item|     event|    timestamp|userId|
+-----+----------+-------------+------+
| blue|      view|1610494094750|    11|
|green|add to bag|1510593114350|    21|
|  red|     close|1610493115350|    41|
| blue|      view|1610494094350|    11|
| blue|     close|1510593114312|    21|
|  red|      view|1610493114350|    41|
|  red|      view|1610593114350|    41|
|green|  purchase|1610494094350|    31|
+-----+----------+-------------+------+

The end result should be something like this:

+--------+----------+-------------+------+---------+
|    item|     event|    timestamp|userId|sessionId|
+--------+----------+-------------+------+---------+
|    blue|     close|1510593114312|    21| session1|
|   green|add to bag|1510593114350|    21| session1|
|     red|      view|1610493114350|    41| session2|
|    blue|      view|1610494094350|    11| session3|
|     red|     close|1610493115350|    41| session2|
|   green|  purchase|1610494094350|    31| session4|
|    blue|      view|1610494094750|    11| session3|
|     red|      view|1610593114350|    41| session5|
+--------+----------+-------------+------+---------+

I'm trying to solve this problem using PySpark. Any advice is welcome.

Edit: I've edited the timestamp
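A note on units: the threshold only makes sense if it is expressed in the same unit as the timestamp column. The question does not state the unit, but the 13-digit sample values look like epoch milliseconds, in which case 30 minutes would be 1,800,000 rather than 1800. The sketch below is only a sanity check under that assumption; `finish_threshold` is a hypothetical helper name, not something from the question.

```python
from datetime import datetime, timezone


def finish_threshold(minutes, unit="ms"):
    """Convert an inactivity timeout in minutes to timestamp units.

    Hypothetical helper: 'unit' is "s" for epoch seconds or "ms" for
    epoch milliseconds.
    """
    per_second = {"s": 1, "ms": 1_000}[unit]
    return minutes * 60 * per_second


# Assumption: the sample timestamps are epoch milliseconds.
sample_ts = 1610494094350
as_date = datetime.fromtimestamp(sample_ts / 1000, tz=timezone.utc)

# A plausible calendar year suggests the millisecond reading is right.
print(as_date.year)

print(finish_threshold(30, "s"))   # 1800
print(finish_threshold(30, "ms"))  # 1800000
```

For the sample rows the resulting sessions are the same with either threshold, because the gaps inside each session are at most one second and the only other gap for the same user is far larger than 30 minutes.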

CodePudding user response:

I think this should do the job:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Each user's events in chronological order
w = Window.partitionBy("userId").orderBy("timestamp")

# Get the previous timestamp for each userId (null for a user's first event)
df = df.withColumn("session_id", F.lag("timestamp").over(w))

# Flag rows that start a new session: 1 if the gap to the previous event
# is more than 1800 (or there is no previous event), 0 otherwise
df = df.withColumn(
    "session_id",
    F.when(F.col("timestamp") - F.col("session_id") <= 1800, 0).otherwise(1),
)

# Running sum of the flags gives a session number per user
# (the same number can exist for different users)
df = df.withColumn("session_id", F.sum("session_id").over(w))

# Turn (userId, session number) into an id that is unique across all users
df = df.withColumn(
    "session_id", F.dense_rank().over(Window.orderBy("userId", "session_id"))
)
df.show()
+-----+----------+-------------+------+----------+
| item|     event|    timestamp|userId|session_id|
+-----+----------+-------------+------+----------+
| blue|      view|1610494094350|    11|         1|
| blue|      view|1610494094750|    11|         1|
| blue|     close|1510593114312|    21|         2|
|green|add to bag|1510593114350|    21|         2|
|green|  purchase|1610494094350|    31|         3|
|  red|      view|1610493114350|    41|         4|
|  red|     close|1610493115350|    41|         4|
|  red|      view|1610593114350|    41|         5|
+-----+----------+-------------+------+----------+
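To see why the window steps above produce these ids, here is a rough plain-Python sketch of the same logic on the question's sample rows. It is not PySpark and not a replacement for the code above: it sorts by (userId, timestamp) and bumps a single counter whenever a new session starts, which should give the same numbering as the running sum followed by the dense rank. `assign_sessions` and the "sessionN" label formatting are illustrative names of mine.

```python
# Rows from the question: (item, event, timestamp, userId)
rows = [
    ("blue", "view", 1610494094750, 11),
    ("green", "add to bag", 1510593114350, 21),
    ("red", "close", 1610493115350, 41),
    ("blue", "view", 1610494094350, 11),
    ("blue", "close", 1510593114312, 21),
    ("red", "view", 1610493114350, 41),
    ("red", "view", 1610593114350, 41),
    ("green", "purchase", 1610494094350, 31),
]

FINISH = 1800  # same threshold, in timestamp units, as the code above


def assign_sessions(rows, finish):
    """Return the rows sorted by (userId, timestamp) with a session id appended."""
    # Sorting mirrors partitionBy("userId").orderBy("timestamp").
    ordered = sorted(rows, key=lambda r: (r[3], r[2]))
    out = []
    session_id = 0
    prev_user, prev_ts = None, None
    for item, event, ts, user in ordered:
        # A new session starts on a user's first row (the null-lag case)
        # or when the gap to the previous row exceeds the threshold.
        if user != prev_user or ts - prev_ts > finish:
            session_id += 1
        out.append((item, event, ts, user, session_id))
        prev_user, prev_ts = user, ts
    return out


result = assign_sessions(rows, FINISH)
for item, event, ts, user, sid in result:
    # Format the integer id as a "sessionN" label, as in the question.
    print(item, event, ts, user, f"session{sid}")
```

The session numbers follow the userId order here, so they differ from the labels in the question's expected table, but the grouping of rows into sessions is the same.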