We have implemented live reloading of config properties in our Spring Boot applications. I have a spring-kafka consumer and want to use live reloading so that, when I change a consumer property, I can restart the listener container with the new value without rebooting the application. I tried:
KafkaListenerEndpointRegistry.stop()
KafkaListenerEndpointRegistry.start()
I thought the calls above would create a new container, but that is not the case. How do I start a container with the new config properties?
@Bean
@ConfigurationProperties(prefix = "container.config.properties")
@ConditionalOnMissingBean
@RefreshScope
ContainerConfigProperties containerConfigProperties() {
    return new ContainerConfigProperties();
}

@Bean
@ConditionalOnMissingBean
@ConditionalOnBean(value = {ContainerConfigProperties.class})
@RefreshScope
<K, V> ConcurrentKafkaListenerContainerFactory<K, ValueDeserializerContainer<V>> kafkaListenerContainerFactory(
        final ConsumerFactory<K, ValueDeserializerContainer<V>> consumerFactory,
        final ContainerConfigProperties containerConfigProperties,
        final Optional<IAMIdentity> iamIdentity) {
    val factory = new ConcurrentKafkaListenerContainerFactory<K, ValueDeserializerContainer<V>>();
    factory.setBatchListener(true);
    factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAckMode(containerConfigProperties.getAckMode());
    factory.setConcurrency(containerConfigProperties.getConcurrency());
    factory.getContainerProperties().setConsumerRebalanceListener(simpleConsumerRebalanceListener());
    // update kafka consumer properties. Default is taken from the config file
    iamIdentity.ifPresent(identity -> consumerFactory.updateConfigs(addIAMIdentity(identity)));
    log.info("kafkaListenerContainerFactory");
    return factory;
}
Answer:
Exactly which properties are you changing? The child containers are indeed recreated when you stop and start the parent container, so any ContainerProperties changes will be picked up.
If you are talking about Kafka consumer properties, you either need to reconfigure the consumer factory or set the changed properties via ContainerProperties.kafkaConsumerProperties, which override the consumer factory settings.
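To make the override idea concrete, here is a minimal plain-Java sketch of the precedence involved. It deliberately uses no Spring classes: `ConsumerConfigOverrideSketch` and `effectiveConfig` are hypothetical names made up for illustration, and the merge only models the behavior described above (override properties win over the factory's values). It is not the library's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ConsumerConfigOverrideSketch {

    // Hypothetical helper: copies the factory's base configs, then lets every
    // override property replace the value stored under the same key.
    static Map<String, Object> effectiveConfig(Map<String, Object> factoryConfigs,
                                               Properties overrides) {
        Map<String, Object> merged = new HashMap<>(factoryConfigs);
        for (String name : overrides.stringPropertyNames()) {
            merged.put(name, overrides.getProperty(name));
        }
        return merged;
    }

    public static void main(String[] args) {
        // Stand-in for the configs the consumer factory was created with.
        Map<String, Object> factoryConfigs = new HashMap<>();
        factoryConfigs.put("bootstrap.servers", "localhost:9092");
        factoryConfigs.put("max.poll.records", "500");

        // Stand-in for the properties set on the container after a refresh.
        Properties overrides = new Properties();
        overrides.setProperty("max.poll.records", "100");

        Map<String, Object> effective = effectiveConfig(factoryConfigs, overrides);
        System.out.println(effective.get("max.poll.records"));  // prints 100
        System.out.println(effective.get("bootstrap.servers")); // prints localhost:9092
    }
}
```

Keys that are not overridden keep the factory's value, which is why only the changed properties need to be set on the container.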
EDIT
Something like this might work:
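@Bean
@RefreshScope
Object containerReconfigurer(KafkaListenerEndpointRegistry registry) {
    // Stop, reconfigure, and restart every registered listener container.
    registry.getListenerContainers().forEach(container -> {
        container.stop();
        // reconfigure container, for example (an assumption about your setup) via
        // container.getContainerProperties().setKafkaConsumerProperties(...)
        container.start();
    });
    // This bean exists only for its side effect, so there is nothing useful to return.
    return null;
}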