My data is like this:
year_month | user_id | pageviews | visits |
---|---|---|---|
2020-03 | 2 | 8 | 3 |
2021-03 | 27 | 4 | 3 |
2021-05 | 23 | 75 | 7 |
2020-05 | 23 | 17 | 7 |
2020-08 | 339 | 253 | 169 |
2020-08 | 892 | 31 | 4 |
2021-08 | 339 | 4 | 3 |
I want to group by year_month and calculate the difference in pageviews and visits between each month of one year (2020) and the same month of the next year (2021).
So I think the output should look something like this (without the content inside the parentheses):
last_month | diff(pageviews) | diff(visits) |
---|---|---|
2021-03 | -4(4-8) | 0(3-3) |
2021-05 | 58(75-17) | 0(7-7) |
2021-08 | -280(4-284) | -170(3-173) |
I'm not sure how to do this in a vectorized way. I was thinking of converting the data to pandas and using a for loop, but I'd like to learn how to do this kind of thing with PySpark or Spark SQL, which I expect to be much faster.
CodePudding user response:
The main idea is to use a window function to compare the same month across years. See the comments in the code for more explanation.
from pyspark.sql import functions as F
from pyspark.sql import Window as W
(df
    # year and month need to be compared separately,
    # so split year_month into two integer columns
    .withColumn('year', F.split('year_month', '-')[0].cast('int'))
    .withColumn('month', F.split('year_month', '-')[1].cast('int'))
    # there can be multiple rows per year_month,
    # so group them and sum the values first
    .groupBy('year', 'month')
    .agg(
        F.sum('pageviews').alias('pageviews'),
        F.sum('visits').alias('visits')
    )
    # compare each 2021 month with the same month in 2020 using the lag window function;
    # partitioning by month keeps lag from reaching into a different month
    # when a month has no row for the previous year (the diff is then null)
    .withColumn('prev_pageviews', F.lag('pageviews').over(W.partitionBy('month').orderBy('year')))
    .withColumn('prev_visits', F.lag('visits').over(W.partitionBy('month').orderBy('year')))
    # with the current and previous pageviews/visits on the same row,
    # the difference is a simple subtraction
    .withColumn('diff_pageviews', F.col('pageviews') - F.col('prev_pageviews'))
    .withColumn('diff_visits', F.col('visits') - F.col('prev_visits'))
    # select only the necessary columns and rows
    .select('year', 'month', 'diff_pageviews', 'diff_visits')
    .where(F.col('year') == 2021)
    # sort so the displayed rows come out in month order
    .orderBy('month')
    .show()
)
# Output
# +----+-----+--------------+-----------+
# |year|month|diff_pageviews|diff_visits|
# +----+-----+--------------+-----------+
# |2021|    3|            -4|          0|
# |2021|    5|            58|          0|
# |2021|    8|          -280|       -170|
# +----+-----+--------------+-----------+
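
If you want to sanity-check the numbers without a Spark session, the same aggregate-then-compare logic can be sketched in plain Python on the sample rows from the question. This is only an illustration of the arithmetic, not a replacement for the Spark version; the function name `yearly_diff` is made up for this example.

```python
from collections import defaultdict

# Sample rows from the question: (year_month, user_id, pageviews, visits)
rows = [
    ("2020-03", 2, 8, 3),
    ("2021-03", 27, 4, 3),
    ("2021-05", 23, 75, 7),
    ("2020-05", 23, 17, 7),
    ("2020-08", 339, 253, 169),
    ("2020-08", 892, 31, 4),
    ("2021-08", 339, 4, 3),
]


def yearly_diff(rows, year):
    """Hypothetical helper: sum per (year, month), then subtract the prior year's totals."""
    # Step 1: aggregate pageviews and visits per (year, month)
    totals = defaultdict(lambda: [0, 0])
    for year_month, _user_id, pageviews, visits in rows:
        y, m = (int(part) for part in year_month.split("-"))
        totals[(y, m)][0] += pageviews
        totals[(y, m)][1] += visits

    # Step 2: for each month of `year`, look up the same month one year earlier
    result = {}
    for (y, m), (pv, vs) in sorted(totals.items()):
        prev = totals.get((y - 1, m))  # .get does not create a missing key
        if y != year or prev is None:
            continue  # skip other years and months with no prior-year data
        result[f"{y}-{m:02d}"] = (pv - prev[0], vs - prev[1])
    return result


diffs = yearly_diff(rows, 2021)
for month, (diff_pageviews, diff_visits) in diffs.items():
    print(month, diff_pageviews, diff_visits)
```

Note one difference from the window approach: this sketch looks up exactly the prior year and skips months that have no prior-year data, whereas `lag` takes whatever row precedes the current one in the window ordering.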