Why two consumers with the same group.id receive the same message in Kafka

Time:04-12

A Spring Boot producer app sends the message "AAA" with key "0" to the topic "Topic0". The consumer app (also Spring Boot) has 3 listeners with the following configuration:

  1. Topic0, Partition == 0, Group0
  2. Topic0, Partition == 0, Group0
  3. Topic0, Partition == 0, Group1

public static final String KAFKA_TOPIC_0 = "Topic0";
public static final String KAFKA_GROUP_ID_0 = "Group0";
public static final String KAFKA_GROUP_ID_1 = "Group1";
@KafkaListener(topicPartitions = @TopicPartition(topic = KAFKA_TOPIC_0, partitions = {"0"}), groupId = KAFKA_GROUP_ID_0)
public void listenTopic0Partition0GroupId0(String message,
                                           @Header(KafkaHeaders.GROUP_ID) String groupId,
                                           @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    System.out.printf("listenTopic0Partition0GroupId0 topic %s partition %d group %s message %s \n",
            meta.topic(),
            meta.partition(),
            groupId,
            message);
}
@KafkaListener(topicPartitions = @TopicPartition(topic = KAFKA_TOPIC_0, partitions = {"0"}), groupId = KAFKA_GROUP_ID_0)
public void listen2Topic0Partition0GroupId0(String message,
                                            @Header(KafkaHeaders.GROUP_ID) String groupId,
                                            @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    System.out.printf("listen2Topic0Partition0GroupId0 topic %s partition %d group %s message %s \n",
            meta.topic(),
            meta.partition(),
            groupId,
            message);
}
@KafkaListener(topicPartitions = @TopicPartition(topic = KAFKA_TOPIC_0, partitions = {"0"}), groupId = KAFKA_GROUP_ID_1)
public void listenTopic0Partition0GroupId1(String message,
                                           @Header(KafkaHeaders.GROUP_ID) String groupId,
                                           @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {

    System.out.printf("listenTopic0Partition0GroupId1 topic %s partition %d group %s message %s \n",
            meta.topic(),
            meta.partition(),
            groupId,
            message);
}

Now I send one message:

kafkaTemplate.send("Topic0", "0", "AAA");

And I see that all 3 listeners received it:

listenTopic0Partition0GroupId1 topic Topic0 partition 0 group Group1 message AAA 
listenTopic0Partition0GroupId0 topic Topic0 partition 0 group Group0 message AAA 
listen2Topic0Partition0GroupId0 topic Topic0 partition 0 group Group0 message AAA

According to https://docs.confluent.io/5.4.1/kafka/introduction.html:

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then the records will effectively be load-balanced over the consumer instances.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.

However, 2 consumers from the same group listening to the same topic and partition received the same message. How come?

Here is the topic configuration as well:

@Bean
public NewTopic topicAAA() {
    return TopicBuilder.name(KAFKA_TOPIC_0)
            .partitions(2)
            .replicas(1)
            .build();
}

Answer:

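You should not combine groupId with partitions. When you specify partitions explicitly, Kafka's group management is bypassed, so the groupId no longer controls which listener gets which partition: every listener pinned to partition 0 reads every record in it. (The groupId is then used only for committing offsets.) Try removing partitions, using topics instead, and check.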

This is the actual answer to your question.

Refer to this answer: https://stackoverflow.com/a/56552126/2534090

You can upvote the linked answer if it is helpful. For additional info, read below.

Some additional detail for anyone wondering about the why:

A Kafka consumer can obtain partitions in two ways: assign() and subscribe(). assign() does not use the group management functionality, while subscribe() does.

With assign() we specify the topic partitions manually. With subscribe() we only give the topic name(s), and the distribution of topic partitions across the consumers in the group is handled internally. Consumer heartbeats, rebalances and so on come into the picture as part of this group management functionality.
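
As a rough illustration of the two calls on a plain KafkaConsumer, here is a minimal sketch. The class name RawConsumerSketch and the broker address localhost:9092 are assumptions made for the example; the topic and group names are taken from the question. It needs the kafka-clients library and a running broker.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical class name, used only for this sketch.
public class RawConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Group0");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Pass "assign" as the first argument to pin the partition manually.
        boolean manual = args.length > 0 && args[0].equals("assign");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            if (manual) {
                // Manual assignment: no group coordination, this consumer reads partition 0
                // no matter how many other consumers use the same group id.
                consumer.assign(Collections.singletonList(new TopicPartition("Topic0", 0)));
            } else {
                // Group management: the group's members share the topic's partitions.
                consumer.subscribe(Collections.singletonList("Topic0"));
            }
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                System.out.printf("partition %d value %s%n", record.partition(), record.value());
            }
        }
    }
}
```

A single short poll may return nothing, for example while the group is still joining or when the consumer starts at the latest offset, so treat this as a shape of the API calls rather than a complete consumer loop.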

Since you specify the partitions explicitly, the listener container calls assign() rather than subscribe(). See KafkaConsumer for these methods.
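
To make the effect concrete, here is a toy model of the two modes in plain Java. It does not use the Kafka client at all: the class AssignVsSubscribeSketch, its Listener type and both methods are made up for this illustration, and the round-robin ownership rule is a simplification, not Kafka's real partition assignor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types belong to the Kafka client library.
class AssignVsSubscribeSketch {

    // A listener as declared in the question: a name, a group id and a fixed partition.
    static final class Listener {
        final String name;
        final String groupId;
        final int partition;

        Listener(String name, String groupId, int partition) {
            this.name = name;
            this.groupId = groupId;
            this.partition = partition;
        }
    }

    // assign(): every listener reads the partition it named; the group id plays no part.
    static List<String> receiversWithAssign(List<Listener> listeners, int recordPartition) {
        List<String> receivers = new ArrayList<>();
        for (Listener l : listeners) {
            if (l.partition == recordPartition) {
                receivers.add(l.name);
            }
        }
        return receivers;
    }

    // subscribe(): inside each group a partition has exactly one owner.
    // Ownership here is a simplified round-robin, not Kafka's real assignor.
    static List<String> receiversWithSubscribe(List<Listener> listeners, int recordPartition) {
        Map<String, List<Listener>> groups = new LinkedHashMap<>();
        for (Listener l : listeners) {
            groups.computeIfAbsent(l.groupId, g -> new ArrayList<>()).add(l);
        }
        List<String> receivers = new ArrayList<>();
        for (List<Listener> members : groups.values()) {
            receivers.add(members.get(recordPartition % members.size()).name);
        }
        return receivers;
    }

    public static void main(String[] args) {
        List<Listener> listeners = Arrays.asList(
                new Listener("listenTopic0Partition0GroupId0", "Group0", 0),
                new Listener("listen2Topic0Partition0GroupId0", "Group0", 0),
                new Listener("listenTopic0Partition0GroupId1", "Group1", 0));

        System.out.println("assign:    " + receiversWithAssign(listeners, 0));
        System.out.println("subscribe: " + receiversWithSubscribe(listeners, 0));
    }
}
```

In this model the assign line lists all three listeners, which matches the output in the question, while the subscribe line lists one listener per group.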


Depending on your use case, you can decide whether you want subscribe() or assign().

The rule of thumb: if you want to distribute load automatically across several consumers for the same set of topics and keep track of the offsets up to which you have already read, go with subscribe().

If you just want to look at the data, for inspection purposes as with kafka-console-consumer, then use assign().
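
Applied to the listeners from the question, the subscribe() route might look like the sketch below. The class name SubscribedListeners, the method names and the log helper are hypothetical; the constants mirror the question. It assumes String keys and values, as in the original listeners.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical class name; constants mirror the ones in the question.
@Component
public class SubscribedListeners {

    public static final String KAFKA_TOPIC_0 = "Topic0";
    public static final String KAFKA_GROUP_ID_0 = "Group0";
    public static final String KAFKA_GROUP_ID_1 = "Group1";

    // "topics" (instead of "topicPartitions") lets the group coordinator
    // split the topic's partitions between the two Group0 listeners.
    @KafkaListener(topics = KAFKA_TOPIC_0, groupId = KAFKA_GROUP_ID_0)
    public void listenGroup0First(ConsumerRecord<String, String> record) {
        log("listenGroup0First", record);
    }

    @KafkaListener(topics = KAFKA_TOPIC_0, groupId = KAFKA_GROUP_ID_0)
    public void listenGroup0Second(ConsumerRecord<String, String> record) {
        log("listenGroup0Second", record);
    }

    // A different group: this listener still sees every record of the topic.
    @KafkaListener(topics = KAFKA_TOPIC_0, groupId = KAFKA_GROUP_ID_1)
    public void listenGroup1(ConsumerRecord<String, String> record) {
        log("listenGroup1", record);
    }

    private static void log(String listener, ConsumerRecord<String, String> record) {
        System.out.printf("%s topic %s partition %d message %s%n",
                listener, record.topic(), record.partition(), record.value());
    }
}
```

With the 2-partition topic from the question, each Group0 listener would typically end up owning one partition, so "AAA" should be printed by only one of them, plus once by the Group1 listener.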
