Home > Blockchain >  Does spring always require KafkaTemplate?
Does spring always require KafkaTemplate?

Time:12-16

question :Does springboot always require to create a bean of type KafkaTemplate? Details/stacktrace/codebase below, please tell me what i am doing incorrect . Thank you

  1. I have been posting messages to a topic from a spring boot project
  2. In order to create callback mechanisms i have used org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord<K, V>, Callback) in order to send the message and also create a call back
  3. the reason i did this way is because listenablefuture when using KafkaTemplate only provides exception on failures( and i wanted to register callbacks as a separate reusable class across all my usecases)
  4. however, spring fails to start up when i dont define a bean of type KafkaTemplate with the below error
    Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaTemplate' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
        at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:800) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:541) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1380) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.DefaultListableBeanFactory$DependencyObjectProvider.getIfUnique(DefaultListableBeanFactory.java:2063) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration.(KafkaAnnotationDrivenConfiguration.java:90) ~[spring-boot-autoconfigure-2.4.12.jar:2.4.12]
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:na]
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:na]
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:na]
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) ~[na:na]
        at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:211) ~[spring-beans-5.3.12.jar:5.3.12]
        ... 22 common frames omitted
    Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1790) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1346) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1300) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:887) ~[spring-beans-5.3.12.jar:5.3.12]
        at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:791) ~[spring-beans-5.3.12.jar:5.3.12]
        ... 40 common frames omitted

My Kafka COnfig is below

  @Configuration
public class KafkaEventConfig {

    private final KafkaProperties kafkaProperties;

    @Value("${client.id}")
    private String clientId;


    @Value("${topic.movie.name}")
    private String movieTopicName;
    
    @Value("${retry.backoff.ms}")
    private int retryBackoffMilliseconds;

    @Value("${request.timeout.ms}")
    private int requestTimeoutMilliseconds;

    public KafkaEventConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public ProducerFactory<String, Movie> producerFactory() {
        Map<String, Object> props = kafkaProperties.buildProducerProperties();
        populateCommonProperties(props);
        return new DefaultKafkaProducerFactory<>(props);
    }

    private void populateCommonProperties(Map<String, Object> props) {
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMilliseconds);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMilliseconds);
    }
    
    @Bean
    public KafkaProducer<String, Movie> movieKafkaProducer() {
        return new KafkaProducer<String, Movie>(producerFactory().getConfigurationProperties());
    }

    @Bean
    public KafkaProducerMonitor kafkaProducerMonitor(KafkaProducer<String, Movie> kafkaProducer,
            MeterRegistry registry) {
        return new KafkaProducerMonitor(kafkaProducer, registry, Tags.of("topic", movieTopicName));
    }

My Kafka Callback is below

       @Slf4j 
public class KafkaProducerCallBack<K, V> implements Callback {

    private ProducerRecord<K, V> producerRecord;

    public KafkaProducerCallBack(ProducerRecord<K, V> producerRecord) {

        this.producerRecord = producerRecord;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        String topicName= metadata.topic();
        long offset= metadata.offset();
        
        if (exception != null) {

            log.error("Failed to produce message [{}] to topic {} with exception {}", producerRecord, topicName, exception);
        }

        else {

            log.info("Sucessfully published message [{}] to topic {} to offset {}", producerRecord, topicName , offset);
            
        }

    }

}

I publish messages like so

movieKafkaProducer.send(message, new KafkaProducerCallBack<String, Movie>(message));

Please note the moment i add the below lines in KafkaEventConfig everything works fine

@Bean
    public KafkaTemplate<String, Movie> movieKafkaTemplate() {
        return new KafkaTemplate<String, Movie>(producerFactory());
    }

CodePudding user response:

Taking a closer look at your exception stacktrace reveals the issue.

Error creating bean with name 'kafkaTemplate' defined in class path resource
 [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]: 
Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}

The error comes from not being able to apply the Kafka based auto-configuration in Spring Boot. The class KafkaAutoConfiguration expects and configures some beans, and backs off if certain ones are found. As you are configuring some of the beans, this will partially backoff and thus fail to auto-configure the Kafka classes.

To fix you can either exclude the KafkaAutoConfiguration. You can do this in your @SpringBootApplication annotation, like so

@SpringBootApplication(exclude={KafkaAutoConfiguration.class}

Or you can utilize the auto-configuration and let Spring Boot do the configuration and you use the provided KafkaTemplate or ProducerFactory to do what you want.

The latter would simplify your own configuration. I'm know too little of the Kafka auto-configuration and your usecase to provide a more helpful code snippet, but you should be able to figure that out yourself, or just exclude the KafkaAutoConfiguration and go with what you have now.

CodePudding user response:

Additional to the latter that @M.Deinum mentioned:

Take a look at the KafkaAutoConfiguration class:

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener,
            ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

Springboot will create a KafkaTemplate bean for you if you don't create your own. This auto-configured bean depends on ProducerFactory<Object, Object> bean, and because you declared a ProducerFactory<String, Movie>. As you could see the type wasn't fit, that's why you got an error.


the reason i did this way is because listenablefuture when using KafkaTemplate only provides exception on failures( and i wanted to register callbacks as a separate reusable class across all my usecases)

Your case, you can still get the advantages of using KafkaTemplate. Instead of implementing a Callback, you can implement your own ProducerListener<K, V> and bind it into your KafkaTemple. E.g:

FullLoggingProducerListener.class

public class FullLoggingProducerListener<K, V> implements ProducerListener<K, V> {
    @Override
    public void onSuccess(ProducerRecord<K, V> record, RecordMetadata recordMetadata) {
        log.info("Successful!");
    }

    @Override
    public void one rror(ProducerRecord<K, V> record, @Nullable RecordMetadata recordMetadata, Exception exception) {
        log.error("Error!");
    }
}

YourConfigration.class

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<String, Movie> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) {
        KafkaTemplate<String, Movie> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        return kafkaTemplate;
    }

Now, everytime you use KafkaTemplate to send a record, you'll see the log.

  • Related