Spring integration properties for Kafka

Time:03-31

I am trying to configure the listener through properties in application.yml, but the method annotated with @KafkaListener is never invoked when I rely on the application.yml setting (listener.type: batch). It is invoked only when I explicitly call setBatchListener(true) in code. Here are my code and configuration.

  1. Consumer code:

     @KafkaListener(containerFactory = "kafkaListenerContainerFactory",
             topics = "${spring.kafka.template.default-topic}",
             groupId = "${spring.kafka.consumer.group-id}")
     public void receive(List<ConsumerRecord<String, byte[]>> consumerRecords,
             Acknowledgment acknowledgment) {
         processor.process(consumerRecords, acknowledgment);
     }

  2. application.yml:

     listener:
       missing-topics-fatal: false
       type: batch
       ack-mode: manual

  3. Consumer configuration:

     ConcurrentKafkaListenerContainerFactory<String, String> factory =
             new ConcurrentKafkaListenerContainerFactory<>();
     factory.setConsumerFactory(
             new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
     factory.setErrorHandler(new SeekToCurrentErrorHandler(
             new UpdateMessageErrorHandler(),
             new FixedBackOff(idleEventInterval, maxFailures)));
     final ContainerProperties properties = factory.getContainerProperties();
     properties.setIdleBetweenPolls(idleBetweenPolls);
     properties.setIdleEventInterval(idleEventInterval);
     return factory;
     }

CodePudding user response:

If I'm not mistaken, by creating your own ConcurrentKafkaListenerContainerFactory in your configuration you bypass a piece of code that normally runs in the ConcurrentKafkaListenerContainerFactoryConfigurer class in Spring Boot's auto-configuration package, so the listener properties from application.yml are never applied to your factory:

        if (properties.getType().equals(Type.BATCH)) {
            factory.setBatchListener(true);
            factory.setBatchErrorHandler(this.batchErrorHandler);
        } else {
            factory.setErrorHandler(this.errorHandler);
        }

Since the value is fixed in your application.yml file anyway, is there a reason not to set it directly in your @Configuration class?
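
If you would rather keep the values in application.yml, one option is to let Spring Boot's configurer apply them to your hand-built factory. The following is a minimal sketch, not a definitive implementation: the class name KafkaConsumerConfig is hypothetical, and it assumes a Spring Boot application where the ConcurrentKafkaListenerContainerFactoryConfigurer and KafkaProperties beans are available for injection. Which listener properties the configurer applies can vary between Spring Boot versions.

```java
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig { // hypothetical class name

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            KafkaProperties kafkaProperties) {
        // Build the consumer factory from spring.kafka.consumer.*, as in the question.
        ConsumerFactory<Object, Object> consumerFactory =
                new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        // Applies the spring.kafka.listener.* settings (type, ack-mode, ...) to the
        // factory, including the batch-listener branch quoted above.
        configurer.configure(factory, consumerFactory);

        // Custom settings (idleBetweenPolls, idleEventInterval, error handler)
        // can still be applied here, after configure(...).
        return factory;
    }
}
```

The alternative is what the question already found to work: call factory.setBatchListener(true) on the custom factory and set the ack mode through factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL).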
