Hi I am new Spring Boot @kafkaListener
, I have below code in kafka consumer class -
@KafkaListener(topics = "#{'${app.kafka.consumer.topic}'.split(',')}",
containerFactory = "kafkaListenerContainerFactory",
groupId = "${app.kafka.consumer.group-id}")
public void receivedMessage(@Payload List<ConsumerRecord<String, String>> records,
Acknowledgment acknowledgment) {
try{
for (ConsumerRecord<String, String> record : records){
process(record.value(),record.topic(), acknowledgment);
}
acknowledge(acknowledgment);
}catch(Exception ex){
log.error("ERROR ", ex);
}
}
In the above code I am consuming message from two different topics
, I want to log topic
name in catch
block if any error
occurred.
I tried @Header(KafkaHeaders.RECEIVED_TOPIC) String topic
but I am getting error for that. How can I know topic
name from which exception
occurred.
CodePudding user response:
You already have logic to extract the topic from the record (record.topic()
).
You need to put your try/catch inside the for loop.
CodePudding user response:
You've already used ConsumerRecord#topic
You need to extract that outside the try to use in the catch e.g.
String topic;
try {
for (ConsumerRecord<String, String> record : records){
topic = record.topic();
process(record.value(), topic, acknowledgment);
}
acknowledge(acknowledgment);
} catch(Exception ex){
log.error("ERROR on {}", topic, ex);
}
Depending on how you want to process the data, you may want to put the try-catch inside the loop