Adding custom header only to DLT topic, when using non-blocking retries

Time:01-04

I have added the following logic to add a custom header to events that are sent to the retry and DLT topics.

@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
public DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver destinationTopicResolver) {
    DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(destinationTopicResolver);
    factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> dlpr.addHeadersFunction((consumerRecord, exception) -> {
        // Return the extra header to publish: the exception message as UTF-8 bytes
        return new RecordHeaders().add(EXTERNAL_SYSTEM_EXCEPTION_MESSAGE_HEADER, exception.getMessage().getBytes(StandardCharsets.UTF_8));
    }));
    return factory;
}

Unfortunately, the header is carried over to the next topic. For example: I have two retry topics and a DLT. The exception is thrown and the header is added when the record leaves the main topic. At retry-0 I receive the event with that header already present, and on failure it is added again. So at retry-1 the record has two headers with the same name, and it ends up in the DLT with three headers of the same name. This happens because "Apache Kafka supports multiple headers with the same name", as stated here: https://docs.spring.io/spring-kafka/reference/html/#dlpr-headers
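To make the accumulation concrete, here is a minimal plain-Java sketch of the behaviour described above. It does not use Spring Kafka or kafka-clients; the class `HeaderAccumulationSketch`, its methods, and the header name are hypothetical stand-ins, and the only assumption taken from Kafka is that headers form an append-only list in which duplicate names are allowed.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of record headers: an append-only list that permits duplicate names.
class HeaderAccumulationSketch {

    // Hypothetical header name, for illustration only.
    static final String HEADER = "external-system-exception-message";

    static final class Header {
        final String key;
        final String value;

        Header(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Models one failed delivery: existing headers travel with the record,
    // and one more header with the same name is appended.
    static List<Header> forwardAfterFailure(List<Header> incoming, String exceptionMessage) {
        List<Header> outgoing = new ArrayList<>(incoming);
        outgoing.add(new Header(HEADER, exceptionMessage));
        return outgoing;
    }

    // Counts the headers that share the given name.
    static long count(List<Header> headers, String key) {
        return headers.stream().filter(h -> h.key.equals(key)).count();
    }

    // Returns the most recently appended value for the name, or null if absent.
    static String lastValue(List<Header> headers, String key) {
        for (int i = headers.size() - 1; i >= 0; i--) {
            if (headers.get(i).key.equals(key)) {
                return headers.get(i).value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Header> retry0 = forwardAfterFailure(new ArrayList<>(), "failure on main");
        List<Header> retry1 = forwardAfterFailure(retry0, "failure on retry-0");
        List<Header> dlt = forwardAfterFailure(retry1, "failure on retry-1");

        System.out.println(count(dlt, HEADER));     // prints 3
        System.out.println(lastValue(dlt, HEADER)); // prints failure on retry-1
    }
}
```

In real Kafka code, the counterpart of `lastValue` is `Headers.lastHeader(String)`, so a consumer of the DLT could read only the newest value even when earlier duplicates are still present. That is a consumer-side workaround, not a way to stop the duplicates from being written.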

I think of two solutions and I'm not sure how to achieve them:

  1. The header can be added at each level, but before adding it, check whether it already exists. If it does, remove it and then add the new one.
  2. The header should be added only to DLT topic.

Do you know if either of them is possible? I use spring-kafka 2.8.8.

Thanks.

CodePudding user response:

The headers function receives the current consumer record and the exception.

However, removing the header in that function will have no effect.

Please open a new feature suggestion.
