Currently I have this code for creating a ConcurrentKafkaListenerContainerFactory
.
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps()));
factory.setReplyTemplate(kafkaTemplate());
return factory;
}
How do I create the DefaultKafkaConsumerFactory
object with properties from application.yaml
? I don't want to set them inside the consumerProps()
method.
EDIT: My KafkaConfig class now looks like this.
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ReplyingKafkaTemplate<Object, KafkaExampleRecord, KafkaExampleRecord> replyingKafkaTemplate(ProducerFactory<Object, KafkaExampleRecord> producerFactory,
ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> repliesContainer) {
return new ReplyingKafkaTemplate<>(producerFactory, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> repliesContainer(
ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> kafkaListenerContainerFactory) {
ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> rc =
kafkaListenerContainerFactory.createContainer("mytopic");
rc.setAutoStartup(false);
return rc;
}
@Bean
public KafkaTemplate<Object, KafkaExampleRecord> kafkaTemplate(ProducerFactory<Object, KafkaExampleRecord> pf) {
return new KafkaTemplate<>(pf);
}
}
However now I get java.lang.IllegalStateException: a KafkaTemplate is required to support replies
.
CodePudding user response:
Spring Boot automatically configures one; simply add it as a parameter to the factory method:
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> kafkaListenerContainerFactory(
ConsumerFactory<Object, KafkaExampleRecord> cf) {
...
However, you don't need to declare the container factory bean either; Boot will auto configure it for you and automatically set the KT, if it is defined as a bean.
See Boot's KafkaAnnotationDrivenConfiguration
.