I am using Spring Cloud Stream and Kafka Binder to consume messages in batches from a Kafka Topic. I am trying to implement an error handling mechanism. As per my understanding I can't use Spring Cloud Stream's enableDLQ
property in batch mode.
I have found RecoveringBatchErrorHandler
and DeadLetterPublishingRecoverer
to retry and send failure messages from the spring-kafka documentation. But I am not able to understand how to send the records to a custom DLQ topic following the functional programming standards. All the examples I can see is using KafkaTemplates.
Are there any good example where I can find the implementation?
This is the spring doc I have been referring to.
https://docs.spring.io/spring-kafka/docs/2.5.12.RELEASE/reference/html/#recovering-batch-eh
CodePudding user response:
That version is no longer supported as OSS https://spring.io/projects/spring-kafka#support
With the current version, use the DefaultErrorHandler
configured with a DeadLetterPublishingRecoverer
and throw a BatchListenerExcecutionFailedException
to tell the framework which record in the batch failed.
See https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling and https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters and https://docs.spring.io/spring-kafka/docs/current/reference/html/#legacy-eh
Add a ListenerContainerCustomizer
bean to add your configured error handler to the listener container.