Scala Spark get sum by time bucket across team spans and key


I have a question that is very similar to How to group by time interval in Spark SQL

However, my metric is time spent (duration), so my data looks like

KEY |Event_Type | duration | Time 
001 |event1     | 10     | 2016-05-01 10:49:51
002 |event2     | 100    | 2016-05-01 10:50:53
001 |event3     | 20     | 2016-05-01 10:50:55
001 |event1     | 15     | 2016-05-01 10:51:50
003 |event1     | 13     | 2016-05-01 10:55:30
001 |event2     | 12     | 2016-05-01 10:57:00
001 |event3     | 11     | 2016-05-01 11:00:01

Is there a way to sum the time spent into five minute buckets, grouped by key, and know when the duration goes outside of the bound of the bucket?

For example, the first row starts at 10:49:51 and ends at 10:50:01. Thus, the bucket for key 001 in window [2016-05-01 10:45:00.0, 2016-05-01 10:50:00.0] should get 9 seconds of duration (10:49:51 through 10:50:00), and the 10:50-to-10:55 bucket should get the remaining 1 second, plus the relevant seconds from other log lines (20 seconds from the third row, 15 from the fourth).

I want to sum the time spent within each specific bucket, but the solution from the other thread, df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric"), attributes an event's entire duration to the bucket it starts in. That overcounts the starting bucket and undercounts the buckets the event spills into.

Note: My Time column is actually stored as epoch timestamps, e.g. 1636503077, but I can easily cast it to the format above if that makes the calculation easier.
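(For the cast mentioned above: in Spark SQL this is typically done with from_unixtime, which uses the session time zone. A minimal plain-Scala equivalent, with a function name of my own choosing and UTC fixed explicitly, looks like this:)

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Format an epoch-seconds value as "yyyy-MM-dd HH:mm:ss" in UTC,
// matching the layout of the Time column shown above.
def epochToTimestamp(epochSeconds: Long): String =
  DateTimeFormatter
    .ofPattern("yyyy-MM-dd HH:mm:ss")
    .withZone(ZoneOffset.UTC)
    .format(Instant.ofEpochSecond(epochSeconds))
```

For example, epochToTimestamp(1636503077L) yields "2021-11-10 00:11:17".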

CodePudding user response:

In my opinion, you need to preprocess your data by splitting each duration at every minute (or every five-minute) boundary it crosses.
As you described, the first row

001 |event1     | 10     | 2016-05-01 10:49:51

should be converted to

001 |event1     | 9     | 2016-05-01 10:49:51
001 |event1     | 1     | 2016-05-01 10:50:00

Then you can use the Spark window function to sum it properly:

df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")

That will not change the per-bucket totals, since each split piece falls entirely inside one bucket; it only increases the record count.
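The preprocessing step above can be sketched as a pure-Scala helper (the name and signature are my own, not from Spark); in practice you would wrap logic like this in a UDF returning an array of (bucketStart, seconds) structs and explode it before the groupBy:

```scala
// Split an event (start time and duration, both in epoch seconds) into
// (bucketStart, secondsInBucket) pairs for fixed-size time buckets.
def splitIntoBuckets(start: Long, duration: Long, bucketSize: Long): Seq[(Long, Long)] = {
  val end = start + duration
  // Align the first bucket to the boundary at or before the start time.
  val firstBucket = (start / bucketSize) * bucketSize
  Iterator
    .iterate(firstBucket)(_ + bucketSize)   // successive bucket starts
    .takeWhile(_ < end)                     // only buckets the event touches
    .map { bucket =>
      val from = math.max(start, bucket)
      val to   = math.min(end, bucket + bucketSize)
      (bucket, to - from)                   // seconds spent inside this bucket
    }
    .toSeq
}
```

With 10:49:51 expressed as seconds-of-day (38991) and 300-second buckets, splitIntoBuckets(38991, 10, 300) returns 9 seconds in the 10:45 bucket and 1 second in the 10:50 bucket, matching the split shown above.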
