Home > Back-end >  Nack about kafka consumers Acknowledgment (int index, long sleep)
Nack about kafka consumers Acknowledgment (int index, long sleep)

Time:04-07

When using springboot integration of kafka, consumers use @ KafkaListener listening mass message, and then manually submit deviation, then I now have a demand: need to consumers is the trigger specific logic kafka suspended pull 90 seconds, can be used to suspend kafka nack? Have a great god used? Note: I used KafkaListenerEndpointRegistry can call start () and stop () to manually start and stop the kafka consumption, but I don't want to manual intervention, and I found KafkaListenerEndpointRegistry inside the start () easy to trigger more balanced, kafka up slowly, I just wanted to let kafak suspended for a period of time, and then automatically to pull, simple example code is as follows:

@ KafkaListener (switchable viewer=KafkaConstants TOPIC_NAME, containerFactory="batchFactory")
Public void batchPullMessage (List Records, Acknowledgment ack) {
Try {
The info (" kafka pull the number of messages: {} ", records. The size ());
Final CountDownLatch latch=new CountDownLatch (records. The size ());//initialize the counter
For (ConsumerRecord Record: records) {
Log. The error (" news offset: {} ", record the offset ());
DemoService. Test (record. The value (), latch);//asynchronous consumption
}
Latch. Await ();//wait for all current
the child thread has been completedIf (" business logic code trigger need kafka suspended pull 90 seconds ") {
Ack. Nack (records. The size () - 1, 90 * 1000);
} else {
Ack. Acknowledge ();//submit kafka offset
}
} the catch (Exception e) {
The error (" kafka consumers exception ", e);
}
}
  • Related