Spring Boot @KafkaListener consumer pause/resume not working

Time:07-08

I have a Spring Boot Kafka consumer and producer. The consumer is expected to read records from a topic one at a time, process each one (which is time consuming), write the result to another topic, and then manually commit the offset.

To avoid rebalancing, I tried calling pause() and resume() on the Kafka listener container, but the consumer keeps running and never responds to the pause() call. I even tried it in a while loop, without success: the consumer could not be paused. KafkaListenerEndpointRegistry is autowired. Spring Boot version = 2.6.9, spring-kafka version = 2.8.7.

@KafkaListener(id = "c1", topics = "${app.topics.topic1}", containerFactory = "listenerContainerFactory1")
public void poll(ConsumerRecord<String, String> record, Acknowledgment ack) {
    log.info("Received Message by consumer of topic1: " + record.value());
    String result = process(record.value());
    producer.sendMessage(result + " topic2");
    log.info("Message sent from " + topicIn + " to " + topicOut);
    ack.acknowledge();
    log.info("Offset committed by consumer 1");
}

private String process(String value) {
    try {
        pauseConsumer();
        // Perform time intensive network IO operations
        resumeConsumer();
    } catch (InterruptedException e) {
        log.error(e.getMessage());
    }
    return value;
}

private void pauseConsumer() throws InterruptedException {
    if (registry.getListenerContainer("c1").isRunning()) {
        log.info("Attempting to pause consumer");
        Objects.requireNonNull(registry.getListenerContainer("c1")).pause();
        Thread.sleep(5000);
        log.info("kafkalistener container state - " + registry.getListenerContainer("c1").isRunning());
    }
}

private void resumeConsumer() throws InterruptedException {
    if (registry.getListenerContainer("c1").isContainerPaused() || registry.getListenerContainer("c1").isPauseRequested()) {
        log.info("Attempting to resume consumer");
        Objects.requireNonNull(registry.getListenerContainer("c1")).resume();
        Thread.sleep(5000);
        log.info("kafkalistener container state - " + registry.getListenerContainer("c1").isRunning());
    }
}

Am I missing something? Could someone please guide me with the right way of achieving the required behaviour?

CodePudding user response:

You can try something like this, using the plain Kafka consumer API. It works for me.

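import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class kafkaConsumer {

    // Consumer settings (bootstrap servers, group id, deserializers, enable.auto.commit=false).
    // How this is built is not shown in the answer; it is assumed to be supplied by the caller.
    private final Properties config;

    public kafkaConsumer(Properties config) {
        this.config = config;
    }

    public void run(String topicName) {
        // try-with-resources closes the consumer if the loop ever exits
        try (Consumer<String, String> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(Collections.singleton(topicName));
            while (true) {
                try {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(80000));
                    for (TopicPartition partition : consumerRecords.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            String kafkaEvent = record.value();

                            // pause() suspends fetching from these partitions for subsequent poll() calls
                            consumer.pause(consumer.assignment());

                            /** Implement your business logic here **/

                            // Once your processing is done, resume fetching
                            consumer.resume(consumer.assignment());

                            try {
                                // Commit only this record; a bare commitSync() would commit the whole polled batch
                                consumer.commitSync(Collections.singletonMap(
                                        partition, new OffsetAndMetadata(record.offset() + 1)));
                            } catch (CommitFailedException e) {
                                // Commit failed (for example after a rebalance); the record may be redelivered
                            }
                        }
                    }
                } catch (Exception e) {
                    // Ignore the failure and keep polling
                }
            }
        }
    }
}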

CodePudding user response:

You are running the process() method on the listener thread so pause/resume will not have any effect; the pause only takes place when the listener thread exits the listener method (and after it has processed all the records received by the previous poll).
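
To make the point above concrete, here is a deliberately simplified, single-threaded model of a listener container loop. It is only a sketch: the class PauseModel and all of its members are hypothetical and are not spring-kafka code. It assumes, as described above, that the container looks at the pause request only after the listener has returned for every record of the previous poll.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not spring-kafka's implementation.
public class PauseModel {

    private boolean pauseRequested;   // set by pause(), cleared by resume()
    private boolean containerPaused;  // updated only by the container loop
    public final List<String> log = new ArrayList<>();

    public void pause() {
        pauseRequested = true;
    }

    public void resume() {
        pauseRequested = false;
    }

    public boolean isContainerPaused() {
        return containerPaused;
    }

    // Mirrors the question's listener: pause, do the slow work, resume,
    // all on the same thread that runs the container loop.
    private void listener(String record) {
        pause();
        log.add(record + " processed, containerPaused=" + containerPaused);
        resume();
    }

    // One iteration of the modelled container loop.
    public void pollAndDispatch(List<String> batch) {
        for (String record : batch) {
            listener(record);
        }
        // The request is examined only here, after the whole batch was handled.
        // By now the listener has already cleared it again.
        containerPaused = pauseRequested;
    }

    public static void main(String[] args) {
        PauseModel model = new PauseModel();
        model.pollAndDispatch(List.of("r1", "r2", "r3"));
        model.log.forEach(System.out::println);
        System.out.println("paused after batch: " + model.isContainerPaused());
    }
}
```

In this model the container never observes a pending pause request, because the listener sets and clears it before control returns to the loop. That matches the behaviour reported in the question.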

The next version (2.9), due later this month, has a new property pauseImmediate, which causes the pause to take effect after the current record is processed.
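
As a rough sketch of how that property could be switched on, assuming spring-kafka 2.9 or later is on the classpath and that the factory bean is named listenerContainerFactory1 as in the question. The ListenerConfig class name and the injected ConsumerFactory are assumptions made for illustration, not something shown in the original post.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class ListenerConfig {  // hypothetical class name

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory1(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Manual acks, since the listener in the question takes an Acknowledgment
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        // Requires spring-kafka 2.9+: pause after the current record, not after the whole batch
        factory.getContainerProperties().setPauseImmediate(true);
        return factory;
    }
}
```

Note that this only changes when a pending pause takes effect. The pause request still has to be outstanding when the listener method returns, so calling resume() before returning, as the question's process() method does, would cancel it.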
