Aggregating 30 days of data in Spark Structured Streaming

Time:10-06

I am using Spark Structured Streaming to calculate the monthly amount for each user, with the code below:

# Parentheses let the method chain span several lines
df = (spark
        .readStream
        .format('kafka')
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss", "false")
        .load())


from pyspark.sql.functions import col, count, sum, window

df1 = (df.groupby('client_id', 'id', window(col('date'), "30 days"))
    .agg(sum(col('amount')).alias('amount'), count(col('id')).alias('count')))

(df1.selectExpr("CAST(client_id AS STRING) AS key", "to_json(struct(*)) AS value")
       .writeStream
       .format('kafka')
       ........
)

I observed that the output is not correct. For example:

input

{"client_id":"1", "id":"1", "date":"2022-08-01", "amount": 10.0}
{"client_id":"1", "id":"1", "date":"2022-08-15", "amount": 10.0}
{"client_id":"1", "id":"1", "date":"2022-08-25", "amount": 10.0}
{"client_id":"1", "id":"1", "date":"2022-08-26", "amount": 10.0}
{"client_id":"1", "id":"1", "date":"2022-08-27", "amount": 10.0}
{"client_id":"1", "id":"1", "date":"2022-08-28", "amount": 10.0}
{"client_id":"1", "id":"1", "date":"2022-08-29", "amount": 10.0}

output

{"client_id":"1","id":"1","amount":10.0,"count":1}
{"client_id":"1","id":"1","amount":20.0,"count":2}
{"client_id":"1","id":"1","amount":30.0,"count":3}
{"client_id":"1","id":"1","amount":40.0,"count":4}
{"client_id":"1","id":"1","amount":50.0,"count":5}
{"client_id":"1","id":"1","amount":10.0,"count":1}
{"client_id":"1","id":"1","amount":20.0,"count":2}

The first input record is dated "2022-08-01" with an amount of 10, so I expect the amounts for the following 30 days to be summed, giving a final sum of 70. Instead, the sum reaches 50 and then restarts at 10 and 20. It seems to be aggregating over only 27 days: the "count" and "amount" are reset at "2022-08-28" instead of after 30 days.

CodePudding user response:

According to the documentation, the window function takes a fourth argument, startTime:

startTime : str, optional
The offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes.

This means that windows are fixed-length intervals counted from 1970-01-01 00:00:00 UTC, unless you specify a startTime, which must be an offset from that point in time.

If you use the window function with windowDuration="30 days" and no startTime, you get consecutive 30-day intervals counted from 1970-01-01 00:00:00 UTC. In your case, I don't really understand why the boundary between two 30-day windows falls on 2022-08-28, because for me it falls on 2022-08-26 (which is exactly 641 × 30 days after the epoch):

{2022-07-27 00:00:00, 2022-08-26 00:00:00}
{2022-08-26 00:00:00, 2022-09-25 00:00:00}

Even though the exact dates differ, the logic is the same: the 30-day windows are anchored to a fixed point in time, not to the date of the first record.
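The boundaries above can be checked without Spark. The following is a minimal sketch in plain Python, assuming timestamps are interpreted in UTC; `tumbling_window` and `day` are hypothetical helper names used only for this illustration and are not part of the Spark API.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumbling_window(ts, duration):
    """Return (start, end) of the epoch-aligned window that contains ts."""
    index = (ts - EPOCH) // duration  # number of whole windows since the epoch
    start = EPOCH + index * duration
    return start, start + duration

def day(text):
    """Parse 'YYYY-MM-DD' as midnight UTC."""
    return datetime.strptime(text, "%Y-%m-%d").replace(tzinfo=timezone.utc)

for d in ["2022-08-01", "2022-08-25", "2022-08-26"]:
    start, end = tumbling_window(day(d), timedelta(days=30))
    print(d, "->", start.date(), end.date())
```

Under this assumption, 2022-08-01 and 2022-08-25 land in the window from 2022-07-27 to 2022-08-26, while 2022-08-26 opens the next one.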

If I specify startTime="1 day", I get 30-day windows shifted:

{2022-07-28 00:00:00, 2022-08-27 00:00:00}
{2022-08-27 00:00:00, 2022-09-26 00:00:00}

Only by carefully choosing startTime can we make the windows start on the date we want, and even then the boundaries stay fixed until startTime is changed again.
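If you want to work out which startTime lines a window up with a particular date, the offset is the number of days since the epoch modulo the window length. A small sketch in plain Python, assuming whole-day windows and UTC dates; `start_time_days` is a hypothetical helper, not a Spark function.

```python
from datetime import date

EPOCH = date(1970, 1, 1)

def start_time_days(desired_start, window_days):
    """Number of days to use as startTime so that a window begins on desired_start."""
    return (desired_start - EPOCH).days % window_days

offset = start_time_days(date(2022, 8, 1), 30)
print(f"{offset} days")
```

For 2022-08-01 this gives 5 days, so something like window(col('date'), "30 days", startTime="5 days") should produce a window starting on that date. The following window would still start exactly 30 days later, on 2022-08-31, not on the first of the next month.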


To get what you intend, you could probably use the slideDuration parameter of the window function and then filter on the aggregation result:

from pyspark.sql import functions as F

df1 = (df
    # 30-day windows that slide forward by 1 day
    .groupby('client_id', 'id', F.window('date', "30 days", "1 day"))
    .agg(F.sum('amount').alias('amount'), F.count('id').alias('count'))
    # keep only windows that end the day after the latest record they contain
    .filter((F.date_add(F.sort_array(F.collect_set('date'), False)[0], 1) == F.col("window.end")))
).selectExpr("CAST(client_id AS STRING) AS key", "to_json(struct(*)) AS value")
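To see what the sliding-window idea is meant to produce, here is a rough emulation in plain Python on the input from the question. It only mimics the batch logic (30-day windows sliding by 1 day, keeping windows that end the day after their latest record); it says nothing about output modes or watermarks in a real streaming query, and `windows_containing` is a hypothetical helper.

```python
from datetime import date, timedelta

# Records from the question: (date, amount)
records = [(date(2022, 8, d), 10.0) for d in (1, 15, 25, 26, 27, 28, 29)]
WINDOW_DAYS = 30

def windows_containing(d):
    """Yield (start, end) of each 30-day window, sliding by 1 day, that contains d."""
    for back in range(WINDOW_DAYS):
        start = d - timedelta(days=back)
        yield start, start + timedelta(days=WINDOW_DAYS)

# Per window: running (sum of amounts, record count, latest record date)
agg = {}
for d, amount in records:
    for w in windows_containing(d):
        total, count, latest = agg.get(w, (0.0, 0, d))
        agg[w] = (total + amount, count + 1, max(latest, d))

# Keep only windows whose end is the day after their latest record
kept = sorted(
    (end, total, count)
    for (start, end), (total, count, latest) in agg.items()
    if latest + timedelta(days=1) == end
)
for end, total, count in kept:
    print(end, total, count)
```

Each surviving row is a trailing 30-day total ending at one of the record dates; the last one covers 2022-07-31 up to 2022-08-30 and contains all seven records, which is the sum of 70 the question expects.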