How to configure 2 RetryTopicConfiguration with individual DLT Handlers

Time:11-11

I am trying to use RetryTopicConfiguration in my project. I need to create two RetryTopicConfiguration beans, each covering its own set of topics and using its own DLT handler.

public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .dltHandlerMethod(XYZConsumer.class, "processDltForXYZ")
            .maxAttempts(5)
            .includeTopics(Arrays.asList("my-xyz-topic", "my-other-xyz-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .dltHandlerMethod(ABCConsumer.class, "processDltForABC")
            .excludeTopics(Arrays.asList("my-abc-topic", "my-other-abc-topic"))
            .retryOn(MyException.class)
            .create(template);
}

But all failed messages end up in the processDltForABC method.

public void processDltForXYZ(@Header(KafkaHeaders.ORIGINAL_TOPIC) String topic, Object obj){

}

public void processDltForABC(@Header(KafkaHeaders.ORIGINAL_TOPIC) String topic, Object obj){

}

I also tried using only includeTopics in both configurations, but it still didn't work. How can I have two RetryTopicConfiguration beans, each with its own DLT listener?

CodePudding user response:

This works perfectly fine for me.

@SpringBootApplication
public class So69913673Application {

    public static void main(String[] args) {
        SpringApplication.run(So69913673Application.class, args);
    }

    @Bean
    public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, String> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .fixedBackOff(3000)
                .dltHandlerMethod(Listener1.class, "dlt")
                .maxAttempts(5)
                .includeTopic("XYZ")
                .create(template);
    }

    @Bean
    public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, String> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .exponentialBackoff(1000, 2, 5000)
                .maxAttempts(4)
                .dltHandlerMethod(Listener2.class, "dlt")
                .excludeTopic("XYZ")
                .create(template);
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("XYZ", "to-XYZ");
            template.send("ABC", "to-ABC");
        };
    }

}

@Component
class Listener1 {

    @KafkaListener(id = "xyz", topics = "XYZ")
    void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("testXYZ");
    }

    public void dlt(String in) {
        System.out.println("DLT1:" + in);
    }

}

@Component
class Listener2 {

    @KafkaListener(id = "abc", topics = "ABC")
    void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("testABC");
    }

    public void dlt(String in) {
        System.out.println("DLT2:" + in);
    }

}

spring.kafka.consumer.auto-offset-reset=earliest

...
to-XYZ
2021-11-10 09:34:24.068  INFO 18798 --- [z-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-xyz-retry-2-10, groupId=xyz-retry-2] Seeking to offset 0 for partition XYZ-retry-2-0
2021-11-10 09:34:24.068  WARN 18798 --- [z-retry-2-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic XYZ-retry-2 is not ready for consumption, backing off for approx. 2495 millis.
to-ABC
to-XYZ
DLT2:to-ABC
2021-11-10 09:34:27.102  INFO 18798 --- [z-retry-3-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-xyz-retry-3-11, groupId=xyz-retry-3] Seeking to offset 0 for partition XYZ-retry-3-0
2021-11-10 09:34:27.102  WARN 18798 --- [z-retry-3-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic XYZ-retry-3 is not ready for consumption, backing off for approx. 2495 millis.
to-XYZ
DLT1:to-XYZ

Is the first configuration missing a @Bean annotation?

CodePudding user response:

Besides Gary Russell's points, note that in the example you provided, one configuration bean includes one pair of topics ("my-xyz-topic", "my-other-xyz-topic") while the other excludes a different pair ("my-abc-topic", "my-other-abc-topic").

If you do that, both configurations are eligible to handle "my-xyz-topic" and "my-other-xyz-topic", which might lead to the behavior you're describing, where the wrong DLT handler is used.
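
To make the overlap concrete, here is a minimal sketch in plain Java. It is a simplified model, not Spring Kafka's actual classes: `TopicEligibilitySketch`, `Filter`, `accepts` and `eligible` are hypothetical names, and the matching rule (a topic is accepted when it is not excluded and the include list is either empty or contains it) is an assumption about how include/exclude lists combine.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of topic eligibility; NOT Spring Kafka's actual classes.
final class TopicEligibilitySketch {

    // Hypothetical stand-in for the topic filters of one RetryTopicConfiguration.
    static final class Filter {
        final List<String> include;
        final List<String> exclude;

        Filter(List<String> include, List<String> exclude) {
            this.include = include;
            this.exclude = exclude;
        }

        // Assumed rule: not excluded, and either no include list or listed in it.
        boolean accepts(String topic) {
            return !exclude.contains(topic)
                    && (include.isEmpty() || include.contains(topic));
        }
    }

    // Returns the names of all configurations whose filters accept the topic.
    static List<String> eligible(Map<String, Filter> configs, String topic) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Filter> entry : configs.entrySet()) {
            if (entry.getValue().accepts(topic)) {
                names.add(entry.getKey());
            }
        }
        return names;
    }

    // The include/exclude lists exactly as written in the question.
    static Map<String, Filter> questionConfigs() {
        Map<String, Filter> configs = new LinkedHashMap<>();
        configs.put("myRetryTopic",
                new Filter(List.of("my-xyz-topic", "my-other-xyz-topic"), List.of()));
        configs.put("myOtherRetryTopic",
                new Filter(List.of(), List.of("my-abc-topic", "my-other-abc-topic")));
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Filter> configs = questionConfigs();
        System.out.println(eligible(configs, "my-xyz-topic")); // [myRetryTopic, myOtherRetryTopic]
        System.out.println(eligible(configs, "my-abc-topic")); // []
    }
}
```

Under this model, "my-xyz-topic" is accepted by both configurations, while "my-abc-topic" is accepted by neither. Giving each bean its own includeTopics list, with no topic appearing in both, removes the ambiguity.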

Also, you might want to check whether the rest of the configuration is being applied as expected, for example fixed versus exponential backoff.
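
One way to check which backoff is in effect is to compare the delays you observe with the ones each configuration should produce. The sketch below computes them for the values in the question. `BackoffDelaySketch` and its methods are hypothetical helpers, not part of Spring Kafka, and the code assumes maxAttempts counts the first delivery, so there are maxAttempts - 1 delayed retries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for reasoning about expected retry delays.
final class BackoffDelaySketch {

    // Fixed backoff: every retry waits the same interval.
    // Assumes maxAttempts includes the initial delivery attempt.
    static List<Long> fixed(long intervalMs, int maxAttempts) {
        List<Long> delays = new ArrayList<>();
        for (int retry = 1; retry < maxAttempts; retry++) {
            delays.add(intervalMs);
        }
        return delays;
    }

    // Exponential backoff: each delay is the previous one times the
    // multiplier, capped at maxMs.
    static List<Long> exponential(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        List<Long> delays = new ArrayList<>();
        double next = initialMs;
        for (int retry = 1; retry < maxAttempts; retry++) {
            delays.add(Math.min((long) next, maxMs));
            next *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        // fixedBackOff(3000) with maxAttempts(5)
        System.out.println(fixed(3000, 5));                // [3000, 3000, 3000, 3000]
        // exponentialBackoff(1000, 2, 5000) with maxAttempts(4)
        System.out.println(exponential(1000, 2, 5000, 4)); // [1000, 2000, 4000]
    }
}
```

If the retries for an XYZ topic are spaced about 1, 2 and 4 seconds apart instead of a steady 3 seconds, that suggests the other configuration is the one being applied to it.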
