I have a SpringBoot application where I am using the @KafkaListener annotation with a topic and a groupId. I have two topics my listener needs to listen for. When listening for one topic, either one works fine
@KafkaListener(topics = "topic1", groupId = "local")
or
@KafkaListener(topics = "topic2", groupId = "local")
however, when I combine the two like this (which is my understanding of how to do multiple topics on one listener):
@KafkaListener(topics = {"topic1,topic2"}, groupId = "local")
, I get scrolling exceptions like the following:
{
"app": "myApp",
"@timestamp": "2022-12-20T12:01:55.004Z",
"userId": "",
"hostname": "",
"ipAddress": "",
"logger": "org.apache.kafka.clients.Metadata",
"level": "ERROR",
"thread": "org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1",
"message": "[Consumer clientId=consumer-local-1, groupId=local] Metadata response reported invalid topics [topic1,topic2]",
"class": "org.apache.kafka.clients.Metadata",
"method": "checkInvalidTopics",
"file": "Metadata.java",
"line": 294
}
{
"app": "myApp",
"@timestamp": "2022-12-20T12:01:55.004Z",
"userId": "",
"hostname": "",
"ipAddress": "",
"logger": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
"level": "INFO",
"thread": "org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1",
"message": "[Consumer clientId=consumer-local-1, groupId=local] Rebalance failed.",
"class": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
"method": "joinGroupIfNeeded",
"file": "AbstractCoordinator.java",
"line": 470,
"stack": "org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic1,topic2]\n"
}
Any ideas on what I am doing wrong are greatly appreciated. Thank you
CodePudding user response:
You can listen to multiple topics in the same @KafkaListener by passing them in a comma separated object. In your case it should be
@KafkaListener(topics = {"topic1", "topic2"}, groupId = "local")
CodePudding user response:
Try this:
{"topic1", "topic2"}