Home > Net >  How to track number of distinct values incrementally from a spark table?
How to track number of distinct values incrementally from a spark table?

Time:12-25

Suppose we have a very large table that we'd like to process statistics for incrementally.

Date Amount Customer
2022-12-20 30 Mary
2022-12-21 12 Mary
2022-12-20 12 Bob
2022-12-21 15 Bob
2022-12-22 15 Alice

We'd like to be able to calculate incrementally how much we made per distinct customer for a date range. So from 12-20 to 12-22 (inclusive), we'd have 3 distinct customers, but 12-20 to 12-21 there are 2 distinct customers.

If we want to run this pipeline once a day and there are many customers, how can we keep a rolling count of distinct customers for an arbitrary date range? Is there a way to do this without storing a huge list of customer names for each day?

We'd like to support a frontend that has a date range filter and can quickly calculate results for that date range. For example:

Start Date End Date Average Income Per Customer
2022-12-20 2022-12-21 (30 12 12 15)/2 = 34.5
2022-12-20 2022-12-22 (30 12 12 15 15)/3 = 28

The only approach I can think of is to store a set of customer names for each day, and when viewing the results calculate the size of the joined set of sets to calculate distinct customers. This seems inefficient. In this case we'd store the following table, with the customer column being extremely large.

Date Total Income Customers
2022-12-20 42 set(Mary, Bob)
2022-12-21 27 set(Mary, Bob)
2022-12-22 15 set(Alice)

CodePudding user response:

You need to use window functions. Here is a solution to a similar problem: https://stackoverflow.com/a/45869254/1888799

CodePudding user response:

You can achieve this by filtering rows with dates between start_date and end_date then grouping by customer_id and calculating the sum of amounts and then getting avg of these amounts. this approach works for only one start_date and end_date and you should run this code with different parameters to solve with different date ranges

start_date = '2022-12-20'
end_date = '2022-12-21'
(
    df
    .withColumn('isInRange', F.col('date').between(start_date, end_date))
    .filter(F.col('isInRange'))
    .groupby('customer')
    .agg(F.sum('amount').alias('sum'))
    .agg(F.avg('sum').alias('avg income'))
).show()
  • Related