I have a question that is very similar to How to group by time interval in Spark SQL
However, my metric is time spent (duration), so my data looks like:
KEY |Event_Type | duration | Time
001 |event1 | 10 | 2016-05-01 10:49:51
002 |event2 | 100 | 2016-05-01 10:50:53
001 |event3 | 20 | 2016-05-01 10:50:55
001 |event1 | 15 | 2016-05-01 10:51:50
003 |event1 | 13 | 2016-05-01 10:55:30
001 |event2 | 12 | 2016-05-01 10:57:00
001 |event3 | 11 | 2016-05-01 11:00:01
Is there a way to sum the time spent into five minute buckets, grouped by key, and know when the duration goes outside of the bound of the bucket?
For example, the first row starts at 10:49:51 and ends at 10:50:01.
Thus, the bucket for key 001 in the window [2016-05-01 10:45:00.0, 2016-05-01 10:50:00.0] would get 9 seconds of duration (second 51 through second 60), and the 10:50 to 10:55 bucket would get the remaining 1 second, plus the relevant seconds from other log lines (20 seconds from the third row, 15 from the fourth row).
I want to sum the time spent within each bucket, but the solution from the other thread,
df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")
would over-count in the bucket where a boundary-spanning event starts, and under-count in the subsequent bucket(s).
Note: My Time column is actually in epoch timestamps like 1636503077, but I can easily cast it to the above format if that makes this calculation easier.
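For what it's worth, a minimal sketch of that cast (assuming Time holds epoch seconds and spark.implicits._ is in scope):
// Spark interprets a long cast to timestamp as seconds since the Unix epoch.
val withTs = df.withColumn("Time", $"Time".cast("timestamp"))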
CodePudding user response:
In my opinion, you may need to preprocess your data by splitting each duration across every minute (or every five-minute) boundary.
As you described, the first row
001 |event1 | 10 | 2016-05-01 10:49:51
should be converted to
001 |event1 | 9 | 2016-05-01 10:49:51
001 |event1 | 1 | 2016-05-01 10:50:00
Then you can use the Spark window function to sum it properly:
df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")
That will not change the result if you only want to know the duration per time bucket, but it will increase the record count.
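For concreteness, here is a rough sketch of that preprocessing in Spark/Scala. It is untested against your data and assumes Time holds epoch seconds, duration is a positive number of seconds, and an active SparkSession named spark; the column names start_sec, end_sec, bucket_sec, and piece are just illustrative.

import org.apache.spark.sql.functions._
import spark.implicits._  // for the $"..." column syntax

// 5-minute buckets, measured in seconds.
val bucketSec = 300L

// Start and end of each event in epoch seconds (assumes duration >= 1).
val events = df
  .withColumn("start_sec", $"Time".cast("long"))
  .withColumn("end_sec",   $"Time".cast("long") + $"duration")

val exploded = events
  // One row per 5-minute bucket that the event overlaps.
  .withColumn("bucket_sec", explode(sequence(
    floor($"start_sec" / bucketSec) * bucketSec,
    floor(($"end_sec" - 1) / bucketSec) * bucketSec,
    lit(bucketSec))))
  // Seconds of this event that fall inside this particular bucket.
  .withColumn("piece",
    least($"end_sec", $"bucket_sec" + bucketSec) -
    greatest($"start_sec", $"bucket_sec"))

// The grouped sum now counts each second exactly once.
val result = exploded
  .groupBy($"KEY", window($"bucket_sec".cast("timestamp"), "5 minutes"))
  .agg(sum($"piece").as("duration"))

Applied to the first sample row, this yields the 9-second and 1-second pieces described in the question, so the grouped sum no longer over- or under-counts at bucket boundaries.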