I have added the following logic to add a custom header to events that are sent to the retry and DLT topics.
@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
public DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver destinationTopicResolver) {
    DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(destinationTopicResolver);
    // Add the exception message as an extra header on every record published to a retry or DLT topic.
    factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> dlpr.addHeadersFunction((consumerRecord, exception) -> {
        return new RecordHeaders().add(EXTERNAL_SYSTEM_EXCEPTION_MESSAGE_HEADER, exception.getMessage().getBytes(StandardCharsets.UTF_8));
    }));
    return factory;
}
Unfortunately, the header is carried over to the next topic. For example, I have two retry topics and a DLT. When the exception is thrown on the main topic, the header is added. At retry-0 I receive the event with that header already present, and on failure it is added again, so at retry-1 the event has two headers with the same name. It ends up in the DLT with three headers with the same name. This happens because "Apache Kafka supports multiple headers with the same name", as stated here: https://docs.spring.io/spring-kafka/reference/html/#dlpr-headers
I can think of two solutions, but I'm not sure how to achieve either of them:
- The header can be added at each level, but before adding it, check whether it already exists. If it does, remove it and then add the new one.
- The header should be added only when publishing to the DLT topic.
Do you know if either of them is possible? I use spring-kafka 2.8.8.
Thanks.
Answer:
The headers function receives the current consumer record and the exception.
However, removing the header in that function will have no effect.
Please open a new feature suggestion.
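Since the function does receive the current consumer record, one possible stopgap is to check whether the header is already present and return nothing in that case. The sketch below only models the behaviour in plain Java; it does not use Spring Kafka or kafka-clients. `Header`, `publish`, `alwaysAdd`, `addIfAbsent` and the header name are hypothetical stand-ins, and the model assumes that existing headers are copied to the outgoing record and that the function's result is appended to them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model only: no Spring Kafka or kafka-clients types are used here.
class HeaderAccumulationSketch {

    // Hypothetical stand-in for a Kafka header (name plus value).
    static final class Header {
        final String key;
        final String value;

        Header(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Assumed header name, chosen for illustration.
    static final String NAME = "external-system-exception-message";

    // Models one failed delivery: the incoming headers are copied to the outgoing
    // record, then the headers returned by the function are appended, never replaced.
    static List<Header> publish(List<Header> incoming, Exception ex,
            BiFunction<List<Header>, Exception, List<Header>> headersFunction) {
        List<Header> outgoing = new ArrayList<>(incoming);
        outgoing.addAll(headersFunction.apply(incoming, ex));
        return outgoing;
    }

    // Mirrors the function from the question: always returns the header.
    static List<Header> alwaysAdd(List<Header> current, Exception ex) {
        return Collections.singletonList(new Header(NAME, ex.getMessage()));
    }

    // Returns the header only when the incoming record does not carry it yet.
    static List<Header> addIfAbsent(List<Header> current, Exception ex) {
        for (Header h : current) {
            if (h.key.equals(NAME)) {
                return Collections.emptyList();
            }
        }
        return Collections.singletonList(new Header(NAME, ex.getMessage()));
    }

    // Simulates three failures: main -> retry-0 -> retry-1 -> dlt.
    static List<Header> runThreeHops(BiFunction<List<Header>, Exception, List<Header>> fn) {
        List<Header> headers = new ArrayList<>();
        for (int hop = 0; hop < 3; hop++) {
            headers = publish(headers, new RuntimeException("failure " + hop), fn);
        }
        return headers;
    }

    // Counts how many headers with the assumed name the record carries.
    static long count(List<Header> headers) {
        return headers.stream().filter(h -> h.key.equals(NAME)).count();
    }

    public static void main(String[] args) {
        System.out.println(count(runThreeHops(HeaderAccumulationSketch::alwaysAdd)));   // 3
        System.out.println(count(runThreeHops(HeaderAccumulationSketch::addIfAbsent))); // 1
    }
}
```

In a real headers function the presence check could probably be written as `consumerRecord.headers().lastHeader(EXTERNAL_SYSTEM_EXCEPTION_MESSAGE_HEADER) != null`, returning an empty `RecordHeaders` when it is true; I have not verified this against 2.8.8. Note the trade-off: this keeps the message from the first failure rather than the latest one, so it is not the "remove, then add" behaviour asked for in the question.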