I have spring cloud stream application with kafka binder that consumes and send messages. In application i configure custom error handler with retry policy, and add not retryable exception to handler. Configuration exaple:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
SeekToCurrentErrorHandler customErrorHandler
) {
return (((container, destinationName, group) -> {
container.setErrorHandler(customErrorHandler);
}));
}
@Bean
public SeekToCurrentErrorHandler customErrorHandler() {
var errorHandler = new SeekToCurrentErrorHandler(
(consumerRecord, e) -> log.error("Got exception skip record record: {}", consumerRecord, e),
new FixedBackOff(1000L, 10)
);
errorHandler.addNotRetryableException(App.MyCustomException.class);
return errorHandler;
}
But i see, that if exception throws, than application retry to process message 3 times. Expected behaveor - will not repeat to consume messages if App.MyCustomException.class throws. How to configure retry policy for spring cloud stream kafka binder application?
Code exaple here: github
Run test for reproduce issue.
CodePudding user response:
The customizations you provide are for the container-level error handler. Binder has a different retrying mechanism. You can add the following to your configuration to ensure that the record is not re-tried when the exception occurs.
spring.cloud.stream:
bindings:
processor-in-0:
...
consumer:
retryableExceptions:
ru.vichukano.kafka.binder.retry.App.MyCustomException: false
When I tried that, I didn't see the message being re-delivered.
Here are some explanations for this.