Spring Boot Kafka consumers do not consume messages in the order they are produced

Time:05-12

I have made two Spring Boot projects, one with a Kafka producer and the other with a listener. I also have a docker-compose file like this, which creates a container each for Zookeeper and Kafka, plus one producer container and two consumer containers:

version: '3'

services:

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    restart: always
    ports:
      - 2181:2181

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    restart: always
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    links:
      - zookeeper:zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  consumer1:
    image: consumer:0.0.1-SNAPSHOT
    container_name: consumer1
    depends_on:
      - kafka
    restart: always
    ports:
      - 8081:8081
    environment:
      SERVER_PORT: 8081
    links:
      - kafka:kafka

  consumer2:
    image: consumer:0.0.1-SNAPSHOT
    container_name: consumer2
    depends_on:
      - kafka
    restart: always
    ports:
      - 8082:8082
    environment:
      SERVER_PORT: 8082
    links:
      - kafka:kafka

  producer:
    image: producer:0.0.1-SNAPSHOT
    container_name: producer
    depends_on:
      - kafka
    restart: always
    ports:
      - 8080:8080
    environment:
      SERVER_PORT: 8080
    links:
      - kafka:kafka

Now to my problem. I want my consumers to consume from the same topic, which I have accomplished. But it seems that they are not consuming the messages in the order the producer produces them.

As you can see below, "Number: 4" is consumed before "Number: 3", for example:

producer     | i: 0
consumer2    | Number: 0
producer     | i: 1
consumer2    | Number: 1
producer     | i: 2
consumer1    | Number: 2
producer     | i: 3
producer     | i: 4
consumer2    | Number: 4
producer     | i: 5
consumer1    | Number: 6
producer     | i: 6
consumer2    | Number: 3
producer     | i: 7
producer     | i: 8
producer     | i: 9
consumer2    | Number: 5
producer     | i: 10
consumer1    | Number: 10
producer     | i: 11

My KafkaProducer class:

@Service
public class KafkaProducer {
    
    @Value("${topic.name.producer}")
    private String topicName;
    
    @Autowired
    private KafkaTemplate<String, String> kafkaStringTemplate;
    public void sendList(String word)   {
        kafkaStringTemplate.send(topicName, word);  
    }   
}

I have a for loop that feeds it:

    for (int i = 0; i < 100; i++) {
        producer.sendList("Number: " + i);
    }

In my producer project I have a TopicConfiguration:

@Configuration
public class TopicConfiguration
{
    @Value(value = "${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${topic.name.producer}")
    private String topicName;

    @Bean
    public NewTopic generalTopic() {
        return TopicBuilder.name(topicName)
                .partitions(3)
                .replicas(1)
                .build();
    }

    @Bean
    public KafkaAdmin kafkaAdmin()
    {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
}

application.properties file:

server.port=${SERVER_PORT}
# Producer properties
spring.kafka.producer.bootstrap-servers=kafka:9092
#spring.kafka.producer.bootstrap-servers=172.21.0.2:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id=group-1
topic.name.producer=test


# Common Kafka Properties
auto.create.topics.enable=true

My consumer project:

@Service
public class KafkaConsumer {
    
    @Value("${topic.name.consumer}")
    private String topicName;

    @KafkaListener(topics = "${topic.name.consumer}", groupId = "group-1")
    public void consumeLinks(String word) throws InterruptedException {
        
        System.out.println(word);
        Thread.sleep(5000);
    }
}

application.properties file:

#server.port=8081
server.port=${SERVER_PORT}
# Consumer properties
spring.kafka.consumer.bootstrap-servers=kafka:9092
#spring.kafka.consumer.bootstrap-servers=172.21.0.2:9092
#spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.group-id=group-1
topic.name.consumer=test

spring.kafka.consumer.auto-offset-reset=earliest

# Common Kafka Properties
auto.create.topics.enable=true

I have tried to google this but have not found a solution, or maybe I have not understood how to solve the problem. Can someone tell me what is missing, or point me to a beginner-friendly page that explains how to solve it?

CodePudding user response:

Your topic has 3 partitions. There will be no order guarantee unless you use exactly one partition. More specifically, data is only ordered within a partition; each consumer is consuming data that is ordered within its assigned partitions.
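To make the per-partition guarantee concrete, here is a minimal plain-Java sketch that involves no Kafka at all. The class `PartitionOrderDemo` and its methods are hypothetical names for illustration, and strict round-robin distribution is a simplifying assumption; the real producer may spread keyless records differently. It only shows that each partition stays ordered even though no single partition holds the full sequence.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionOrderDemo {

    // Hypothetical stand-in for a topic: each partition is an append-only list.
    static List<List<Integer>> produceRoundRobin(int messageCount, int partitionCount) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < messageCount; i++) {
            // Assumption: keyless records are dealt out to partitions in turn.
            partitions.get(i % partitionCount).add(i);
        }
        return partitions;
    }

    // True when the values in one partition appear in strictly increasing (produced) order.
    static boolean isOrdered(List<Integer> partition) {
        for (int i = 1; i < partition.size(); i++) {
            if (partition.get(i - 1) >= partition.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = produceRoundRobin(12, 3);
        for (int p = 0; p < partitions.size(); p++) {
            System.out.println("partition " + p + ": " + partitions.get(p)
                    + " ordered=" + isOrdered(partitions.get(p)));
        }
    }
}
```

Each partition prints as ordered, but two consumers reading different partitions at their own pace (for example with the `Thread.sleep(5000)` in the listener) will interleave their output, which is what the log in the question shows.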

To see this in your own setup, log the partition each record comes from:

@KafkaListener(topics = "${topic.name.consumer}", groupId = "group-1")
public void consumeLinks(
    @Payload String word,
    // In Spring Kafka 3.x this constant is KafkaHeaders.RECEIVED_PARTITION
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
  System.out.println(
      "Received Message: " + word
          + " from partition: " + partition);
}

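And if you use one partition, only one consumer in that consumer group can be active: a partition is assigned to at most one consumer per group, so any additional consumers in the group sit idle.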
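The trade-off can be sketched in plain Java as well. `GroupAssignmentDemo` and `assign` are hypothetical names, and dealing partitions out in turn is a simplification of what Kafka's real assignors do; the only property it is meant to illustrate is that each partition goes to exactly one consumer in the group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentDemo {

    // Simplified stand-in for a group rebalance: deal partitions out to consumers in turn.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            // Each partition is owned by exactly one consumer in the group.
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("consumer1", "consumer2");
        System.out.println("3 partitions: " + assign(consumers, 3));
        System.out.println("1 partition:  " + assign(consumers, 1));
    }
}
```

With three partitions both consumers get work; with one partition the second consumer ends up with an empty assignment, so strict ordering comes at the cost of parallelism.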
