Reload consumer properties with spring kafka

Time:04-28

We have developed live reloading of config properties for our Spring Boot applications. I have a spring-kafka consumer and want to take advantage of this: when I change a consumer property, I should be able to restart the container with the new value without rebooting the application. I tried:

KafkaListenerEndpointRegistry.stop()
KafkaListenerEndpointRegistry.start()

I thought this would create a new container, but that is not the case. If I need to start a container with new config properties, how do I do that? Here is my configuration:

    @Bean
    @ConfigurationProperties(prefix = "container.config.properties")
    @ConditionalOnMissingBean
    @RefreshScope
    ContainerConfigProperties containerConfigProperties() {
        return new ContainerConfigProperties();
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnBean(value = {ContainerConfigProperties.class})
    @RefreshScope
    <K, V> ConcurrentKafkaListenerContainerFactory<K, ValueDeserializerContainer<V>> kafkaListenerContainerFactory(final ConsumerFactory<K, ValueDeserializerContainer<V>> consumerFactory,
                                                                                                                   final ContainerConfigProperties containerConfigProperties,
                                                                                                                   final Optional<IAMIdentity> iamIdentity) {
        val factory = new ConcurrentKafkaListenerContainerFactory<K, ValueDeserializerContainer<V>>();
        factory.setBatchListener(true);
        factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(containerConfigProperties.getAckMode());
        factory.setConcurrency(containerConfigProperties.getConcurrency());
        factory.getContainerProperties().setConsumerRebalanceListener(simpleConsumerRebalanceListener());

        // update kafka consumer properties. Default is taken from the config file
        iamIdentity.ifPresent(identity -> consumerFactory.updateConfigs(addIAMIdentity(identity)));
        log.info("kafkaListenerContainerFactory");

        return factory;
    }

Answer:

Exactly which properties are you changing? The child containers are indeed recreated when the parent container is stopped and started, so any ContainerProperties changes will be picked up.

If you are talking about Kafka consumer properties, you need to either reconfigure the consumer factory or set the changed properties via ContainerProperties.kafkaConsumerProperties, which overrides the consumer factory settings.
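To make the override idea concrete, here is a minimal plain-Java sketch of the precedence described above: factory-level consumer configs act as defaults, and container-level properties win when both define the same key. The class and method names (ConsumerConfigOverride, effectiveConfig) are hypothetical and exist only for illustration; this is not Spring Kafka's implementation, just a model of the expected result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration class; not part of Spring Kafka.
public class ConsumerConfigOverride {

    // Copies the factory-level configs, then applies the container-level
    // overrides on top so that an override wins for any shared key.
    static Map<String, Object> effectiveConfig(Map<String, Object> factoryConfigs,
                                               Properties overrides) {
        Map<String, Object> merged = new HashMap<>(factoryConfigs);
        for (String name : overrides.stringPropertyNames()) {
            merged.put(name, overrides.getProperty(name));
        }
        return merged;
    }

    public static void main(String[] args) {
        // Defaults as they might be configured on the consumer factory.
        Map<String, Object> factoryConfigs = new HashMap<>();
        factoryConfigs.put("group.id", "orders");
        factoryConfigs.put("max.poll.records", "500");

        // Reloaded value supplied at the container level.
        Properties overrides = new Properties();
        overrides.setProperty("max.poll.records", "100");

        Map<String, Object> effective = effectiveConfig(factoryConfigs, overrides);
        System.out.println("group.id=" + effective.get("group.id"));
        System.out.println("max.poll.records=" + effective.get("max.poll.records"));
    }
}
```

In a real application the overrides would be set on the container's ContainerProperties before the container is started again, so that consumers created by the restart see the new values.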

EDIT

Something like this might work:

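@Bean
@RefreshScope
Object containerReconfigurer(KafkaListenerEndpointRegistry registry) {
    // Restart each registered listener container so it picks up new settings.
    registry.getListenerContainers().forEach(container -> {
        container.stop();
        // reconfigure container here, before it is started again
        container.start();
    });
    // No bean state is needed; the method is used only for its side effect.
    return null;
}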