I wrote kafka producer / consumer for my app:
Consumer config:
@EnableKafka
@Configuration
class KafkaConsumerConfig {
@Bean
fun consumerFactory(): ConsumerFactory<String, String> {
val props: MutableMap<String, Any> = HashMap()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
props[ConsumerConfig.GROUP_ID_CONFIG] = "group12345"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
return DefaultKafkaConsumerFactory(props)
}
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory()
return factory
}
}
Producer config:
@Configuration
class KafkaProducerConfig {
@Bean
fun producerFactory(): ProducerFactory<String, String> {
val configProps: MutableMap<String, Any> = HashMap()
configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return DefaultKafkaProducerFactory(configProps)
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
}
Topic config:
@Configuration
class KafkaTopicConfig {
@Bean
fun kafkaAdmin(): KafkaAdmin {
val configs: MutableMap<String, Any?> = HashMap()
configs[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
return KafkaAdmin(configs)
}
@Bean
fun topic1(): NewTopic {
return NewTopic("kafkaTest", 1, 1.toShort())
}
}
Kafka service:
@Service
class KafkaService(
private val kafkaTemplate: KafkaTemplate<String, String>
) {
fun send() {
kafkaTemplate.send("kafkaTest", "test message ${System.currentTimeMillis()}")
}
@KafkaListener(topics = ["kafkaTest"], groupId = "group12345")
fun listenGroupFoo(message: String) {
println("--> $message")
}
}
That's ALL classes in my app. When I trying to run app, I get this exception:
2021-10-11 17:20:13.319 WARN 8544 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Error connecting to node 34bcfcc207e0:9092 (id: 1001 rack: null)
java.net.UnknownHostException: 34bcfcc207e0
I have no idea, what is host 34bcfcc207e0
. It appears at start or thread.
What's wrong?
CodePudding user response:
Kafka is not an HTTP service. Remove
http://
from all your stringsIf you're running Kafka in a Container, the default advertised listener is using its hostname (the container ID), and you need to change this to use an address you expect Connect to Kafka running in Docker