The idea is to identify users who bought an item offline ('no' in the 'online' column) and tried to buy the same item online ('yes' in the 'online' column) in the last 7 days.
This means that for each 'no' in the 'online' column I need to search through the 'yes' values from the 7 days prior that also match the 'user_ID' and 'item' values of the 'no' instance, and then label matches as 1 and non-matches as 0.
Sample dataset (notes on the right):
+-------+----+------+--------------+
|user_ID|item|online|      datetime|
+-------+----+------+--------------+
|      1|  66|   yes| 02-10-21 9:30|
|      1|  24|   yes| 02-10-21 9:30|
|      1|  66|    no| 06-10-21 9:30| # <- 1 ('user_ID' and 'item' matched in the last 7 days)
|      2|  24|    no| 03-10-21 8:01| # <- 0 ('user_ID' didn't match in the last 7 days)
|      2|  24|   yes| 03-10-21 8:02|
|      2|  24|    no| 03-10-21 8:03| # <- 1 ('user_ID' and 'item' matched in the last 7 days)
|      3|  31|   yes| 05-10-21 8:32|
|      3|  66|   yes|10-10-21 10:57|
|      3|  66|    no|14-10-21 12:07| # <- 1 ('user_ID' and 'item' matched in the last 7 days)
|      3|  31|    no|15-10-21 12:07| # <- 0 ('item' didn't match in the last 7 days)
+-------+----+------+--------------+
Resulting dataset:
+-------+----+------+--------------+------+
|user_ID|item|online|      datetime|result|
+-------+----+------+--------------+------+
|      1|  66|    no| 06-10-21 9:30|     1|
|      2|  24|    no| 03-10-21 8:01|     0|
|      2|  24|    no| 03-10-21 8:03|     1|
|      3|  66|    no|14-10-21 12:07|     1|
|      3|  31|    no|15-10-21 12:07|     0|
+-------+----+------+--------------+------+
Can't figure this one out. Is there a way of doing this with a window function? I'm working with Spark and a large dataset (>10 million records).
Any PySpark or SQL magicians that could help?
CodePudding user response:
You can use a rangeBetween window frame to look back over the last 7 days.
First cast datetime to a long value so it can be used with rangeBetween.
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, LongType
from pyspark.sql.window import Window

# If the datetime column is a string
df = df.withColumn('datetime', F.to_timestamp(df.datetime, 'dd-MM-yy H:mm'))
df = df.withColumn('long_dt', df.datetime.cast(LongType()))

# If the datetime column is already a timestamp
df = df.withColumn('long_dt', df.datetime.cast(LongType()))
Then create a window that looks back over the past 7 days for the same user_ID and item.
w = (Window.partitionBy(['user_ID', 'item'])
     .orderBy('long_dt')
     .rangeBetween(-7*24*60*60, -1))  # -7 days up to -1 second before the current row
Use this window to count the prior 'yes' rows; if the count is greater than 0, return 1, otherwise 0. You also only want the rows where online == 'no', so add a filter at the end.
Edit: Thanks to @blackbishop, I updated the counting logic.
df = (df.withColumn('result', (F.count(F.when(df.online == 'yes', 1)).over(w) > 0).cast(IntegerType()))
        .filter(df.online == 'no'))
Result
+-------+----+------+-------------------+------+
|user_id|item|online|           datetime|result|
+-------+----+------+-------------------+------+
|      1|  66|    no|2021-10-06 09:30:00|     1|
|      3|  31|    no|2021-10-15 12:07:00|     0|
|      3|  66|    no|2021-10-14 12:07:00|     1|
|      2|  24|    no|2021-10-03 08:01:00|     0|
|      2|  24|    no|2021-10-03 08:03:00|     1|
+-------+----+------+-------------------+------+
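Since the question also mentions SQL: below is a minimal Spark SQL sketch of the same range-window idea, assuming an active SparkSession named spark and registering the DataFrame under the illustrative view name events (not part of the original answer):
df.createOrReplaceTempView("events")

result = spark.sql("""
    SELECT user_ID, item, online, datetime, result
    FROM (
        SELECT *,
               CAST(COUNT(CASE WHEN online = 'yes' THEN 1 END) OVER (
                   PARTITION BY user_ID, item
                   ORDER BY unix_timestamp(datetime)               -- epoch seconds, like long_dt above
                   RANGE BETWEEN 604800 PRECEDING AND 1 PRECEDING  -- 7 days back, excluding the current second
               ) > 0 AS INT) AS result
        FROM events
    ) t
    WHERE online = 'no'
""")
result.show()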
CodePudding user response:
I've included the pandas code to easily build out a sample DataFrame; the Spark code starts after that.
It's based upon grouping and shifting (lagging) the online and datetime values: for any given user_ID/item, the row immediately preceding a 'no' has to be a 'yes' within the last 7 days for it to count.
import pandas as pd
from pyspark.sql.functions import lag, col, datediff, when
from pyspark.sql.window import Window
df = pd.DataFrame({'user_ID':[1,1,1,2,2,2,3,3,3,3],
                   'item':[66,24,66,24,24,24,31,66,66,31],
                   'online':['yes','yes','no','no','yes','no','yes','yes','no','no'],
                   'datetime':['02-10-21 9:30',
                               '02-10-21 9:30',
                               '06-10-21 9:30',
                               '03-10-21 8:01',
                               '03-10-21 8:02',
                               '03-10-21 8:03',
                               '05-10-21 8:32',
                               '10-10-21 10:57',
                               '14-10-21 12:07',
                               '15-10-21 12:07']})
df['datetime'] = pd.to_datetime(df['datetime'], dayfirst=True)
df = spark.createDataFrame(df)
# previous 'online' value and 'datetime' within each user_ID/item group
df = df.withColumn('o_shift', lag(df['online']).over(Window.partitionBy('user_ID', 'item').orderBy('datetime')))
df = df.withColumn('d_shift', lag(df['datetime']).over(Window.partitionBy('user_ID', 'item').orderBy('datetime')))

# drop rows that have no previous row in their group
df = df.dropna()

# number of days between the current row and the previous one
df = df.withColumn('t_delta', datediff(col('datetime'), col('d_shift')))

# 1 when an offline purchase follows an online attempt within 7 days, else 0
df = df.withColumn('result', when(
    (col('online') == 'no') &
    (col('o_shift') == 'yes') &
    (col('t_delta') <= 7), 1
).otherwise(0))

df.sort('user_id','datetime').select('user_id','item','online','datetime','result').show()
Output
+-------+----+------+-------------------+------+
|user_id|item|online|           datetime|result|
+-------+----+------+-------------------+------+
|      1|  66|    no|2021-10-06 09:30:00|     1|
|      2|  24|   yes|2021-10-03 08:02:00|     0|
|      2|  24|    no|2021-10-03 08:03:00|     1|
|      3|  66|    no|2021-10-14 12:07:00|     1|
|      3|  31|    no|2021-10-15 12:07:00|     0|
+-------+----+------+-------------------+------+
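If you only need the offline rows in the final output (as in the expected result in the question), a small variation is to skip the dropna and instead filter on online == 'no' at the end, treating a missing previous row as 0. A sketch of that variation (not the original answer's code), applied to the DataFrame as it was before the lag/dropna steps:
w = Window.partitionBy('user_ID', 'item').orderBy('datetime')

df2 = (df.withColumn('o_shift', lag('online').over(w))    # previous 'online' value per user/item
         .withColumn('d_shift', lag('datetime').over(w))  # previous 'datetime' per user/item
         .withColumn('t_delta', datediff(col('datetime'), col('d_shift')))
         .withColumn('result', when((col('o_shift') == 'yes') & (col('t_delta') <= 7), 1).otherwise(0))
         .filter(col('online') == 'no'))

df2.sort('user_id', 'datetime').select('user_id', 'item', 'online', 'datetime', 'result').show()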
CodePudding user response:
Other answers demonstrated how to use window functions. Here's another way using a left_semi join, which returns only the users who bought items offline after trying to buy the same items online within the previous 7 days:
from pyspark.sql import functions as F
# cast string into timestamp if not already done
df = df.withColumn('datetime', F.to_timestamp("datetime", 'dd-MM-yy H:mm'))
df_on = df.filter("online = 'yes'").alias("on")
df_off = df.filter("online = 'no'").alias("off")
result = df_off.join(
    df_on, [
        F.col("on.user_ID") == F.col("off.user_ID"),
        F.col("on.item") == F.col("off.item"),
        F.col("on.datetime") >= F.expr("off.datetime - interval 7 day"),
        F.col("on.datetime") < F.col("off.datetime")
    ],
    "left_semi"
)
result.show()
# +-------+----+------+-------------------+
# |user_ID|item|online|           datetime|
# +-------+----+------+-------------------+
# |      1|  66|    no|2021-10-06 09:30:00|
# |      2|  24|    no|2021-10-03 08:03:00|
# |      3|  66|    no|2021-10-14 12:07:00|
# +-------+----+------+-------------------+
Note: if you want the users who bought an item offline but hadn't tried to buy it online before (corresponding to result=0), replace the left_semi join with left_anti in the above query.
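And if you want the asker's full 0/1 result column in a single DataFrame, one way is to union the two joins. A sketch reusing df_on, df_off and the same join conditions as above (the conds name is just for illustration, not part of the original answer):
conds = [
    F.col("on.user_ID") == F.col("off.user_ID"),
    F.col("on.item") == F.col("off.item"),
    F.col("on.datetime") >= F.expr("off.datetime - interval 7 day"),
    F.col("on.datetime") < F.col("off.datetime")
]

# offline rows with a matching online attempt in the previous 7 days -> 1
matched = df_off.join(df_on, conds, "left_semi").withColumn("result", F.lit(1))
# offline rows without such an attempt -> 0
unmatched = df_off.join(df_on, conds, "left_anti").withColumn("result", F.lit(0))

labelled = matched.unionByName(unmatched)
labelled.show()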