Question: Does Spring Boot always require a bean of type KafkaTemplate? Details, stack trace, and code are below; please tell me what I am doing wrong. Thank you.
- I have been posting messages to a topic from a Spring Boot project.
- To get a callback for each send, I call org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord<K, V>, Callback) directly.
- I did it this way because the ListenableFuture returned by KafkaTemplate only provides the exception on failure, and I wanted the callback to be a separate class that I can reuse across all my use cases.
- However, Spring fails to start when I don't define a bean of type KafkaTemplate, with the error below.
Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaTemplate' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:800) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:541) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1380) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory$DependencyObjectProvider.getIfUnique(DefaultListableBeanFactory.java:2063) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration.<init>(KafkaAnnotationDrivenConfiguration.java:90) ~[spring-boot-autoconfigure-2.4.12.jar:2.4.12]
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:na]
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) ~[na:na]
    at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:211) ~[spring-beans-5.3.12.jar:5.3.12]
    ... 22 common frames omitted
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1790) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1346) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1300) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:887) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:791) ~[spring-beans-5.3.12.jar:5.3.12]
    ... 40 common frames omitted
My Kafka config is below:
@Configuration
public class KafkaEventConfig {

    private final KafkaProperties kafkaProperties;

    @Value("${client.id}")
    private String clientId;

    @Value("${topic.movie.name}")
    private String movieTopicName;

    @Value("${retry.backoff.ms}")
    private int retryBackoffMilliseconds;

    @Value("${request.timeout.ms}")
    private int requestTimeoutMilliseconds;

    public KafkaEventConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public ProducerFactory<String, Movie> producerFactory() {
        Map<String, Object> props = kafkaProperties.buildProducerProperties();
        populateCommonProperties(props);
        return new DefaultKafkaProducerFactory<>(props);
    }

    private void populateCommonProperties(Map<String, Object> props) {
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMilliseconds);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMilliseconds);
    }

    // Plain Kafka client producer, built from the same properties as the factory.
    @Bean
    public KafkaProducer<String, Movie> movieKafkaProducer() {
        return new KafkaProducer<>(producerFactory().getConfigurationProperties());
    }

    @Bean
    public KafkaProducerMonitor kafkaProducerMonitor(KafkaProducer<String, Movie> kafkaProducer,
            MeterRegistry registry) {
        return new KafkaProducerMonitor(kafkaProducer, registry, Tags.of("topic", movieTopicName));
    }
}
My Kafka callback is below:
@Slf4j
public class KafkaProducerCallBack<K, V> implements Callback {

    private final ProducerRecord<K, V> producerRecord;

    public KafkaProducerCallBack(ProducerRecord<K, V> producerRecord) {
        this.producerRecord = producerRecord;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // On failure the metadata may be null or hold placeholder values, depending on
            // the client version, so take the topic from the record instead.
            // Passing the exception as the last argument logs its stack trace.
            log.error("Failed to produce message [{}] to topic {}", producerRecord, producerRecord.topic(), exception);
        } else {
            log.info("Successfully published message [{}] to topic {} at offset {}", producerRecord, metadata.topic(), metadata.offset());
        }
    }
}
I publish messages like so
movieKafkaProducer.send(message, new KafkaProducerCallBack<String, Movie>(message));
Please note that the moment I add the lines below to KafkaEventConfig, everything works fine:
@Bean
public KafkaTemplate<String, Movie> movieKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
CodePudding user response:
Taking a closer look at your exception stacktrace reveals the issue.
Error creating bean with name 'kafkaTemplate' defined in class path resource
[org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]:
Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
The error comes from Spring Boot's Kafka auto-configuration. The class KafkaAutoConfiguration registers a set of default beans, and each default backs off when it finds a bean of the same type that you defined yourself. Because you define only some of those beans, the auto-configuration backs off only partially, and the defaults that remain can no longer find the dependencies they need.
To fix this, you can exclude KafkaAutoConfiguration in your @SpringBootApplication annotation, like so:
@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
Or you can keep the auto-configuration, let Spring Boot do the setup, and use the provided KafkaTemplate or ProducerFactory to do what you want.
The latter would simplify your own configuration. I know too little about the Kafka auto-configuration and your use case to provide a more helpful code snippet, but you should be able to work that out, or just exclude KafkaAutoConfiguration and go with what you have now.
CodePudding user response:
In addition to the second option that @M.Deinum mentioned, take a look at the KafkaAutoConfiguration class:
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
        ProducerListener<Object, Object> kafkaProducerListener,
        ObjectProvider<RecordMessageConverter> messageConverter) {
    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
    kafkaTemplate.setProducerListener(kafkaProducerListener);
    kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
    return kafkaTemplate;
}
Spring Boot creates a KafkaTemplate bean for you if you don't create your own. This auto-configured bean depends on a ProducerFactory<Object, Object> bean, but the only ProducerFactory you declared is a ProducerFactory<String, Movie>. The generic types don't match, so Spring finds no candidate for that parameter, and that is why you got the error.
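The type mismatch can be reproduced without Spring. The sketch below is only an illustration: ProducerFactory and Movie are hypothetical stand-ins declared locally, not the Spring Kafka classes, and the comparison uses plain reflection in place of Spring's own resolution logic. It shows that the two declarations share a raw class but differ as generic types, which is the distinction that matters when a dependency is matched on its full generic type.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Type;

// Hypothetical stand-ins for the real types, declared only for this sketch.
interface ProducerFactory<K, V> { }

class Movie { }

class GenericMismatchDemo {

    // Mirrors the bean method from the question's configuration.
    static ProducerFactory<String, Movie> userFactory() {
        return new ProducerFactory<String, Movie>() { };
    }

    // Mirrors the parameter type requested by the auto-configured template method.
    // Calling autoConfiguredTemplate(userFactory()) would not even compile.
    static void autoConfiguredTemplate(ProducerFactory<Object, Object> factory) { }

    private static Method method(String name, Class<?>... parameterTypes) {
        try {
            return GenericMismatchDemo.class.getDeclaredMethod(name, parameterTypes);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    // Compares only the erased classes: both sides are ProducerFactory.
    static boolean rawTypesMatch() {
        Class<?> provided = method("userFactory").getReturnType();
        Class<?> requested = method("autoConfiguredTemplate", ProducerFactory.class).getParameterTypes()[0];
        return provided == requested;
    }

    // Compares the full generic types, including the type arguments.
    static boolean genericTypesMatch() {
        Type provided = method("userFactory").getGenericReturnType();
        Type requested = method("autoConfiguredTemplate", ProducerFactory.class).getGenericParameterTypes()[0];
        return provided.equals(requested);
    }

    public static void main(String[] args) {
        System.out.println("raw types match: " + rawTypesMatch());         // prints true
        System.out.println("generic types match: " + genericTypesMatch()); // prints false
    }
}
```

As far as I can tell, this is also why the failure looks confusing: a check on the raw class sees a ProducerFactory and treats it as already present, while the injection point that asks for ProducerFactory<Object, Object> finds nothing that fits.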
You wrote: "the reason i did this way is because listenablefuture when using KafkaTemplate only provides exception on failures( and i wanted to register callbacks as a separate reusable class across all my usecases)"
In your case, you can still get the advantages of using KafkaTemplate. Instead of implementing a Callback, you can implement your own ProducerListener<K, V> and bind it to your KafkaTemplate. E.g.:
FullLoggingProducerListener.class
@Slf4j
public class FullLoggingProducerListener<K, V> implements ProducerListener<K, V> {

    @Override
    public void onSuccess(ProducerRecord<K, V> record, RecordMetadata recordMetadata) {
        log.info("Successful!");
    }

    @Override
    public void onError(ProducerRecord<K, V> record, @Nullable RecordMetadata recordMetadata, Exception exception) {
        log.error("Error!");
    }
}
YourConfiguration.class
@Bean
public KafkaTemplate<String, Movie> kafkaTemplate(ProducerFactory<String, Movie> kafkaProducerFactory) {
    KafkaTemplate<String, Movie> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    // The listener's generics must match the template's; a ProducerListener<Object, Object>
    // cannot be set on a KafkaTemplate<String, Movie>.
    kafkaTemplate.setProducerListener(new FullLoggingProducerListener<>());
    return kafkaTemplate;
}
Now, every time you use this KafkaTemplate to send a record, you'll see the log.
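To make the design difference concrete, here is a library-free sketch of the listener idea. SendListener and MiniTemplate are hypothetical stand-ins invented for this example; they are not Spring Kafka classes, and the "transport" is just a function that returns an offset or throws. The point is only that the listener is registered once on the template and then sees every send, instead of a new callback object being passed with each send call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical stand-in for a producer listener; NOT the Spring Kafka interface.
interface SendListener<V> {
    void onSuccess(V value, long offset);
    void onError(V value, Exception exception);
}

// Hypothetical stand-in for a template: every send is reported to one shared listener.
class MiniTemplate<V> {
    private final Function<V, Long> transport; // returns the offset, or throws on failure
    private final SendListener<V> listener;

    MiniTemplate(Function<V, Long> transport, SendListener<V> listener) {
        this.transport = transport;
        this.listener = listener;
    }

    void send(V value) {
        long offset;
        try {
            offset = transport.apply(value);
        } catch (RuntimeException e) {
            listener.onError(value, e);
            return;
        }
        listener.onSuccess(value, offset);
    }
}

class ListenerPatternDemo {

    // Sends three values through one template and returns what the listener observed.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        SendListener<String> recording = new SendListener<String>() {
            @Override
            public void onSuccess(String value, long offset) {
                events.add("ok:" + value + "@" + offset);
            }

            @Override
            public void onError(String value, Exception exception) {
                events.add("error:" + value + ":" + exception.getMessage());
            }
        };

        AtomicLong nextOffset = new AtomicLong();
        MiniTemplate<String> template = new MiniTemplate<String>(value -> {
            if ("boom".equals(value)) {
                throw new IllegalStateException("broker unavailable"); // simulated failure
            }
            return nextOffset.getAndIncrement();
        }, recording);

        template.send("a");
        template.send("boom");
        template.send("b");
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Callers only ever call send; the success and failure handling lives in one reusable class, which is the same thing the asker wanted from a shared Callback.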