I have to set up Kafka consumers without using annotations. I have something like this:
public class MyCustomListener<V> implements MessageListener<String, V> {
    private final String topicName;
    private final Class<V> recordType;
    private final String recordTypeName;
    private final KafkaProperties props;
    private final Logger logger;
    private final ConcurrentMessageListenerContainer<String, V> listenerContainer;
    public MyCustomListener(
        @NotNull String topicName,
        @NotNull Class<V> recordType,
        @NotNull KafkaProperties props,
        Logger logger
    ) {
        this.topicName = topicName;
        this.recordType = recordType;
        this.recordTypeName = recordType.getSimpleName();
        this.props = props;
        this.logger = logger;
        this.listenerContainer = setup();
    }
    protected ConcurrentMessageListenerContainer<String, V> setup() {
        logger.info("Setting up Kafka listener container for topic {}/{}", topicName, recordTypeName);
        var consumerFactory =
            new DefaultKafkaConsumerFactory<>(
                new HashMap<>() {{
                    put(BOOTSTRAP_SERVERS_CONFIG, props.getBootstrapServers());
                    put(GROUP_ID_CONFIG, props.getConsumer().getGroupId());
                }},
                new StringDeserializer(),
                new MyDataDeserializer<>()
            );
        var containerProps = new ContainerProperties(topicName);
        containerProps.setMessageListener(this);
        var container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
        container.start();
        return container;
    }
    @Override
    public final void onMessage(ConsumerRecord<String, V> record) {
        logger.info(
            "(#{}) Received {} message in Kafka: ({})",
            Thread.currentThread().getId(),
            recordTypeName,
            record.key()
        );
        // Actual processing of record code here ...
    }
}
The above works for the most part.
How do I associate a RetryTopicConfiguration with the above setup, so that failed records are published to the retry topic and the DLT, with the backoff specified in the retry config?
I am fine with just the retry topic config part being a Bean, say:
@Bean
public RetryTopicConfiguration myRetryTopicConfig(
    KafkaProperties props,
    KafkaTemplate<String, MyData> template,
    ...
) {
    return RetryTopicConfigurationBuilder
        .newInstance()
        .fixedBackOff(..)
        .useSingleTopicForFixedDelays()
        ...
        .create(template);
}
CodePudding user response:
Using RetryTopicConfiguration with manually created listeners/containers, without @KafkaListener annotations, is currently not supported:
Currently, the non-blocking retries mechanism is built around @KafkaListener; you would have to do a lot of manual wiring to set it up with manually created container(s). We hope to make it easier for such use cases in future, perhaps in 3.0.
You can upvote or comment on this issue in the project's repository to show your interest; 3.0 is due later this year.
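To give a feel for what that manual wiring has to decide, here is a minimal sketch in plain Java of the routing step only: when an attempt fails, pick the next topic and the earliest time the record may be reprocessed. It does not use any Spring types and is not how the framework implements it. The class RetryRouting, its methods, and the "-retry" / "-dlt" suffixes are all assumptions made for illustration (the suffixes are meant to resemble the framework's default naming, which you should verify for your version).

```java
/**
 * Hypothetical plain-Java model of a single-retry-topic chain with a fixed delay.
 * Not part of Spring Kafka; it only illustrates the routing decision.
 */
final class RetryRouting {

    /** Where a failed record should be published next. */
    static final class Destination {
        final String topic;      // topic to publish the failed record to
        final int attempt;       // attempt number the record carries from now on
        final long dueAtMillis;  // earliest time the record may be processed again

        Destination(String topic, int attempt, long dueAtMillis) {
            this.topic = topic;
            this.attempt = attempt;
            this.dueAtMillis = dueAtMillis;
        }
    }

    private final String mainTopic;
    private final int maxAttempts;     // total deliveries, including the first one
    private final long backOffMillis;  // fixed delay between attempts

    RetryRouting(String mainTopic, int maxAttempts, long backOffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        if (backOffMillis < 0) {
            throw new IllegalArgumentException("backOffMillis must be >= 0");
        }
        this.mainTopic = mainTopic;
        this.maxAttempts = maxAttempts;
        this.backOffMillis = backOffMillis;
    }

    // Assumed suffixes; adjust to whatever naming your setup uses.
    String retryTopic() {
        return mainTopic + "-retry";
    }

    String dltTopic() {
        return mainTopic + "-dlt";
    }

    /**
     * @param failedAttempt 1-based number of the attempt that just failed
     * @param nowMillis     current time in epoch milliseconds
     */
    Destination onFailure(int failedAttempt, long nowMillis) {
        if (failedAttempt >= maxAttempts) {
            // Attempts exhausted: send to the dead-letter topic, no further delay.
            return new Destination(dltTopic(), failedAttempt, nowMillis);
        }
        // Otherwise send to the single retry topic, due after the fixed back-off.
        return new Destination(retryTopic(), failedAttempt + 1, nowMillis + backOffMillis);
    }
}
```

In a setup like the one in the question, onMessage would catch the processing exception, call onFailure, and publish the record to the returned topic with a KafkaTemplate, carrying the attempt number and due time in headers. You would also need a second container on the retry topic that holds each record back until its due time, which is the larger part of the wiring and is not shown here.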