Flink AggregateFunction in TumblingWindow is automatically split into two windows for big window si

Time:06-28

I'm calculating a simple mean over some records, using different window sizes. With 1-hour and 1-week windows there are no problems, and the results are computed correctly.

var keyed = src
        .filter(event -> event.getSensor_id() < 10000)
        .keyBy(Event::getSensor_id);

var hourResult = keyed
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .aggregate(new AvgQ1(Config.HOUR))
        .setParallelism(5);

var weekResult = keyed
        .window(TumblingEventTimeWindows.of(Time.days(7)))
        .aggregate(new AvgQ1(Config.WEEK))
        .setParallelism(5);

With a window of 1 month (31 days), however, the window is split in two, and Flink outputs two results: one for records from 05-01 to 05-14 and one for records from 05-15 to 05-31.

SingleOutputStreamOperator<OutputQuery> monthResult = keyed
    .window(TumblingEventTimeWindows.of(Time.days(31)))
    .aggregate(new AvgQ1(Config.MONTH))
    .setParallelism(5);

With a window of 30 days, the result is instead split into (05-01; 05-27) and (05-28; 05-31).

SingleOutputStreamOperator<OutputQuery> monthResult = keyed
    .window(TumblingEventTimeWindows.of(Time.days(30)))
    .aggregate(new AvgQ1(Config.MONTH))
    .setParallelism(5);

This is the AggregateFunction.

public class AvgQ1 implements AggregateFunction<Event, AccumulatorQ1, OutputQuery> {
    String windowType;
    public AvgQ1(String windowType) {
        this.windowType = windowType;
    }

    @Override
    public AccumulatorQ1 createAccumulator() {
        return new AccumulatorQ1();
    }

    @Override
    public AccumulatorQ1 add(Event values, AccumulatorQ1 acc) {
        // Accumulate the running sum and count for the mean.
        acc.sum += values.getTemperature();
        acc.sensor_id = values.getSensor_id();
        acc.last_timestamp = values.getTimestamp();
        acc.count++;
        return acc;
    }

    @Override
    public AccumulatorQ1 merge(AccumulatorQ1 a, AccumulatorQ1 b) {
        // Combine two partial accumulators.
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    @Override
    public OutQ1 getResult(AccumulatorQ1 acc) {
        double mean = acc.sum / (double) acc.count;
        OutQ1 result = new OutQ1(windowType);
        result.setSensor_id(acc.sensor_id);
        result.setTemperature(mean);
        result.setOccurrences(acc.count);
        if (windowType.equals(Config.HOUR)) {
            result.setTimestamp(Tools.getHourSlot(acc.last_timestamp));
        }
        if (windowType.equals(Config.WEEK)) {
            result.setTimestamp(Tools.getWeekSlot(acc.last_timestamp));
        }
        if (windowType.equals(Config.MONTH)) {
            result.setTimestamp(Tools.getMonthSlot(acc.last_timestamp));
        }
        return result;
    }
}

I think the problem is somehow related to memory usage, as if the accumulator or the window couldn't hold that much data. So I monitored the JVM heap usage in the Web UI, but it never reaches the limit. I also changed the state backend from hashmap to RocksDB, with no effect.

I'm running Flink on Docker, reading a DataStream from a Kafka topic. Any ideas?

CodePudding user response:

The issue has to do with how Flink's time-based window assigner works. It divides the time since the Unix epoch (01-01-1970) into equal-sized chunks (windows) of the specified duration, and then assigns incoming events into those chunks (windows).

Thus with windows that are 30 days long, these windows cover these ranges:

01-01-1970 thru 30-01-1970
31-01-1970 thru 01-03-1970
...
28-04-2022 thru 27-05-2022
28-05-2022 thru 26-06-2022
...

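This works okay for windows that are one second, one minute, one hour, one day, or even one week long, but it's not very convenient for month-long windows: calendar months have different lengths, so fixed-size windows counted from the epoch do not line up with month boundaries.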
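
To make the alignment concrete, here is a minimal sketch in plain Java (no Flink dependency) of the arithmetic described above. It assumes a zero window offset, non-negative epoch-millisecond timestamps, and UTC dates. The class and method names (WindowAlignmentSketch, windowStart, windowStartDate) are made up for this illustration. As far as I know this matches what Flink's TimeWindow.getWindowStartWithOffset computes when the offset is 0, but treat it as an approximation of the idea rather than the library's code.

```java
import java.time.LocalDate;

// Hypothetical helper class, for illustration only (not part of Flink).
class WindowAlignmentSketch {

    private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    // Assumes offset 0 and a non-negative timestamp.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Convenience wrapper: takes a UTC date (yyyy-MM-dd) and a window size in days,
    // returns the UTC date on which the containing window starts.
    static String windowStartDate(String isoDate, int windowSizeDays) {
        long timestamp = LocalDate.parse(isoDate).toEpochDay() * MILLIS_PER_DAY;
        long start = windowStart(timestamp, windowSizeDays * MILLIS_PER_DAY);
        return LocalDate.ofEpochDay(start / MILLIS_PER_DAY).toString();
    }

    public static void main(String[] args) {
        String[] dates = {"2022-05-01", "2022-05-14", "2022-05-15", "2022-05-27", "2022-05-28"};
        for (int days : new int[] {30, 31}) {
            for (String date : dates) {
                System.out.println(days + "-day window containing " + date
                        + " starts on " + windowStartDate(date, days));
            }
        }
    }
}
```

Run against the dates in the question, this puts 01-05-2022 through 27-05-2022 in the 30-day window that starts on 28-04-2022 and opens a new window on 28-05-2022. With 31-day windows the boundary falls on 15-05-2022. Those are exactly the two splits you observed, so nothing is being dropped or split because of memory.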

CodePudding user response:

Try setting the buffer timeout to -1 via .setBufferTimeout(-1) on the StreamExecutionEnvironment.
