Log topic name from @KafkaListener

Time:10-20

Hi, I am new to Spring Boot's @KafkaListener. I have the code below in a Kafka consumer class:

@KafkaListener(topics = "#{'${app.kafka.consumer.topic}'.split(',')}",
        containerFactory = "kafkaListenerContainerFactory",
        groupId = "${app.kafka.consumer.group-id}")
public void receivedMessage(@Payload List<ConsumerRecord<String, String>> records,
                            Acknowledgment acknowledgment) {
    try {
        for (ConsumerRecord<String, String> record : records) {
            process(record.value(), record.topic(), acknowledgment);
        }
        acknowledge(acknowledgment);
    } catch (Exception ex) {
        log.error("ERROR ", ex);
    }
}

In the above code I am consuming messages from two different topics, and I want to log the topic name in the catch block if an error occurs. I tried adding a @Header(KafkaHeaders.RECEIVED_TOPIC) String topic parameter, but I get an error with that. How can I find out which topic the failing record came from?

CodePudding user response:

You already have logic to extract the topic from the record (record.topic()).

You need to put your try/catch inside the for loop.
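
As a rough illustration of that suggestion, here is a minimal, self-contained sketch. It does not use Spring Kafka: `Rec` is a hypothetical stand-in for `ConsumerRecord<String, String>`, and `process` and `consume` are made-up helpers. The only point is that the catch block sits inside the loop, so `record.topic()` is still in scope when an exception is logged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PerRecordErrorLogging {

    // Hypothetical stand-in for ConsumerRecord<String, String>;
    // only topic() and value() are modeled.
    static final class Rec {
        private final String topic;
        private final String value;

        Rec(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }

        String topic() { return topic; }
        String value() { return value; }
    }

    // Hypothetical processing step: rejects null or empty payloads.
    static void process(String value, String topic) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
    }

    // Processes each record in its own try/catch and returns the topics
    // of the records that failed, in the order they were encountered.
    static List<String> consume(List<Rec> records) {
        List<String> failedTopics = new ArrayList<>();
        for (Rec record : records) {
            try {
                process(record.value(), record.topic());
            } catch (Exception ex) {
                // 'record' is in scope here, so its topic can be logged.
                System.err.println("ERROR on topic " + record.topic() + ": " + ex.getMessage());
                failedTopics.add(record.topic());
            }
        }
        return failedTopics;
    }

    public static void main(String[] args) {
        List<Rec> batch = Arrays.asList(
                new Rec("orders", "a"),
                new Rec("payments", ""),
                new Rec("orders", "b"));
        System.out.println(consume(batch)); // prints [payments]
    }
}
```

With this shape, one bad record no longer aborts the rest of the batch. Whether the batch should still be acknowledged after a failure is a separate decision that depends on how you want failed records to be retried.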

CodePudding user response:

You've already used ConsumerRecord#topic.

You need to store its result in a variable declared outside the try so that it is visible in the catch, e.g.

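    // Initialized to null so the compiler accepts the read in the catch block;
    // it stays null if the failure happens before any record is read.
    String topic = null;
    try {
        for (ConsumerRecord<String, String> record : records) {
            topic = record.topic();
            process(record.value(), topic, acknowledgment);
        }
        acknowledge(acknowledgment);
    } catch (Exception ex) {
        log.error("ERROR on {}", topic, ex);
    }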

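Depending on how you want to process the data, you may want to put the try-catch inside the loop instead. With the try-catch outside the loop, the first exception skips the remaining records in the batch as well as the acknowledge call.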
