I am trying to find a way to use the new DefaultErrorHandler instead of deprecated SeekToCurrentErrorHandler in spring-kafka 2.8.1, in order to override the retry default behavior in case of errors. I want to "stop" the retry process, so if an error occurs, no retry should be done.
Now I have, in a config class, the following bean that works as expected:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new **SeekToCurrentErrorHandler(new FixedBackOff(0L, 1L)**));
factory.setConsumerFactory(requestConsumerFactory());
factory.setReplyTemplate(kafkaTemplate());
return factory;
}
Since in this spring kafka version, the STCEH is deprecated, I tried to do the following, in the same config class:
@Bean
public DefaultErrorHandler eh() {
return new DefaultErrorHandler(new FixedBackOff(0, 1));
}
But it seems that it is not working. In case of error, the retry number is the default one, as I can see in logs:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR DefaultErrorHandler - Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for topicX
How should this DefaultErrorHandler be used in order to achieve the desired behavior? Or should I use something else?
Thx in advance!
CodePudding user response:
factory.setCommonErrorHandler(new Default....)
Boot auto configuration of a CommonErrorHandler
bean requires Boot 2.6.
https://github.com/spring-projects/spring-boot/commit/c3583a4b06cff3f53b3322cd79f2b64d17211d0e
CodePudding user response:
factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 1L))); Actually, it will retry a delivery up to 1 time (2 delivery attempts). (https://docs.spring.io/spring-kafka/docs/2.7.8/reference/html/#seek-to-current)
The default number of retries is **9(( (FixedBackOff(0L, 9L)) instead of 1 (https://docs.spring.io/spring-kafka/docs/2.8.1/reference/html/#default-eh)
You should try
setCommonErrorHandler
instead ofsetErrorHandler
likefactory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 0L));