How to replace deprecated SeekToCurrentErrorHandler with DefaultErrorHandler (spring-kafka)?

Time:02-10

I am trying to use the new DefaultErrorHandler instead of the deprecated SeekToCurrentErrorHandler in spring-kafka 2.8.1, in order to override the default retry behavior in case of errors. I want to "stop" the retry process: if an error occurs, no retry should be done.

Now I have, in a config class, the following bean that works as expected:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 1L)));
    factory.setConsumerFactory(requestConsumerFactory());
    factory.setReplyTemplate(kafkaTemplate());
    return factory;
}

Since SeekToCurrentErrorHandler is deprecated in this spring-kafka version, I tried the following in the same config class:

@Bean
public DefaultErrorHandler eh() {
    return new DefaultErrorHandler(new FixedBackOff(0, 1));
}

But it seems that it is not working. In case of error, the retry number is the default one, as I can see in logs:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR DefaultErrorHandler - Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for topicX

How should this DefaultErrorHandler be used in order to achieve the desired behavior? Or should I use something else?

Thx in advance!

CodePudding user response:

factory.setCommonErrorHandler(new Default....)

Boot auto configuration of a CommonErrorHandler bean requires Boot 2.6.

https://github.com/spring-projects/spring-boot/commit/c3583a4b06cff3f53b3322cd79f2b64d17211d0e
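
If you are on a Boot version older than 2.6, a DefaultErrorHandler bean on its own is not applied to the container factory, so it has to be set explicitly. A minimal sketch of that wiring, reusing the eh() bean from the question; the class name ListenerConfig is made up, and injecting the consumer factory and template as method parameters is an assumption about how your config is laid out (with several ConsumerFactory beans you would need a qualifier):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DefaultErrorHandler;

// Hypothetical config class; only the setCommonErrorHandler call is the point here.
@Configuration
class ListenerConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> requestConsumerFactory, // assumed to be a bean
            KafkaTemplate<String, String> kafkaTemplate,            // assumed to be a bean
            DefaultErrorHandler eh) {                               // the eh() bean from the question
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(requestConsumerFactory);
        factory.setReplyTemplate(kafkaTemplate);
        // Replaces the deprecated factory.setErrorHandler(...)
        factory.setCommonErrorHandler(eh);
        return factory;
    }
}
```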

CodePudding user response:

  1. factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 1L))); does not disable retries. The second FixedBackOff argument is the number of retries, so this retries a delivery once (2 delivery attempts in total). (https://docs.spring.io/spring-kafka/docs/2.7.8/reference/html/#seek-to-current)

  2. The default for DefaultErrorHandler is 9 retries (FixedBackOff(0L, 9L)), not 1, which matches the maxAttempts=9 in your log. (https://docs.spring.io/spring-kafka/docs/2.8.1/reference/html/#default-eh)

  3. Use setCommonErrorHandler instead of setErrorHandler, and pass 0 as the retry count to disable retries: factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 0L)));
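
The retry arithmetic in the points above can be checked with a small plain-Java model. This is only a sketch: it is not Spring's FixedBackOff, and the class and method names are made up. It assumes a listener that fails on every delivery and treats the second FixedBackOff argument as a retry budget.

```java
// Stand-alone model of the retry count; does not use spring-kafka.
class DeliveryAttempts {

    /**
     * Counts how often one record is delivered to a listener that always fails,
     * given the retry budget (the second FixedBackOff argument).
     */
    static int totalDeliveries(long maxRetries) {
        int deliveries = 1;      // the initial delivery always happens
        long retriesUsed = 0;
        while (retriesUsed < maxRetries) {
            retriesUsed++;       // spend one retry from the budget
            deliveries++;        // each retry is one more delivery
        }
        return deliveries;
    }

    public static void main(String[] args) {
        for (long maxRetries : new long[] {0L, 1L, 9L}) {
            System.out.println("FixedBackOff(0L, " + maxRetries + "L): "
                    + totalDeliveries(maxRetries) + " delivery attempt(s)");
        }
    }
}
```

Under this model a budget of 0 gives a single delivery and no retry, 1 gives 2 deliveries, and the default of 9 gives 10, in line with currentAttempts=10 in the log from the question.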
