How to load Kafka consumer or producer properties from application.yaml?

Time:06-28

Currently I have this code for creating a ConcurrentKafkaListenerContainerFactory.

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps()));
    factory.setReplyTemplate(kafkaTemplate());
    return factory;
}

How do I create the DefaultKafkaConsumerFactory object with properties from application.yaml? I don't want to set them inside the consumerProps() method.

EDIT: My KafkaConfig class now looks like this.

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    public ReplyingKafkaTemplate<Object, KafkaExampleRecord, KafkaExampleRecord> replyingKafkaTemplate(
            ProducerFactory<Object, KafkaExampleRecord> producerFactory,
            ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> repliesContainer) {
        return new ReplyingKafkaTemplate<>(producerFactory, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> kafkaListenerContainerFactory) {

        ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> rc =
                kafkaListenerContainerFactory.createContainer("mytopic");
        rc.setAutoStartup(false);
        return rc;
    }

    @Bean
    public KafkaTemplate<Object, KafkaExampleRecord> kafkaTemplate(ProducerFactory<Object, KafkaExampleRecord> pf) {
        return new KafkaTemplate<>(pf);
    }
}

However, now I get java.lang.IllegalStateException: a KafkaTemplate is required to support replies.

Answer:

Spring Boot automatically configures a ConsumerFactory from the spring.kafka.consumer.* properties in application.yaml; simply add it as a parameter to the factory method:

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> kafkaListenerContainerFactory(
    ConsumerFactory<Object, KafkaExampleRecord> cf) {
...

However, you don't need to declare the container factory bean either; Boot will auto-configure it for you and automatically set the KafkaTemplate as its reply template, if one is defined as a bean.

See Boot's KafkaAnnotationDrivenConfiguration.
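
If you do keep an explicit container factory, the following is a minimal sketch of how the pieces above might fit together. It combines the question's original factory bean with the injected ConsumerFactory from this answer. The class name ListenerFactoryConfig is hypothetical, KafkaExampleRecord is the question's own type, and the sketch assumes the kafkaTemplate bean from the question's KafkaConfig is present in the same application context.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical class name; the bean could equally live in the question's KafkaConfig.
@Configuration
public class ListenerFactoryConfig {

    // Boot builds the ConsumerFactory from spring.kafka.consumer.* in application.yaml
    // (for example spring.kafka.consumer.group-id), so no consumerProps() method is needed.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> kafkaListenerContainerFactory(
            ConsumerFactory<Object, KafkaExampleRecord> cf,
            // Assumption: the "kafkaTemplate" bean from the question's KafkaConfig exists.
            // The qualifier picks it over the ReplyingKafkaTemplate, which is also a KafkaTemplate.
            @Qualifier("kafkaTemplate") KafkaTemplate<Object, KafkaExampleRecord> kafkaTemplate) {

        ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(cf);
        // Listeners that send replies need a reply template; the question's original
        // factory set one the same way via setReplyTemplate(kafkaTemplate()).
        factory.setReplyTemplate(kafkaTemplate);
        return factory;
    }
}
```

Because the bean is named kafkaListenerContainerFactory, Boot's own auto-configured factory should back off in favor of this one. Setting the reply template explicitly is one way to address the "a KafkaTemplate is required to support replies" error, though whether it is needed depends on how the rest of the application is wired.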
