Pyspark label rows based on time window and double condition match

Time:02-01

The idea is to identify users who bought an item offline (no in the online column) after trying to buy the same item online (yes in the online column) within the preceding 7 days.

This means that for each no in the online column I need to search through the yes values 7 days prior, which also match the user_ID and item values of the no instance and then label matches as 1 and non-matches as 0.

Sample dataset (notes on the right):

+-------+----+------+--------------+
|user_ID|item|online|      datetime|
+-------+----+------+--------------+
|      1|  66|   yes| 02-10-21 9:30|
|      1|  24|   yes| 02-10-21 9:30|
|      1|  66|    no| 06-10-21 9:30| # <- 1 ('user_ID' and 'item' matched in the last 7 days)
|      2|  24|    no| 03-10-21 8:01| # <- 0 ('user_ID' didn't match in the last 7 days)
|      2|  24|   yes| 03-10-21 8:02|
|      2|  24|    no| 03-10-21 8:03| # <- 1 ('user_ID' and 'item' matched in the last 7 days)
|      3|  31|   yes| 05-10-21 8:32|
|      3|  66|   yes|10-10-21 10:57|
|      3|  66|    no|14-10-21 12:07| # <- 1 ('user_ID' and 'item' matched in the last 7 days)
|      3|  31|    no|15-10-21 12:07| # <- 0 ('item' didn't match in the last 7 days)
+-------+----+------+--------------+

Resulting dataset:

+-------+----+------+--------------+------+
|user_ID|item|online|      datetime|result|
+-------+----+------+--------------+------+
|      1|  66|    no| 06-10-21 9:30|     1|
|      2|  24|    no| 03-10-21 8:01|     0|
|      2|  24|    no| 03-10-21 8:03|     1|
|      3|  66|    no|14-10-21 12:07|     1|
|      3|  31|    no|15-10-21 12:07|     0|
+-------+----+------+--------------+------+

Can't figure this one out. Is there a way of doing this with a windowed function? I'm working with spark and a large dataset (>10 mln records).

Any pyspark or sql magicians that could help?

CodePudding user response:

You can use a window with rangeBetween to look back over the last 7 days.

First cast datetime to a long value (seconds since the epoch) so that it can be used with rangeBetween.

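from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import IntegerType, LongType

# If datetime is a string, parse it first, then cast to epoch seconds
df = df.withColumn('datetime', F.to_timestamp(df.datetime, 'dd-MM-yy H:mm'))
df = df.withColumn('long_dt', df.datetime.cast(LongType()))

# If datetime is already a timestamp, only the cast is needed
df = df.withColumn('long_dt', df.datetime.cast(LongType()))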

Then create a window that looks back over the past 7 days for the same user_ID and item.

w = (Window.partitionBy(['user_ID', 'item'])
    .orderBy('long_dt')
    .rangeBetween(-7*24*60*60, -1)) # -7 days to -1 second before.
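
To make the frame bounds concrete, here is a minimal plain-Python sketch (no Spark needed) of which earlier rows fall inside the frame of a given current row. The helper names to_epoch and in_frame are hypothetical and only illustrate the arithmetic; timestamps are treated as UTC for simplicity.

```python
from datetime import datetime, timezone

SEVEN_DAYS = 7 * 24 * 60 * 60  # 604800 seconds, the lower bound of the frame


def to_epoch(text):
    """Parse 'dd-MM-yy H:mm' text into whole seconds since the epoch (UTC assumed)."""
    dt = datetime.strptime(text, "%d-%m-%y %H:%M").replace(tzinfo=timezone.utc)
    return int(dt.timestamp())


def in_frame(current, other):
    """True if `other` lies in [current - 7 days, current - 1 second]."""
    return current - SEVEN_DAYS <= other <= current - 1


offline = to_epoch("06-10-21 9:30")
print(in_frame(offline, to_epoch("02-10-21 9:30")))  # 4 days earlier
print(in_frame(offline, to_epoch("29-09-21 9:30")))  # exactly 7 days earlier
print(in_frame(offline, to_epoch("29-09-21 9:29")))  # 7 days and 1 minute earlier
print(in_frame(offline, offline))                    # same second as the current row
```

Both ends of the range are inclusive, so a row exactly 7 days earlier is still counted, while the upper bound of -1 leaves out rows that share the current row's second, including the current row itself.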

Use this window to count the preceding rows with online == yes; if that count is greater than 0, return 1. Since only rows with online == no should be returned, add the filter at the end.

Edit: Thanks to @blackbishop, I updated the counting logic.

df = (df.withColumn('result', (F.count(F.when(df.online == 'yes', 1)).over(w) > 0).cast(IntegerType()))
     .filter(df.online == 'no'))

Result

+-------+----+------+-------------------+------+
|user_id|item|online|           datetime|result|
+-------+----+------+-------------------+------+
|      1|  66|    no|2021-10-06 09:30:00|     1|
|      3|  31|    no|2021-10-15 12:07:00|     0|
|      3|  66|    no|2021-10-14 12:07:00|     1|
|      2|  24|    no|2021-10-03 08:01:00|     0|
|      2|  24|    no|2021-10-03 08:03:00|     1|
+-------+----+------+-------------------+------+
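
The counting expression used above relies on two behaviours: F.when without otherwise yields null for rows that do not match, and F.count skips nulls. A rough plain-Python analogue, with None standing in for null (the function names and the sample frame are hypothetical):

```python
def when_yes(online):
    """Analogue of F.when(online == 'yes', 1) with no otherwise(): None for non-matches."""
    return 1 if online == "yes" else None


def count_non_null(values):
    """Analogue of F.count(col): only non-null values are counted."""
    return sum(v is not None for v in values)


# Hypothetical 'online' values of the rows inside one window frame
frame = ["yes", "no", "no"]
prior_yes = count_non_null(when_yes(v) for v in frame)
result = int(prior_yes > 0)

# A frame that contains only offline rows yields 0
no_attempts = int(count_non_null(when_yes(v) for v in ["no", "no"]) > 0)
```

Counting only the yes rows is what stops an earlier offline purchase of the same item from being mistaken for an online attempt.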

CodePudding user response:

I've included the pandas code to easily build out a sample dataframe. The spark code starts after that.

It is based on partitioning by user and item and shifting (lagging) the online and datetime values. The idea is that the row immediately before a no for a given user/item must be a yes within 7 days to count.

import pandas as pd

from pyspark.sql.functions import lag, col, datediff, when
from pyspark.sql.window import Window

df = pd.DataFrame({
    'user_ID': [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
    'item': [66, 24, 66, 24, 24, 24, 31, 66, 66, 31],
    'online': ['yes', 'yes', 'no', 'no', 'yes', 'no', 'yes', 'yes', 'no', 'no'],
    'datetime': [
        '02-10-21 9:30', '02-10-21 9:30', '06-10-21 9:30',
        '03-10-21 8:01', '03-10-21 8:02', '03-10-21 8:03',
        '05-10-21 8:32', '10-10-21 10:57', '14-10-21 12:07', '15-10-21 12:07',
    ],
})


df['datetime']  = pd.to_datetime(df['datetime'], dayfirst=True)


df = spark.createDataFrame(df)


# Previous row's online flag and timestamp within each (user_ID, item) partition
w = Window.partitionBy('user_ID', 'item').orderBy('datetime')
df = df.withColumn('o_shift', lag(df['online']).over(w))
df = df.withColumn('d_shift', lag(df['datetime']).over(w))

# Label only offline purchases; rows without an earlier row keep null shifts
# and fall through to 0 below
df = df.filter(col('online') == 'no')

df = df.withColumn('t_delta', datediff(col('datetime'), col('d_shift')))

df = df.withColumn('result', when(
  (col('o_shift') == 'yes') &
  (col('t_delta') <= 7), 1
).otherwise(0))

df.sort('user_id','datetime').select('user_id','item','online','datetime','result').show()

Output

+-------+----+------+-------------------+------+
|user_id|item|online|           datetime|result|
+-------+----+------+-------------------+------+
|      1|  66|    no|2021-10-06 09:30:00|     1|
|      2|  24|    no|2021-10-03 08:01:00|     0|
|      2|  24|    no|2021-10-03 08:03:00|     1|
|      3|  66|    no|2021-10-14 12:07:00|     1|
|      3|  31|    no|2021-10-15 12:07:00|     0|
+-------+----+------+-------------------+------+

CodePudding user response:

Other answers demonstrated how to use window functions. Here's another way using a left_semi join, which returns only the users who bought an item offline after trying to buy the same item online within the previous 7 days:

from pyspark.sql import functions as F

# cast string into timestamp if not already done
df = df.withColumn('datetime', F.to_timestamp("datetime", 'dd-MM-yy H:mm'))

df_on = df.filter("online = 'yes'").alias("on")
df_off = df.filter("online = 'no'").alias("off")

result = df_off.join(
    df_on, [
        F.col("on.user_ID") == F.col("off.user_ID"),
        F.col("on.item") == F.col("off.item"),
        F.col("on.datetime") >= F.expr("off.datetime - interval 7 day"),
        F.col("on.datetime") < F.col("off.datetime")
    ],
    "left_semi"
)

result.show()
#+-------+----+------+-------------------+
#|user_ID|item|online|           datetime|
#+-------+----+------+-------------------+
#|      1|  66|    no|2021-10-06 09:30:00|
#|      2|  24|    no|2021-10-03 08:03:00|
#|      3|  66|    no|2021-10-14 12:07:00|
#+-------+----+------+-------------------+

Note: if you want the users who bought an item offline but had not tried to buy it online in the 7 days before (corresponding to result=0), replace left_semi with left_anti in the above query.
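
Since left_semi and left_anti split the offline rows into two disjoint sets, labelling the first set 1 and the second 0 gives the result column from the question. Below is a minimal plain-Python sketch of that idea on the sample data; the helper has_online_attempt is hypothetical and mirrors the join condition. It is a small-sample illustration, not a Spark implementation.

```python
from datetime import datetime, timedelta


def ts(text):
    """Parse the 'dd-MM-yy H:mm' strings used in the sample data."""
    return datetime.strptime(text, "%d-%m-%y %H:%M")


# Sample rows split by the online flag: (user_ID, item, datetime)
online = [(1, 66, ts("02-10-21 9:30")), (1, 24, ts("02-10-21 9:30")),
          (2, 24, ts("03-10-21 8:02")), (3, 31, ts("05-10-21 8:32")),
          (3, 66, ts("10-10-21 10:57"))]
offline = [(1, 66, ts("06-10-21 9:30")), (2, 24, ts("03-10-21 8:01")),
           (2, 24, ts("03-10-21 8:03")), (3, 66, ts("14-10-21 12:07")),
           (3, 31, ts("15-10-21 12:07"))]


def has_online_attempt(off_row):
    """Same user and item, attempted at most 7 days before and strictly earlier."""
    user, item, when = off_row
    return any(u == user and i == item and when - timedelta(days=7) <= t < when
               for u, i, t in online)


semi = [r for r in offline if has_online_attempt(r)]      # left_semi analogue
anti = [r for r in offline if not has_online_attempt(r)]  # left_anti analogue

# Every offline row lands in exactly one of the two sets
labelled = sorted([(*r, 1) for r in semi] + [(*r, 0) for r in anti])
for user, item, when, result in labelled:
    print(user, item, when, result)
```

In Spark, the analogous step would be to add a constant result column to each of the two join outputs (for example with F.lit) and union them.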
