There is spring-boot application with kafka dependecy, there are two Kafka topics and need to read messages from them
tacocloud.orders.topic
tacocloud.tacos.topic
And already successful sent messages in it
Configured kafka config for listen this topics like this
package com.example.tacocloud.config;
import com.example.tacocloud.model.jpa.Order;
import com.example.tacocloud.model.jpa.Taco;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
@EnableKafka
@EnableConfigurationProperties
public class KafkaConfig {
// Order
@Bean
@ConfigurationProperties("spring.kafka.consumer.order")
public Map<String, Object> orderConsumerConfig() {
return new HashMap<>();
}
@Bean
public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(@Qualifier("orderConsumerConfig")
Map<String, Object> orderConsumerConfig) {
return new DefaultKafkaConsumerFactory<>(orderConsumerConfig);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> order1KafkaMessageListenerContainer(
@Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig,
@Qualifier("orderConsumerFactory") DefaultKafkaConsumerFactory orderConsumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(orderConsumerFactory);
return factory;
}
// Taco
@Bean
@ConfigurationProperties("spring.kafka.consumer.taco")
public Map<String, Object> tacoConsumerConfig() {
return new HashMap<>();
}
@Bean
public DefaultKafkaConsumerFactory tacoConsumerFactory(
@Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig) {
return new DefaultKafkaConsumerFactory<>(tacoConsumerConfig);
}
@Bean
public ConcurrentKafkaListenerContainerFactory tacoConcurrentMessageListenerContainer(
@Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig,
@Qualifier("tacoConsumerFactory") DefaultKafkaConsumerFactory tacoConsumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(tacoConsumerFactory);
return factory;
}
}
So, there are two DefaultKafkaConsumerFactory and two ConcurrentKafkaListenerContainerFactory Aften that, created a service with @KafkaListener log messages:
package com.example.tacocloud.service;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
@EnableKafka
public class KafkaService {
@KafkaListener(topics = "tacocloud.orders.topic", groupId = "one")
public void order() {
System.out.println("Order");
}
@KafkaListener(topics ="tacocloud.tacos.topic", groupId = "two")
public void taco() {
System.out.println("Taco");
}
}
application.yml file
spring:
kafka:
consumer:
order:
topic: tacocloud.orders.topic
"[bootstrap.servers]": localhost:29888
"[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
"[value.deserializer]": com.example.tacocloud.model.serialization.OrderDeserializer
template:
"[default.topic]": tacocloud.orders.topic
taco:
topic: tacocloud.tacos.topic
"[bootstrap.servers]": localhost:29888
"[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
"[value.deserializer]": com.example.tacocloud.model.serialization.TacoDeserializer
template:
"[default.topic]": tacocloud.tacos.topic
But, when I start my spring-boot application, I see error message(((
2022-04-15 21:38:35.918 ERROR 13888 --- [ restartedMain] o.s.boot.SpringApplication : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value. at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.16.jar:5.3.16] at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na] at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:740) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:415) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) ~[spring-boot-2.6.4.jar:2.6.4] at com.example.tacocloud.TacoCloudApplication.main(TacoCloudApplication.java:10) ~[classes/:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na] at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na] at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.6.4.jar:2.6.4] Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value. at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:108) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:129) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:640) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:665) ~[kafka-clients-2.8.0.jar:na] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:416) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:384) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:360) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:327) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:304) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:758) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:344) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:442) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:209) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:442) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:331) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:276) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.16.jar:5.3.16] ... 19 common frames omitted
Process finished with exit code 0
CodePudding user response:
Thank you for a sample.
So, I opened it locally and placed a break point into this bean definition:
@Bean
public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(@Qualifier("orderConsumerConfig")
Map<String, Object> orderConsumerConfig) {
return new DefaultKafkaConsumerFactory<Object, Order>(orderConsumerConfig);
}
That orderConsumerConfig
map looks like this:
orderConsumerConfig = {LinkedHashMap@8587} size = 1
"orderConsumerConfig" -> {HashMap@8600} size = 5
key = "orderConsumerConfig"
value = {HashMap@8600} size = 5
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
"template" -> {LinkedHashMap@8611} size = 1
"topic" -> "tacocloud.orders.topic"
"bootstrap.servers" -> "localhost:29888"
"value.deserializer" -> "sample.kafka.serializer.OrderDeserializer"
so, that's indeed not a surprise that your KafkaConsumer
is not initialized properly. Your target map config is hidden under that orderConsumerConfig
entry of this injected map.
Would you mind to share with me from where did you take an idea of the @ConfigurationProperties
on the Map
bean? And how that supposed to be used as dependency injection?