I'm trying to handle a poison pill scenario with spring-kafka.
Currently I handle it with the approach below, where failed messages are pushed to a different topic named <original-topic>.DLT:
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
}

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate bytesTemplate) {
    return new DeadLetterPublishingRecoverer(bytesTemplate);
}
Instead of pushing the failed message to the <original-topic>.DLT topic, I want to retrieve it and save it directly to a database.
I tried to get hold of the failed message but had no success. Can anybody help? TIA.
Answer:
Simply implement your own ConsumerRecordRecoverer and use it in the error handler instead of the DeadLetterPublishingRecoverer.
/**
 * A {@link BiConsumer} extension for recovering consumer records.
 *
 * @author Gary Russell
 * @since 2.3
 *
 */
@FunctionalInterface
public interface ConsumerRecordRecoverer extends BiConsumer<ConsumerRecord<?, ?>, Exception> {
}
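Since ConsumerRecordRecoverer is a functional interface, a lambda is enough. Here is a minimal sketch of what that could look like, assuming spring-kafka 2.3 or later. FailedRecordStore is a hypothetical interface invented for this example (it is not part of spring-kafka); you would back it with JPA, JdbcTemplate, or whatever your DB layer is, and expose the implementation as a bean.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

@Configuration
public class DbRecovererConfig {

    /**
     * Hypothetical persistence abstraction (an assumption for this sketch,
     * not a spring-kafka type). An implementation must be registered as a bean.
     */
    public interface FailedRecordStore {
        void save(String topic, int partition, long offset,
                  Object key, Object value, String error);
    }

    // The recoverer receives the failed record and the exception that caused
    // the failure; here it hands both to the store instead of publishing to a DLT.
    @Bean
    public ConsumerRecordRecoverer dbRecoverer(FailedRecordStore store) {
        return (ConsumerRecord<?, ?> record, Exception exception) -> store.save(
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                String.valueOf(exception.getMessage()));
    }

    // Same wiring as in the question, with the custom recoverer
    // in place of the DeadLetterPublishingRecoverer.
    @Bean
    public SeekToCurrentErrorHandler errorHandler(ConsumerRecordRecoverer dbRecoverer) {
        return new SeekToCurrentErrorHandler(dbRecoverer);
    }
}
```

The record passed to the recoverer carries the topic, partition, offset, key and value, so you can store whichever of those you need. The exception you receive may be a wrapper around the listener's original exception, so check getCause() if you want the root error.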