Home > front end >  spring-kafka: How to provide RetryTopicConfiguration to ConcurrentMessageListenerContainer?
spring-kafka: How to provide RetryTopicConfiguration to ConcurrentMessageListenerContainer?

Time:08-06

I have to set up Kafka consumers without using annotations. I have something like this:

public class MyCustomListener<V> implements MessageListener<String, V> {
  private final String                          topicName;
  private final Class<V>                        recordType;
  private final String                          recordTypeName;
  private final KafkaProperties                 props;
  private final Logger logger;
  private final ConcurrentMessageListenerContainer<String, V> listenerContainer;

  public MyCustomListener(
    @NotNull String topicName,
    @NotNull Class<V> recordType,
    @NotNull KafkaProperties props,
    Logger logger
  ) {
    this.topicName = topicName;
    this.recordType = recordType;
    this.recordTypeName = recordType.getSimpleName();
    this.props = props;
    this.logger = logger;
    this.listenerContainer = setup();
  }

  protected ConcurrentMessageListenerContainer<String, V> setup() {
    logger.info("Setting up Kafka listener container for topic {}/{}", topicName, recordTypeName);

    var consumerFactory =
      new DefaultKafkaConsumerFactory<>(
        new HashMap<>() {{
          put(BOOTSTRAP_SERVERS_CONFIG, props.getBootstrapServers());
          put(GROUP_ID_CONFIG, props.getConsumer().getGroupId());
        }},
        new StringDeserializer(),
        new MyDataDeserializer<>()
      );

    var containerProps = new ContainerProperties(topicName);
    containerProps.setMessageListener(this);

    var container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
    container.start();
    return container;
  }

  @Override
  public final void onMessage(ConsumerRecord<String, V> record) {
    logger.info(
      "(#{}) Received {} message in Kafka: ({})",
      Thread.currentThread().getId(),
      recordTypeName,
      record.key()
    );

    # Actual processing of record code here ...
  }
}


The above works for the most part.

How do I associate/provide RetryTopicConfiguration to the above setup? So that I am able to publish to retry topic and DLT topics and backoff as specified in the retry config.

I am fine with just the retry topic config part being a Bean, say:

public RetryTopicConfiguration myRetryTopicConfig(
    KafkaProperties props,
    KafkaTemplate<String, MyData> template,
    ...
  ) {
  return RetryTopicConfigurationBuilder
    .newInstance()
    .fixedBackOff(..)
    .useSingleTopicForFixedDelays()
    ...
    .create(template);
}

CodePudding user response:

Such usage of RetryTopicConfiguration with manually created listeners / containers without @KafkaListener annotations is currently not supported:

Currently, the non-blocking retries mechanism is built around @KafkaListener; you would have to do a lot of manual wiring to set it up with manually created container(s). We hope to make it easier for such use cases in future, perhaps in 3.0.

You can upvote or comment this issue in the project's repository to show your interest - the 3.0 is due later this year.

  • Related