Can someone please help me understand why a message whose offset has been manually and immediately committed is re-processed by the KafkaListener when an exception occurs?
So I'm expecting the following behaviour:
- I receive an event in Kafka Listener
- I commit the offset
- An exception occurs
- I'm expecting that message not to be reprocessed because the offset was committed.
Is my understanding correct, or does Spring roll back the manual Acknowledgment when an exception occurs?
I have the following Listener code:
@KafkaListener(topics = {"${acknowledgement.topic}"}, containerFactory = "concurrentKafkaListenerContainerFactory")
public void onMessage(String message, Acknowledgment acknowledgment) throws InterruptedException {
    acknowledgment.acknowledge();
    throw new Exception1();
}
And the concurrentKafkaListenerContainerFactory code is:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    kafkaProperties.getConsumer().setEnableAutoCommit(false);
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
    concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return concurrentKafkaListenerContainerFactory;
}
Answer:
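Spring does not roll back the commit, but the message is still redelivered. The default error handler treats any exception as retryable by default and hands the failed record to the listener again, regardless of whether its offset has been committed. The committed offset is only consulted when a consumer starts or partitions are reassigned; it does not affect what a running consumer is given next.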
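To make the distinction concrete, here is a toy model in plain Java. It is not Kafka or Spring code: `ToyConsumer`, `RedeliveryDemo` and `deliveries` are made-up names, and the retry loop only approximates what an error handler does. It shows that a consumer tracks an in-memory position separately from the committed offset, so rewinding the position redelivers a record even after its offset was committed.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not Kafka or Spring code) of a consumer's two cursors. */
final class ToyConsumer {
    long position = 0;   // next offset poll() returns; held in the consumer's memory
    long committed = 0;  // offset stored for the group; only read on restart or rebalance

    long poll() { return position++; }
    void commit(long nextOffset) { committed = nextOffset; }
    void seek(long offset) { position = offset; }
}

final class RedeliveryDemo {

    /**
     * Delivers offset 0 to a listener that commits and then throws on every attempt.
     * After each failure the simulated error handler rewinds the position, up to
     * maxRetries times. Returns the offsets handed to the listener, in order.
     */
    static List<Long> deliveries(ToyConsumer consumer, int maxRetries) {
        List<Long> seen = new ArrayList<>();
        int failures = 0;
        while (true) {
            long offset = consumer.poll();
            seen.add(offset);
            try {
                consumer.commit(offset + 1);  // stands in for acknowledgment.acknowledge()
                throw new IllegalStateException("listener failed");
            } catch (IllegalStateException e) {
                if (failures++ < maxRetries) {
                    consumer.seek(offset);    // retry: rewind the in-memory position
                } else {
                    break;                    // retries exhausted: move on
                }
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        System.out.println(deliveries(consumer, 2)); // prints [0, 0, 0]
        System.out.println(consumer.committed);      // prints 1
    }
}
```

The commit succeeds on the first attempt, yet the record is seen three times, because the retry acts on the position and never reads the committed offset.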
You should either not throw an exception, or tell the DefaultErrorHandler which exception(s) should not be retried, using this method:
/**
 * Add exception types to the default list. By default, the following exceptions will
 * not be retried:
 * <ul>
 * <li>{@link DeserializationException}</li>
 * <li>{@link MessageConversionException}</li>
 * <li>{@link ConversionException}</li>
 * <li>{@link MethodArgumentResolutionException}</li>
 * <li>{@link NoSuchMethodException}</li>
 * <li>{@link ClassCastException}</li>
 * </ul>
 * All others will be retried, unless {@link #defaultFalse()} has been called.
 * @param exceptionTypes the exception types.
 * @see #removeClassification(Class)
 * @see #setClassifications(Map, boolean)
 */
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
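Applied to the configuration from the question, this could look roughly like the sketch below. It assumes a spring-kafka version that provides `DefaultErrorHandler` and `setCommonErrorHandler` (2.8 or later), and that `Exception1` from the question is an unchecked exception visible to this class. The class name `KafkaErrorHandlingConfig` and the bean name `errorHandler` are made up for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;

@Configuration
public class KafkaErrorHandlingConfig {

    @Bean
    public DefaultErrorHandler errorHandler() {
        // The no-arg constructor keeps the default back-off and the default (logging) recoverer.
        DefaultErrorHandler handler = new DefaultErrorHandler();
        // Exception1 is the question's exception type; it is assumed to be on the classpath.
        handler.addNotRetryableExceptions(Exception1.class);
        return handler;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory, DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Without this call the container uses a DefaultErrorHandler with default settings.
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}
```

With this in place, an `Exception1` thrown by the listener should go straight to the recoverer (which only logs by default) instead of being retried, so the record is not delivered again.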