I would like to set authExceptionRetryInterval to some value so that my Kafka listener is not destroyed on random auth failures.
I have a simple listener class:
@Component
class Listener {

    @KafkaListener(topics = "${topic}")
    void consume(ConsumerRecord<String, Message> record) {
        System.out.println(record);
    }
}
And a configuration that declares a ContainerCustomizer bean:
@Bean
public ContainerCustomizer<String, Message, AbstractMessageListenerContainer<String, Message>> containerCustomizer() {
    return container -> {
        container.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofMillis(500));
    };
}
The spring-kafka version is 2.8.8.
However, the customizer is never called (even though the bean is created in the context), so the auth exception retry interval is not set.
So the question is: how do I use ContainerCustomizer to customize the needed properties, or, if this class is not suitable for that, how can I set authExceptionRetryInterval in a different way?
CodePudding user response:
Spring Boot does not automatically detect the customizer bean; you have to set it on the container factory yourself, for example by injecting the factory into the bean method:
@Bean
ContainerCustomizer<String, Message, ConcurrentMessageListenerContainer<String, Message>> containerCustomizer(
        ConcurrentKafkaListenerContainerFactory<String, Message> factory) {

    ContainerCustomizer<String, Message, ConcurrentMessageListenerContainer<String, Message>> cust = container -> {
        container.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofMillis(500));
    };
    factory.setContainerCustomizer(cust);
    return cust;
}
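To see why the original customizer was never invoked, it may help to look at the wiring with Spring taken out of the picture. The following is only a plain-Java sketch of the idea, not spring-kafka code: Props, Container, and Factory are hypothetical stand-ins for the container properties, the listener container, and the container factory. The assumption being illustrated is that a customizer runs only if the factory that creates the containers holds a reference to it.

```java
import java.time.Duration;
import java.util.function.Consumer;

// Plain-Java model of the wiring; none of these types are spring-kafka classes.
public class CustomizerWiringSketch {

    /** Hypothetical stand-in for the container's properties object. */
    static class Props {
        Duration authExceptionRetryInterval; // null means no retry interval configured
    }

    /** Hypothetical stand-in for a listener container. */
    static class Container {
        final Props props = new Props();
    }

    /** Hypothetical stand-in for a container factory with an optional customizer. */
    static class Factory {
        private Consumer<Container> customizer;

        void setContainerCustomizer(Consumer<Container> customizer) {
            this.customizer = customizer;
        }

        Container createContainer() {
            Container container = new Container();
            // The customizer is applied only if it was set on this factory.
            if (customizer != null) {
                customizer.accept(container);
            }
            return container;
        }
    }

    public static void main(String[] args) {
        // Merely creating the customizer (like declaring it as a bean) has no effect.
        Consumer<Container> cust =
                c -> c.props.authExceptionRetryInterval = Duration.ofMillis(500);

        Factory factory = new Factory();
        System.out.println(factory.createContainer().props.authExceptionRetryInterval); // prints "null"

        // Once the factory knows about it, every new container is customized.
        factory.setContainerCustomizer(cust);
        System.out.println(factory.createContainer().props.authExceptionRetryInterval); // prints "PT0.5S"
    }
}
```

In the real application, the factory.setContainerCustomizer(cust) call in the bean method above plays the role of the second step in this sketch.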