I would like to count the number of distinct emails over a rolling window made up of the current month and the previous 2 months. Preferably I'd like the syntax to be in PySpark rather than SQL.
Example input:
df = spark.createDataFrame(
[('2022-01-01', 'A'),
('2022-01-01', 'A'),
('2022-01-01', 'A'),
('2022-01-01', 'B'),
('2022-01-01', 'Z'),
('2022-01-01', 'Z'),
('2022-02-01', 'A'),
('2022-02-01', 'B'),
('2022-02-01', 'C'),
('2022-02-01', 'D'),
('2022-02-01', 'Z'),
('2022-02-01', 'A'),
('2022-02-01', 'F'),
('2022-03-01', 'A'),
('2022-03-01', 'B'),
('2022-03-01', 'B'),
('2022-03-01', 'C'),
('2022-04-01', 'G'),
('2022-04-01', 'H'),
('2022-05-01', 'G'),
('2022-05-01', 'H'),
('2022-05-01', 'I'),
('2022-06-01', 'I'),
('2022-06-01', 'J'),
('2022-06-01', 'K')],
['yyyy_mm_dd', 'email']
)
Desired output:
yyyy_mm_dd | count_distinct_email |
---|---|
2022-01-01 | 3 |
2022-02-01 | 6 |
2022-03-01 | 6 |
2022-04-01 | 8 |
2022-05-01 | 6 |
2022-06-01 | 5 |
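To make the window definition explicit, the plain-Python sketch below (not Spark code) reproduces the desired output from the example input. It maps each date to a consecutive month number (year * 12 + month), so that "the previous 2 months" is simply that number minus 2, and it unions the per-month email sets inside each window. The helper names month_index and rolling_distinct_count are hypothetical.

```python
from collections import defaultdict

# The example input from the question, as (yyyy_mm_dd, email) pairs
rows = [
    ('2022-01-01', 'A'), ('2022-01-01', 'A'), ('2022-01-01', 'A'),
    ('2022-01-01', 'B'), ('2022-01-01', 'Z'), ('2022-01-01', 'Z'),
    ('2022-02-01', 'A'), ('2022-02-01', 'B'), ('2022-02-01', 'C'),
    ('2022-02-01', 'D'), ('2022-02-01', 'Z'), ('2022-02-01', 'A'),
    ('2022-02-01', 'F'),
    ('2022-03-01', 'A'), ('2022-03-01', 'B'), ('2022-03-01', 'B'),
    ('2022-03-01', 'C'),
    ('2022-04-01', 'G'), ('2022-04-01', 'H'),
    ('2022-05-01', 'G'), ('2022-05-01', 'H'), ('2022-05-01', 'I'),
    ('2022-06-01', 'I'), ('2022-06-01', 'J'), ('2022-06-01', 'K'),
]


def month_index(yyyy_mm_dd: str) -> int:
    """Number months consecutively (year * 12 + month), so '2 months back' is just -2."""
    year, month, _day = yyyy_mm_dd.split('-')
    return int(year) * 12 + int(month)


def rolling_distinct_count(rows, months_back=2):
    """Distinct emails over the current calendar month and the months_back months before it."""
    emails_by_month = defaultdict(set)  # month number -> set of unique emails
    dates_by_month = {}                 # month number -> original yyyy_mm_dd label
    for date, email in rows:
        idx = month_index(date)
        emails_by_month[idx].add(email)
        dates_by_month[idx] = date
    result = {}
    for idx in sorted(emails_by_month):
        window = set()
        # months with no data simply contribute nothing (.get does not insert keys)
        for prev in range(idx - months_back, idx + 1):
            window |= emails_by_month.get(prev, set())
        result[dates_by_month[idx]] = len(window)
    return result


print(rolling_distinct_count(rows))
# {'2022-01-01': 3, '2022-02-01': 6, '2022-03-01': 6, '2022-04-01': 8, '2022-05-01': 6, '2022-06-01': 5}
```

Because the window is defined on month numbers rather than on row positions, a month with no rows would still count as one of the 2 preceding months.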
As rangeBetween in the DataFrame API doesn't support month intervals, I was forced to use SQL syntax to achieve the rolling distinct count. I tried this:
users_base.createOrReplaceTempView('users_base')
users_unique = spark.sql(
'SELECT \
yyyy_mm_dd, \
COUNT(DISTINCT soylent_booker_email_id_orig) OVER ( \
ORDER BY CAST(yyyy_mm_dd AS timestamp) ASC \
RANGE BETWEEN INTERVAL 2 MONTHS PRECEDING AND CURRENT ROW \
) AS users_unique_count \
FROM \
users_base'
)
but it doesn't work, and I found out that COUNT(DISTINCT ...) is not supported in window functions. Does anybody have an idea how I can generate my desired output? Thanks!
Answer:
users_unique = spark.sql("""
SELECT yyyy_mm_dd,
size(array_distinct(flatten(collect_set(emails) over (order by cast (yyyy_mm_dd as timestamp) asc
range between interval 2 months preceding AND current row)))) as count_distinct_email
FROM (SELECT yyyy_mm_dd, collect_set(email) as emails
FROM users_base
GROUP BY yyyy_mm_dd)
""")
The subquery groups by date and removes duplicate emails within each month; the outer query then runs the window function. A few tricks are needed:
- emails is a set, so collect_set(emails) returns a set of sets
- flatten returns a flattened array (not a set, so it can contain duplicates)
- array_distinct filters out the duplicates
- size finally gives the distinct count
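The same steps can be traced in plain Python for a single window, here the 2022-03-01 row, whose frame covers January to March. This is only an analogue of the Spark functions (itertools.chain.from_iterable standing in for flatten, set for array_distinct, len for size), not Spark code; the per-month sets are taken from the example input.

```python
from itertools import chain

# Per-month sets, as the subquery's collect_set(email) would produce them
emails_by_month = {
    '2022-01-01': {'A', 'B', 'Z'},
    '2022-02-01': {'A', 'B', 'C', 'D', 'F', 'Z'},
    '2022-03-01': {'A', 'B', 'C'},
}

# Frame for the 2022-03-01 row: the current month plus the 2 preceding months.
# The three month sets differ, so a list holds the same elements collect_set would.
window = [emails_by_month[m] for m in ('2022-01-01', '2022-02-01', '2022-03-01')]

# flatten: one sequence, duplicates across months are kept
flattened = list(chain.from_iterable(window))
# array_distinct: drop those duplicates (sorted here only for a stable display;
# Spark's array_distinct keeps first-occurrence order instead)
distinct = sorted(set(flattened))
# size: the distinct count for this row
count = len(distinct)

print(len(flattened), distinct, count)
# 12 ['A', 'B', 'C', 'D', 'F', 'Z'] 6
```

Stopping after the flatten step would report 12 for this row instead of 6, which is why the array_distinct step is needed.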
Answer:
Here's a way to do it using array functions (collect_set, size, aggregate, array_union).
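from pyspark.sql import functions as func
from pyspark.sql import Window as wd

# data_sdf is the input DataFrame (df in the question)
data_sdf. \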
groupBy('yyyy_mm_dd'). \
agg(func.collect_set('email').alias('unique_emails')). \
withColumn('curr_and_2_prev',
func.collect_list('unique_emails').over(wd.partitionBy().orderBy('yyyy_mm_dd').rowsBetween(-2, 0))
). \
withColumn('curr_and_2_prev',
func.expr('aggregate(curr_and_2_prev, cast(array() as array<string>), (x, y) -> array_union(x, y))')
). \
withColumn('count_distinct_email', func.size('curr_and_2_prev')). \
orderBy('yyyy_mm_dd'). \
show()
# +----------+------------------+--------------------+--------------------+
# |yyyy_mm_dd|     unique_emails|     curr_and_2_prev|count_distinct_email|
# +----------+------------------+--------------------+--------------------+
# |2022-01-01|         [Z, B, A]|           [Z, B, A]|                   3|
# |2022-02-01|[F, C, Z, B, A, D]|  [Z, B, A, F, C, D]|                   6|
# |2022-03-01|         [C, B, A]|  [Z, B, A, F, C, D]|                   6|
# |2022-04-01|            [G, H]|[F, C, Z, B, A, D...|                   8|
# |2022-05-01|         [I, G, H]|  [C, B, A, G, H, I]|                   6|
# |2022-06-01|         [K, J, I]|     [G, H, I, K, J]|                   5|
# +----------+------------------+--------------------+--------------------+
- create an array of unique emails for each month (unique_emails)
- create curr_and_2_prev, an array of the aforementioned arrays for the current month and the 2 previous months, using a sliding window (rowsBetween(-2, 0) counts rows, so this assumes no month is missing from the data)
- use the aggregate function to union the arrays within the array of arrays using array_union (it keeps unique values only)
  - you can also use flatten and array_distinct instead of aggregate
- use size to count the elements within the resulting array
Alternatively, with flatten and array_distinct in place of aggregate:
data_sdf. \
groupBy('yyyy_mm_dd'). \
agg(func.collect_set('email').alias('unique_emails')). \
withColumn('curr_and_2_prev',
func.collect_list('unique_emails').over(wd.partitionBy().orderBy('yyyy_mm_dd').rowsBetween(-2, 0))
). \
withColumn('curr_and_2_prev', func.array_distinct(func.flatten('curr_and_2_prev'))). \
withColumn('count_distinct_email', func.size('curr_and_2_prev')). \
orderBy('yyyy_mm_dd'). \
show()
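Both variants frame the window with rowsBetween(-2, 0), i.e. the current row and the two rows before it. With one row per month that equals "current month and previous 2 months" only while no month is missing. The plain-Python sketch below (not Spark code; the data is made up and deliberately skips March 2022) compares a row-based frame with a calendar-month frame for the April row. In Spark, a calendar-month frame can be had with the RANGE ... INTERVAL 2 MONTHS frame from the SQL answer, or, for example, with rangeBetween(-2, 0) ordered by a numeric month index such as year * 12 + month. The helper names are hypothetical.

```python
from functools import reduce

# Hypothetical data with no rows for 2022-03: one set of unique emails per month present
monthly = [
    ('2022-01-01', {'A', 'B'}),
    ('2022-02-01', {'C'}),
    ('2022-04-01', {'D'}),
]


def month_index(yyyy_mm_dd):
    """Consecutive month number: year * 12 + month."""
    year, month, _day = yyyy_mm_dd.split('-')
    return int(year) * 12 + int(month)


def rows_frame_counts(monthly):
    """Like rowsBetween(-2, 0): the current row and the 2 rows before it."""
    counts = {}
    for i, (date, _emails) in enumerate(monthly):
        frame = [emails for _d, emails in monthly[max(0, i - 2):i + 1]]
        # union the sets, as aggregate(..., array_union) does
        counts[date] = len(reduce(set.union, frame, set()))
    return counts


def months_frame_counts(monthly):
    """The current calendar month and the 2 calendar months before it."""
    counts = {}
    for date, _emails in monthly:
        idx = month_index(date)
        frame = [emails for d, emails in monthly if idx - 2 <= month_index(d) <= idx]
        counts[date] = len(reduce(set.union, frame, set()))
    return counts


print(rows_frame_counts(monthly))    # {'2022-01-01': 2, '2022-02-01': 3, '2022-04-01': 4}
print(months_frame_counts(monthly))  # {'2022-01-01': 2, '2022-02-01': 3, '2022-04-01': 2}
```

For April the row-based frame reaches back to January and counts 4 emails, while the calendar-month frame covers only February to April and counts 2. On the question's example data every month is present, so both give the desired output.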