Home > Net >  Why spring-boot with Kafka failed to start?
Why spring-boot with Kafka failed to start?

Time:04-16

There is spring-boot application with kafka dependecy, there are two Kafka topics and need to read messages from them

tacocloud.orders.topic
tacocloud.tacos.topic

And already successful sent messages in it

Configured kafka config for listen this topics like this

 package com.example.tacocloud.config;

import com.example.tacocloud.model.jpa.Order;
import com.example.tacocloud.model.jpa.Taco;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
@EnableKafka
@EnableConfigurationProperties
public class KafkaConfig {

  // Order

    @Bean
    @ConfigurationProperties("spring.kafka.consumer.order")
    public Map<String, Object> orderConsumerConfig() {
        return new HashMap<>();
    }

    @Bean
    public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(@Qualifier("orderConsumerConfig")
        Map<String, Object> orderConsumerConfig) {
        return new DefaultKafkaConsumerFactory<>(orderConsumerConfig);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> order1KafkaMessageListenerContainer(
        @Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig,
        @Qualifier("orderConsumerFactory") DefaultKafkaConsumerFactory orderConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(orderConsumerFactory);
        return factory;
    }

    // Taco

    @Bean
    @ConfigurationProperties("spring.kafka.consumer.taco")
    public Map<String, Object> tacoConsumerConfig() {
        return new HashMap<>();
    }

    @Bean
    public DefaultKafkaConsumerFactory tacoConsumerFactory(
        @Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig) {
        return new DefaultKafkaConsumerFactory<>(tacoConsumerConfig);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory tacoConcurrentMessageListenerContainer(
        @Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig,
        @Qualifier("tacoConsumerFactory") DefaultKafkaConsumerFactory tacoConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(tacoConsumerFactory);
        return factory;
    }
}

So, there are two DefaultKafkaConsumerFactory and two ConcurrentKafkaListenerContainerFactory Aften that, created a service with @KafkaListener log messages:

package com.example.tacocloud.service;

import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@EnableKafka
public class KafkaService {

    @KafkaListener(topics = "tacocloud.orders.topic", groupId = "one")
    public void order() {
        System.out.println("Order");
    }

    @KafkaListener(topics ="tacocloud.tacos.topic", groupId = "two")
    public void taco() {
        System.out.println("Taco");
    }
}

application.yml file

spring:
  kafka:
    consumer:
      order:
        topic: tacocloud.orders.topic
        "[bootstrap.servers]": localhost:29888
        "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
        "[value.deserializer]": com.example.tacocloud.model.serialization.OrderDeserializer
        template:
          "[default.topic]": tacocloud.orders.topic
      taco:
        topic: tacocloud.tacos.topic
        "[bootstrap.servers]": localhost:29888
        "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
        "[value.deserializer]": com.example.tacocloud.model.serialization.TacoDeserializer
        template:
          "[default.topic]": tacocloud.tacos.topic

But, when I start my spring-boot application, I see error message(((

2022-04-15 21:38:35.918 ERROR 13888 --- [ restartedMain] o.s.boot.SpringApplication : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value. at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.16.jar:5.3.16] at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na] at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:740) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:415) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) ~[spring-boot-2.6.4.jar:2.6.4] at com.example.tacocloud.TacoCloudApplication.main(TacoCloudApplication.java:10) ~[classes/:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na] at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na] at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.6.4.jar:2.6.4] Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value. at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:108) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:129) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:640) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:665) ~[kafka-clients-2.8.0.jar:na] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:416) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:384) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:360) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:327) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:304) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:758) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:344) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:442) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:209) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:442) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:331) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:276) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.16.jar:5.3.16] ... 19 common frames omitted

Process finished with exit code 0

CodePudding user response:

Thank you for a sample.

So, I opened it locally and placed a break point into this bean definition:

@Bean
public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(@Qualifier("orderConsumerConfig")
    Map<String, Object> orderConsumerConfig) {
    return new DefaultKafkaConsumerFactory<Object, Order>(orderConsumerConfig);
}

That orderConsumerConfig map looks like this:

orderConsumerConfig = {LinkedHashMap@8587}  size = 1
 "orderConsumerConfig" -> {HashMap@8600}  size = 5
  key = "orderConsumerConfig"
  value = {HashMap@8600}  size = 5
   "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
   "template" -> {LinkedHashMap@8611}  size = 1
   "topic" -> "tacocloud.orders.topic"
   "bootstrap.servers" -> "localhost:29888"
   "value.deserializer" -> "sample.kafka.serializer.OrderDeserializer"

so, that's indeed not a surprise that your KafkaConsumer is not initialized properly. Your target map config is hidden under that orderConsumerConfig entry of this injected map.

Would you mind to share with me from where did you take an idea of the @ConfigurationProperties on the Map bean? And how that supposed to be used as dependency injection?

  • Related