Spring cloud stream - Kafka consumer consuming duplicate messages with StreamListener


With our Spring Boot app, we notice the Kafka consumer consuming a message twice, randomly and only once in a while, and only in the prod environment. We have 6 instances with 6 partitions deployed in PCF. We have caught messages with the same offset and partition received twice on the same topic, which causes duplicates and is business critical for us. We haven't noticed this in non-production environments, and it is hard to reproduce there. We recently switched to Kafka and we have not been able to find the root cause.

We are using spring-cloud-stream / spring-cloud-stream-binder-kafka 2.1.2. Here is the config:

spring:
  cloud:
    stream:
      default.consumer.concurrency: 1 
      default-binder: kafka
      bindings:
        channel:
          destination: topic
          content_type: application/json
          autoCreateTopics: false
          group: group
          consumer:
            maxAttempts: 1
      kafka:
        binder:
          autoCreateTopics: false
          autoAddPartitions: false
          brokers: brokers list
        bindings:
          channel:
            consumer:
              autoCommitOnError: true
              autoCommitOffset: true
              configuration:
                max.poll.interval.ms: 1000000
                max.poll.records: 1 
                group.id: group

We use @StreamListener to consume the messages.
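For reference, the listener is essentially of the following shape (a simplified sketch, not our actual code; the class names, payload type, and business method are placeholders, and the binding interface simply mirrors the "channel" binding from the YAML above):

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;

// Hypothetical binding interface matching the "channel" binding in the config above.
interface EventChannel {
    String CHANNEL = "channel";

    @Input(CHANNEL)
    SubscribableChannel channel();
}

@EnableBinding(EventChannel.class)
public class EventListener {

    @StreamListener(EventChannel.CHANNEL)
    public void handle(String event) {
        // Long-running business processing happens here; with autoCommitOffset: true
        // the container commits the offset only after this method returns.
        process(event);
    }

    private void process(String event) {
        // ... business logic ...
    }
}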

Here is an instance where we received a duplicate, along with the error messages captured in the server logs:

ERROR 46 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=group] Offset commit failed on partition topic-0 at offset 1291358: The coordinator is not aware of this member.
ERROR 46 --- [container-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
OUT org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:871) ~[kafka-clients-2.0.1.jar!/:na]

There is no crash and all the instances are healthy at the time of the duplicate. The error log is also confusing - "Error while processing: null" - since the message was successfully processed, twice. And max.poll.interval.ms is 1000000, which is about 16 minutes and should be enough time for the system to process any message; the session timeout and heartbeat configs are the defaults. The duplicate is received within 2 seconds in most of the instances. Are there any configs that we are missing? Any suggestion/help is highly appreciated.

CodePudding user response:

Commit cannot be completed since the group has already rebalanced

A rebalance occurred because your listener took too long; you should adjust max.poll.records and max.poll.interval.ms to make sure you can always handle the records received within the time limit.
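Those two properties sit under the binding's consumer configuration, in the same YAML as in your question; since your max.poll.records is already 1, the remaining lever is raising max.poll.interval.ms above your worst-case processing time (the value below is illustrative only, not a recommendation):

spring:
  cloud:
    stream:
      kafka:
        bindings:
          channel:
            consumer:
              configuration:
                max.poll.records: 1            # already the minimum batch size
                max.poll.interval.ms: 1800000  # 30 minutes; size this to your slowest message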

In any case, Kafka does not guarantee exactly-once delivery, only at-least-once delivery. You need to add idempotency to your application and detect/ignore duplicates.
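One common way to do that (a sketch under assumptions, not your code) is to key each record by topic-partition-offset and skip anything already processed. The in-memory set below is only for illustration; with 6 instances you would need a shared store such as a database unique constraint or Redis:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

public class IdempotentEventListener {

    // Illustrative only: an in-process set does not protect across instances or restarts.
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    // Assumes the existing "channel" binding from your configuration.
    @StreamListener("channel")
    public void handle(@Payload String event,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                       @Header(KafkaHeaders.OFFSET) long offset) {
        String key = topic + "-" + partition + "-" + offset;
        if (!processed.add(key)) {
            return; // duplicate delivery of an already-processed record - skip it
        }
        // ... existing business logic ...
    }
}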

CodePudding user response:

Also, keep in mind that StreamListener and the annotation-based programming model have been deprecated for 3 years and have been removed from the current main branch, which means the next release will not include them. Please migrate your solution to the functional programming model.
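A minimal sketch of the functional equivalent (the bean name and payload type are assumptions, and this requires upgrading to a Spring Cloud Stream version with functional support, 3.x or later):

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ChannelConsumerConfig {

    // Replaces the @StreamListener method; Spring Cloud Stream binds this bean
    // to the "process-in-0" binding by convention.
    @Bean
    public Consumer<String> process() {
        return event -> {
            // ... existing business logic ...
        };
    }
}

The binding properties then move from bindings.channel to bindings.process-in-0 (with spring.cloud.function.definition set to process), while the kafka binder consumer properties stay as they are.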
