I have a PySpark dataset; a sample fragment is below. How can I fill recordcounts_in_last_30_days with, per row, the count of that user's records in the last 30 days, like this:
| date | userid | comment | recordcounts_in_last_30_days |
|---|---|---|---|
| 2022-01-15 09:00 | 1 | examplecomment1 | 0 |
| 2022-01-16 09:00 | 2 | examplecomment2 | 0 |
| 2022-01-25 09:00 | 1 | examplecomment3 | 1 |
| 2022-01-28 09:00 | 2 | examplecomment3 | 1 |
| 2022-02-26 09:00 | 2 | examplecomment3 | 1 |
| 2022-03-25 09:00 | 1 | examplecomment4 | 0 |
Please write in the comments if anything about the problem is unclear.
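In case it helps to reproduce this, the input fragment (without the count column, which is what I want to compute) can be built roughly like this; it's just a sketch with the dates kept as plain strings:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample fragment from the table above; the count column is the desired
# output, so it is not part of the input.
df = spark.createDataFrame(
    [
        ("2022-01-15 09:00", 1, "examplecomment1"),
        ("2022-01-16 09:00", 2, "examplecomment2"),
        ("2022-01-25 09:00", 1, "examplecomment3"),
        ("2022-01-28 09:00", 2, "examplecomment3"),
        ("2022-02-26 09:00", 2, "examplecomment3"),
        ("2022-03-25 09:00", 1, "examplecomment4"),
    ],
    ["date", "userid", "comment"],
)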
CodePudding user response:
Use count over a window bounded with rangeBetween, like this:
from pyspark.sql import functions as F, Window

# Window over the previous 30 days for each user, ordered by the timestamp
# cast to epoch seconds; the upper bound -1 excludes the current row itself.
w = (Window.partitionBy("userid")
     .orderBy(F.col("date").cast("long"))
     .rangeBetween(-30 * 86400, -1))  # 86400 = number of seconds in a day

result = (df.withColumn("date", F.to_timestamp("date", "yyyy-MM-dd HH:mm"))
          .withColumn("recordcounts_in_last_30_days", F.count("*").over(w)))
result.show()
# +-------------------+------+---------------+----------------------------+
# |               date|userid|        comment|recordcounts_in_last_30_days|
# +-------------------+------+---------------+----------------------------+
# |2022-01-15 09:00:00|     1|examplecomment1|                           0|
# |2022-01-25 09:00:00|     1|examplecomment3|                           1|
# |2022-03-25 09:00:00|     1|examplecomment4|                           0|
# |2022-01-16 09:00:00|     2|examplecomment2|                           0|
# |2022-01-28 09:00:00|     2|examplecomment3|                           1|
# |2022-02-26 09:00:00|     2|examplecomment3|                           1|
# +-------------------+------+---------------+----------------------------+
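Note that the upper bound -1 in rangeBetween makes the count exclusive of the current row, which matches your expected output. If you ever wanted each record to count itself as well, a variant window could look like this (a sketch, used in place of w above):

# Same window, but with the current row included in the 30-day range,
# so every row's count would be one higher than in the output above.
w_inclusive = (Window.partitionBy("userid")
               .orderBy(F.col("date").cast("long"))
               .rangeBetween(-30 * 86400, Window.currentRow))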