Home > Software engineering >  How to count distinct over window on months (rather than days)?
How to count distinct over window on months (rather than days)?

Time:09-16

I would like to count the distinct number of emails of the current month and the previous 2 months. Preferably I'd like the syntax to be in PySpark, rather than SQL.

Example input:

df = spark.createDataFrame(
    [('2022-01-01', 'A'),
     ('2022-01-01', 'A'),
     ('2022-01-01', 'A'),
     ('2022-01-01', 'B'),
     ('2022-01-01', 'Z'),
     ('2022-01-01', 'Z'),
     ('2022-02-01', 'A'),
     ('2022-02-01', 'B'),
     ('2022-02-01', 'C'),
     ('2022-02-01', 'D'),
     ('2022-02-01', 'Z'),
     ('2022-02-01', 'A'),
     ('2022-02-01', 'F'),
     ('2022-03-01', 'A'),
     ('2022-03-01', 'B'),
     ('2022-03-01', 'B'),
     ('2022-03-01', 'C'),
     ('2022-04-01', 'G'),
     ('2022-04-01', 'H'),
     ('2022-05-01', 'G'),
     ('2022-05-01', 'H'),
     ('2022-05-01', 'I'),
     ('2022-06-01', 'I'),
     ('2022-06-01', 'J'),
     ('2022-06-01', 'K')],    
    ['yyyy_mm_dd', 'email']
)

Desired output:

yyyy_mm_dd count_distinct_email
2022-01-01 3
2022-02-01 6
2022-03-01 6
2022-04-01 8
2022-05-01 6
2022-06-01 5

As rangeBetween doesn't support months, I was forced to use SQL syntax to achieve the rolling distinct count. I tried this

users_base.createOrReplaceTempView('users_base')

users_unique = spark.sql(
                         'SELECT \
                             yyyy_mm_dd, \
                             COUNT(DISTINCT soylent_booker_email_id_orig) OVER ( \
                                 ORDER BY CAST(yyyy_mm_dd AS timestamp) ASC\
                                 RANGE BETWEEN INTERVAL 2 MONTHS PRECEDING AND CURRENT ROW \
                                ) AS users_unique_count \
                         FROM \
                             users_base'
                         )

but it's not workking and I found out that COUNT DISTINCT is not supported for window functions. Does somebody have any idea how I can generate my desired output? Thanks!

CodePudding user response:

users_unique = spark.sql("""
    SELECT yyyy_mm_dd,
           size(array_distinct(flatten(collect_set(emails) over (order by cast (yyyy_mm_dd as timestamp) asc
                                                                 range between interval 2 months preceding AND current row)))) as count_distinct_email
      FROM (SELECT yyyy_mm_dd, collect_set(email) as emails
              FROM users_base
             GROUP BY yyyy_mm_dd)
""")

The subquery here does grouping and filtering out duplicates, then outer query runs the window function. Now we need to perform a few tricks:

  • emails is a set, so collect_set(emails) returns set of sets
  • flatten returns a flattened array (not set - so includes duplicates)
  • array_distinct filters out the duplicates
  • size, finally gives the final distinct count

CodePudding user response:

Here's a way to do it using arrays (collect_set, size, aggregate, array_union).

data_sdf. \
    groupBy('yyyy_mm_dd'). \
    agg(func.collect_set('email').alias('unique_emails')). \
    withColumn('curr_and_2_prev', 
               func.collect_list('unique_emails').over(wd.partitionBy().orderBy('yyyy_mm_dd').rowsBetween(-2, 0))
               ). \
    withColumn('curr_and_2_prev', 
               func.expr('aggregate(curr_and_2_prev, cast(array() as array<string>), (x, y) -> array_union(x, y))')
               ). \
    withColumn('count_distinct_email', func.size('curr_and_2_prev')). \
    orderBy('yyyy_mm_dd'). \
    show()

#  ---------- ------------------ -------------------- -------------------- 
# |yyyy_mm_dd|     unique_emails|     curr_and_2_prev|count_distinct_email|
#  ---------- ------------------ -------------------- -------------------- 
# |2022-01-01|         [Z, B, A]|           [Z, B, A]|                   3|
# |2022-02-01|[F, C, Z, B, A, D]|  [Z, B, A, F, C, D]|                   6|
# |2022-03-01|         [C, B, A]|  [Z, B, A, F, C, D]|                   6|
# |2022-04-01|            [G, H]|[F, C, Z, B, A, D...|                   8|
# |2022-05-01|         [I, G, H]|  [C, B, A, G, H, I]|                   6|
# |2022-06-01|         [K, J, I]|     [G, H, I, K, J]|                   5|
#  ---------- ------------------ -------------------- -------------------- 
  • create an array of unique emails grouped at the months
  • curr_and_2_prev - create an array of the aforementioned arrays for current month and 2 previous months, using a sliding window
  • use aggregate function to union the arrays within the array of arrays using array_union (it keeps unique values only)
    • you can also use flatten and array_distinct instead of the aggregate
  • use size to count the elements within the resulting array

alternatively,

data_sdf. \
    groupBy('yyyy_mm_dd'). \
    agg(func.collect_set('email').alias('unique_emails')). \
    withColumn('curr_and_2_prev', 
               func.collect_list('unique_emails').over(wd.partitionBy().orderBy('yyyy_mm_dd').rowsBetween(-2, 0))
               ). \
    withColumn('curr_and_2_prev', func.array_distinct(func.flatten('curr_and_2_prev'))). \
    withColumn('count_distinct_email', func.size('curr_and_2_prev')). \
    orderBy('yyyy_mm_dd'). \
    show()
  • Related