Home > Net >  Multiple kafka Consumer Not receiving messages
Multiple kafka Consumer Not receiving messages


I am using embeddedKafkaBroker and Kafka Binder Streams.

@Profile({"dev", "test"})
public class EmbeddedKafkaBrokerConfig {

  private static final String TMP_EMBEDDED_KAFKA_LOGS =
      String.format("/tmp/embedded-kafka-logs-%1$s/", UUID.randomUUID());
  private static final String PORT = "port";
  private static final String LOG_DIRS = "log.dirs";
  private static final String LISTENERS = "listeners";
  private static final Integer KAFKA_PORT = 9092;
  private static final String LISTENERS_VALUE = "PLAINTEXT://localhost:"   KAFKA_PORT;
  private static final Integer ZOOKEEPER_PORT = 2181;

  private EmbeddedKafkaBroker embeddedKafkaBroker;

   * bean for the embeddedKafkaBroker.
   * @return local embeddedKafkaBroker
  public EmbeddedKafkaBroker embeddedKafkaBroker() {
    Map<String, String> brokerProperties = new HashMap<>();
    brokerProperties.put(LISTENERS, LISTENERS_VALUE);
    brokerProperties.put(PORT, KAFKA_PORT.toString());
    brokerProperties.put(LOG_DIRS, TMP_EMBEDDED_KAFKA_LOGS);
    this.embeddedKafkaBroker =
        new EmbeddedKafkaBroker(1, true, 2)
    return embeddedKafkaBroker;

  /** close the embeddedKafkaBroker on destroy. */
  public void preDestroy() {
    if (embeddedKafkaBroker != null) {
      log.warn("[EmbeddedKafkaBrokerConfig]  destroying kafka broker {}", embeddedKafkaBroker);

Using Rest Controller to trigger publish data to the topic

public class DemoController {

  DemoSupplier demoSupplier;

  public String helloController(){
    return "Hello World!";


public class DemoSupplier {

  public EmbeddedKafkaBroker kafkaBroker;

  private KafkaTemplate<String,String> kafkaTemplate;

  private String topicName;
  public KafkaTemplate<String, String> stringKafkaTemplate(){
    Map<String, Object> producerConfigs =new HashMap<>();
    producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs));

  public void supply(){
    for(int i =0 ;i<100;i  ){
      kafkaTemplate.send(topicName, "Message:" i*2);


public class DemoConsumer {

  public Consumer<KStream<String, String>> demoConsumerProcessor(){
    return input -> input.foreach(((key, value) -> System.out.println(value)));
  public Consumer<KStream<String, String>> demoConsumerProcessor2(){
    return input -> input.foreach(((key, value) -> System.out.println("This is second consumer 2: " value)));


# ===============================
# = Profiles
# ===============================
# ===============================
# = Kafka Topics
# ===============================
# ===============================
# ===============================

Note- In this property(spring.cloud.stream.function.definition), the name of bean that appears first will consume the message published to the topic. But only one of them receives them. Both consumer have the same group id, according to my knowledge set using applicationId, saw the same in logs also.

Now here comes my deduction-

Number of partitions created by embedded Kafka are always 1. I have tried changing it to 2 when I am creating its bean(See constuctor of it- (count:1, controlledShutdownn:true, partitions:2). But i think somethings are not in places.

Important logs-

[Consumer clientId=group_id-359878ed-1b41-4cf0-b9b8-6e21e5e1f0fe-StreamThread-1-consumer, groupId=group_id] Updating assignment with
Assigned partitions:                       [demoTopic-0]
Current owned partitions:                  []
Added partitions (assigned - owned):       [demoTopic-0]
Revoked partitions (owned - assigned):     []

Consumer clientId=group_id-4dce1ba5-7d97-4c18-92c3-cb79dab271b5-StreamThread-1-consumer, groupId=group_id] Updating assignment with
Assigned partitions:                       []
Current owned partitions:                  []
Added partitions (assigned - owned):       []
Revoked partitions (owned - assigned):     [] 

Now according to logs , maybe only one partition is created for the topic.

Now some confusion is regarding "Updating assignment", is their any more property i have to set to use multiple consumer. Or Some problem with embeddedKafa. Please look from other perspective dont want to be a XY Problem. Full logs are too big. I will share if needed.

CodePudding user response:

If there's only one partition, then only one consumer in the same group will be able to process that partition.

Ideally, you create topics yourself rather than letting the broker do it automatically. In fact, it's recommended to disable that setting.

Your other options are to use distinct groups or to call two different processing methods but use one group

  public Consumer<KStream<String, String>> demoConsumerProcessor(){
    return input -> input.foreach(((key, value) -> {
        System.out.println("second consumer "    value))) 
  })) ;

CodePudding user response:

The solution i found is not ideal, but it did the job-

I am asking the broker to create topic while creating its bean-

 this.embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, 2,"demoTopic") 
    return embeddedKafkaBroker;

Now both consumer receiving the record.

  • Related