Home > Software design >  Spring Kafka | How to make DeserializationException retryable?
Spring Kafka | How to make DeserializationException retryable?

Time:04-25

I have a Producer that sends messages in Avro format and a Consumer which listens to those messages.

I have also implemented non-blocking retries by using @RetryableTopic in my Consumer for handling errors.

When the Consumer is unable to deserialize a message (due to schema change or whatever reasons), it does not put that message in the -retry topic. It directly sends it to the -dlt topic instead.

I want DeserializationExceptions to be retried as well. Reason is that by the time these errors are retried, I can deploy a fix in my Consumer so that the retries could eventually succeed.

I tried the include option in @RetryableTopic but it doesn't seem to work for DeserializationException.

  @RetryableTopic(
    attempts = "${app.consumer.retry.topic.count:5}",
    backoff = @Backoff(delayExpression = "${app.consumer.retry.topic.back-off:2000}"),
    fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
    include = {DeserializationException.class}  // does not work
  )

Is it a bug in @RetryableTopic or is there another way to achieve this?

CodePudding user response:

Since Spring Kafka 2.8.3 there's a set of global fatal exceptions that, as you described, will cause the record to be forwarded to the DLT directly.

The usual pattern to handle this kind of exception is, after the fix has been deployed, have some kind of console application to retrieve the failed record from the DLT and reprocess it, perhaps by sending the record back to the first retry topic so that there's no duplicate in the main topic.

For the pattern you described, you can manage this global set of FATAL exceptions by providing a DestinationTopicResolver bean, such as:

@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(Clock.systemUTC(), applicationContext);
    ddtr.removeClassification(DeserializationException.class);
    return ddtr;
}

Please let me know if that works for you. Thanks.

CodePudding user response:

Here is how we have achieved it:

  @Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
  public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
    DefaultDestinationTopicResolver resolver = new DefaultDestinationTopicResolver(systemUTC(), context);
    resolver.setClassifications(emptyMap(), true);
    return resolver;
  }

This way we don't have to specify every exception to be included one by one. Another solution is as suggested by Tomaz:

  @Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
  public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(systemUTC(), applicationContext);
    ddtr.removeClassification(DeserializationException.class);
    ddtr.removeClassification(ClassCastException.class);
    return ddtr;
  }
  • Related