I have a Producer that sends messages in Avro format and a Consumer that listens to those messages.
I have also implemented non-blocking retries by using @RetryableTopic in my Consumer for handling errors.
When the Consumer is unable to deserialize a message (due to a schema change or any other reason), it does not put that message in the -retry topic. It sends it directly to the -dlt topic instead.
I want DeserializationExceptions to be retried as well. The reason is that by the time these errors are retried, I can deploy a fix in my Consumer so that the retries eventually succeed.
I tried the include option in @RetryableTopic, but it doesn't seem to work for DeserializationException.
@RetryableTopic(
    attempts = "${app.consumer.retry.topic.count:5}",
    backoff = @Backoff(delayExpression = "${app.consumer.retry.topic.back-off:2000}"),
    fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
    include = {DeserializationException.class} // does not work
)
Is it a bug in @RetryableTopic, or is there another way to achieve this?
CodePudding user response:
Since Spring Kafka 2.8.3, there is a global set of fatal exceptions that, as you described, cause the record to be forwarded directly to the DLT.
The usual pattern for handling this kind of exception is, once the fix has been deployed, to have some kind of console application retrieve the failed records from the DLT and reprocess them, perhaps by sending each record back to the first retry topic so that there is no duplicate in the main topic.
For the pattern you described, you can manage this global set of fatal exceptions by providing a DestinationTopicResolver bean, such as:
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(Clock.systemUTC(), applicationContext);
    // Take DeserializationException out of the fatal set so it goes to the retry topic
    ddtr.removeClassification(DeserializationException.class);
    return ddtr;
}
Please let me know if that works for you. Thanks.
CodePudding user response:
Here is how we have achieved it:
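@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
    // systemUTC() and emptyMap() are static imports from java.time.Clock and java.util.Collections
    DefaultDestinationTopicResolver resolver = new DefaultDestinationTopicResolver(systemUTC(), context);
    // Empty classification map with a default of true: no exception is treated as fatal
    resolver.setClassifications(emptyMap(), true);
    return resolver;
}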
This way we don't have to specify every exception to be included one by one. Another solution, as suggested by Tomaz, is to remove only specific exceptions from the fatal set:
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(systemUTC(), applicationContext);
    ddtr.removeClassification(DeserializationException.class);
    ddtr.removeClassification(ClassCastException.class);
    return ddtr;
}
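To make the difference between the two approaches above easier to see, here is a rough, plain-Java toy model of the retry-or-DLT decision. It is only a sketch and not Spring Kafka's actual implementation: RetryClassifierSketch, FakeDeserializationException, destinationFor and the "orders" topic name are all made up for illustration, and the three fatal entries are stand-ins for the real global set. It assumes the SINGLE_TOPIC naming from the question (-retry and -dlt suffixes).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy model only: this is NOT Spring Kafka's DefaultDestinationTopicResolver.
final class RetryClassifierSketch {

    // Hypothetical stand-in for Spring Kafka's DeserializationException.
    public static class FakeDeserializationException extends RuntimeException { }

    // Exception type -> "should it be retried?" (false means fatal).
    private final Map<Class<? extends Throwable>, Boolean> classified = new HashMap<>();
    private final boolean defaultRetry;

    public RetryClassifierSketch(Map<Class<? extends Throwable>, Boolean> classified,
                                 boolean defaultRetry) {
        this.classified.putAll(classified);
        this.defaultRetry = defaultRetry;
    }

    // Drop an explicit classification so the type falls back to the default.
    public void remove(Class<? extends Throwable> type) {
        classified.remove(type);
    }

    // Check the exception's class and its superclasses, then use the default.
    public boolean shouldRetry(Throwable t) {
        for (Class<?> c = t.getClass(); c != null; c = c.getSuperclass()) {
            Boolean retry = classified.get(c);
            if (retry != null) {
                return retry;
            }
        }
        return defaultRetry;
    }

    // Retryable failures go to "<topic>-retry", fatal ones to "<topic>-dlt".
    public String destinationFor(Throwable t, String mainTopic) {
        return mainTopic + (shouldRetry(t) ? "-retry" : "-dlt");
    }

    public static void main(String[] args) {
        // Stand-in "global fatal set" (assumed entries, for illustration only).
        Map<Class<? extends Throwable>, Boolean> fatal = new HashMap<>();
        fatal.put(FakeDeserializationException.class, false);
        fatal.put(ClassCastException.class, false);
        fatal.put(NoSuchMethodException.class, false);

        // First approach: empty map, default true, so nothing is fatal.
        RetryClassifierSketch retryAll =
                new RetryClassifierSketch(Collections.emptyMap(), true);

        // Second approach: keep the fatal set but remove two entries.
        RetryClassifierSketch selective = new RetryClassifierSketch(fatal, true);
        selective.remove(FakeDeserializationException.class);
        selective.remove(ClassCastException.class);

        System.out.println(retryAll.destinationFor(new NoSuchMethodException(), "orders"));
        System.out.println(selective.destinationFor(new NoSuchMethodException(), "orders"));
        System.out.println(selective.destinationFor(new FakeDeserializationException(), "orders"));
    }
}
```

In this model the first approach retries everything, including exceptions that can never succeed on retry, while the second keeps the remaining fatal entries going straight to the DLT. Which trade-off is right depends on whether you expect to deploy fixes for the other fatal types too.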