Home > other >  Dataflow job has high data freshness and events are dropped due to lateness
Dataflow job has high data freshness and events are dropped due to lateness

Time:06-01

I deployed an apache beam pipeline to GCP dataflow in a DEV environment and everything worked well. Then I deployed it to production in Europe environment (to be specific - job region:europe-west1, worker location:europe-west1-d) where we get high data velocity and things started to get complicated.

I am using a session window to group events into sessions. The session key is the tenantId/visitorId and its gap is 30 minutes. I am also using a trigger to emit events every 30 seconds to release events sooner than the end of session (writing them to BigQuery).

The problem appears to happen in the EventToSession/GroupPairsByKey. In this step there are thousands of events under the droppedDueToLateness counter and the dataFreshness keeps increasing (increasing since when I deployed it). All steps before this one operates good and all steps after are affected by it, but doesn't seem to have any other problems.

I looked into some metrics and see that the EventToSession/GroupPairsByKey step is processing between 100K keys to 200K keys per second (depends on time of day), which seems quite a lot to me. The cpu utilization doesn't go over the 70% and I am using streaming engine. Number of workers most of the time is 2. Max worker memory capacity is 32GB while the max worker memory usage currently stands on 23GB. I am using e2-standard-8 machine type.

I don't have any hot keys since each session contains at most a few dozen events.

My biggest suspicious is the huge amount of keys being processed in the EventToSession/GroupPairsByKey step. But on the other, session is usually related to a single customer so google should expect handle this amount of keys to handle per second, no?

Would like to get suggestions how to solve the dataFreshness and events droppedDueToLateness issues.

Adding the piece of code that generates the sessions:

input = input.apply("SetEventTimestamp", WithTimestamps.of(event -> Instant.parse(getEventTimestamp(event))
                            .withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))
                    .apply("SetKeyForRow", WithKeys.of(event -> getSessionKey(event))).setCoder(KvCoder.of(StringUtf8Coder.of(), input.getCoder()))
                    .apply("CreatingWindow", Window.<KV<String, TableRow>>into(Sessions.withGapDuration(Duration.standardMinutes(30)))
                           .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
                           .discardingFiredPanes()
                           .withAllowedLateness(Duration.standardDays(30)))
                    .apply("GroupPairsByKey", GroupByKey.create())
                    .apply("CreateCollectionOfValuesOnly", Values.create())
                    .apply("FlattenTheValues", Flatten.iterables());

CodePudding user response:

After doing some research I found the following:

  • regarding constantly increasing data freshness: as long as allowing late data to arrive a session window, that specific window will persist in memory. This means that allowing 30 days late data will keep every session for at least 30 days in memory, which obviously can over load the system. Moreover, I found we had some ever-lasting sessions by bots visiting and taking actions in websites we are monitoring. These bots can hold sessions forever which also can over load the system. The solution was decreasing allowed lateness to 2 days and use bounded sessions (look for "bounded sessions").
  • regarding events dropped due to lateness: these are events that on time of arrival they belong to an expired window, such window that the watermark has passed it's end (See documentation for the droppedDueToLateness here). These events are being dropped in the first GroupByKey after the session window function and can't be processed later. We didn't want to drop any late data so the solution was to check each event's timestamp before it is going to the sessions part and stream to the session part only events that won't be dropped - events that meet this condition: event_timestamp >= event_arrival_time - (gap_duration allowed_lateness). The rest will be written to BigQuery without the session data (Apparently apache beam drops an event if the event's timestamp is before event_arrival_time - (gap_duration allowed_lateness) even if there is a live session this event belongs to...)

p.s - in the bounded sessions part where he demonstrates how to implement a time bounded session I believe he has a bug allowing a session to grow beyond the provided max size. Once a session exceeded the max size, one can send late data that intersects this session and is prior to the session, to make the start time of the session earlier and by that expanding the session. Furthermore, once a session exceeded max size it can't be added events that belong to it but don't extend it.

In order to fix that I switched the order of the current window span and if-statement and edited the if-statement (the one checking for session max size) in the mergeWindows function in the window spanning part, so a session can't pass the max size and can only be added data that doesn't extend it beyond the max size. This is my implementation:

public void mergeWindows(MergeContext c) throws Exception {
        List<IntervalWindow> sortedWindows = new ArrayList<>();
        for (IntervalWindow window : c.windows()) {
            sortedWindows.add(window);
        }
        Collections.sort(sortedWindows);
        List<MergeCandidate> merges = new ArrayList<>();
        MergeCandidate current = new MergeCandidate();

        for (IntervalWindow window : sortedWindows) {
            MergeCandidate next = new MergeCandidate(window);
            if (current.intersects(window)) {
                if ((current.union == null || new Duration(current.union.start(), window.end()).getMillis() <= maxSize.plus(gapDuration).getMillis())) {
                    current.add(window);
                    continue;
                }
            }
            merges.add(current);
            current = next;
        }
        merges.add(current);
        for (MergeCandidate merge : merges) {
            merge.apply(c);
        }
    }
  • Related