Suppose we have a very large table for which we'd like to compute statistics incrementally.
Date | Amount | Customer |
---|---|---|
2022-12-20 | 30 | Mary |
2022-12-21 | 12 | Mary |
2022-12-20 | 12 | Bob |
2022-12-21 | 15 | Bob |
2022-12-22 | 15 | Alice |
We'd like to calculate incrementally how much we made per distinct customer over a date range. For example, the range 2022-12-20 to 2022-12-22 (inclusive) has 3 distinct customers, but 2022-12-20 to 2022-12-21 has only 2.
If we want to run this pipeline once a day and there are many customers, how can we keep a rolling count of distinct customers for an arbitrary date range? Is there a way to do this without storing a huge list of customer names for each day?
We'd like to support a frontend that has a date range filter and can quickly calculate results for that date range. For example:
Start Date | End Date | Average Income Per Customer |
---|---|---|
2022-12-20 | 2022-12-21 | (30 + 12 + 12 + 15)/2 = 34.5 |
2022-12-20 | 2022-12-22 | (30 + 12 + 12 + 15 + 15)/3 = 28 |
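To make the expected numbers concrete, here is a minimal plain-Python sketch of the calculation over the sample rows. The function name `average_income_per_customer` and the in-memory `ROWS` list are only illustrative assumptions; the real table would be far too large for this.

```python
from datetime import date

# Sample rows from the table above: (date, amount, customer).
ROWS = [
    (date(2022, 12, 20), 30, "Mary"),
    (date(2022, 12, 21), 12, "Mary"),
    (date(2022, 12, 20), 12, "Bob"),
    (date(2022, 12, 21), 15, "Bob"),
    (date(2022, 12, 22), 15, "Alice"),
]


def average_income_per_customer(rows, start, end):
    """Total amount divided by the number of distinct customers in [start, end]."""
    in_range = [(amount, customer) for d, amount, customer in rows if start <= d <= end]
    if not in_range:
        return None  # no transactions in the range
    total = sum(amount for amount, _ in in_range)
    distinct = {customer for _, customer in in_range}
    return total / len(distinct)


print(average_income_per_customer(ROWS, date(2022, 12, 20), date(2022, 12, 21)))  # 34.5
print(average_income_per_customer(ROWS, date(2022, 12, 20), date(2022, 12, 22)))  # 28.0
```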
The only approach I can think of is to store a set of customer names for each day and, when viewing the results, take the size of the union of those sets to get the number of distinct customers. This seems inefficient. In this case we'd store the following table, with the Customers column being extremely large.
Date | Total Income | Customers |
---|---|---|
2022-12-20 | 42 | set(Mary, Bob) |
2022-12-21 | 27 | set(Mary, Bob) |
2022-12-22 | 15 | set(Alice) |
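As a rough sketch of that idea, in plain Python rather than Spark: the daily pipeline would build one summary entry per day, and the frontend would union the sets for the selected range. The helper names `build_daily_summary` and `query_range` are made up for illustration.

```python
from collections import defaultdict


def build_daily_summary(rows):
    """Return {date: [total_income, set_of_customers]} from (date, amount, customer) rows."""
    summary = defaultdict(lambda: [0, set()])
    for day, amount, customer in rows:
        summary[day][0] += amount
        summary[day][1].add(customer)
    return dict(summary)


def query_range(summary, start, end):
    """Return (total_income, distinct_customer_count) for the inclusive date range."""
    total = 0
    customers = set()
    for day, (income, names) in summary.items():
        if start <= day <= end:  # ISO date strings compare chronologically
            total += income
            customers |= names  # union of the daily customer sets
    return total, len(customers)


rows = [
    ("2022-12-20", 30, "Mary"),
    ("2022-12-21", 12, "Mary"),
    ("2022-12-20", 12, "Bob"),
    ("2022-12-21", 15, "Bob"),
    ("2022-12-22", 15, "Alice"),
]

summary = build_daily_summary(rows)
total, distinct = query_range(summary, "2022-12-20", "2022-12-22")
print(total, distinct, total / distinct)  # 84 3 28.0
```

The query only touches one entry per day in the range, but each entry still carries the full customer set, which is exactly the storage cost I'd like to avoid.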
CodePudding user response:
You need to use window functions. Here is a solution to a similar problem: https://stackoverflow.com/a/45869254/1888799
CodePudding user response:
You can achieve this by filtering the rows whose date falls between start_date and end_date, grouping by customer, summing the amounts per customer, and then averaging those sums. This approach handles only one (start_date, end_date) pair at a time, so you have to rerun the code with different parameters for each date range.
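from pyspark.sql import functions as F

start_date = '2022-12-20'
end_date = '2022-12-21'

# df is assumed to be the DataFrame holding the date, amount and customer columns
(
    df
    # keep only the rows inside the inclusive date range
    .filter(F.col('date').between(start_date, end_date))
    # total income per customer
    .groupby('customer')
    .agg(F.sum('amount').alias('sum'))
    # average of the per-customer totals
    .agg(F.avg('sum').alias('avg income'))
).show()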