Home > Mobile >  How to know if there is message ready to consume
How to know if there is message ready to consume


I am new to Kafka and looking for a way to know if the message is ready for consumption to the consumer before calling consume method.

I am doing the POC on integrating C# with Kafka, previously I did that for RabbitMQ which has a method "MessageCount", but for Kafka, I cannot find any.

CodePudding user response:

Actually Kafka has an infinite loop, in which it calls the poll() function to get eventual new records from a partition.

The configuration : max.poll.intervall.ms, specifies the interval of time after which, if the poll() function is not called, the consumer is considered dead and a rebalance is operated.

So to answer your question, Kafka always calls the poll() function to check if a message is available to be consummed. However, there some consumer configurations that allow to wait for a minimum size of messages before consumming the message:

  • fetch.min.bytes : you will wait untill you have x bytes of messages to consume them
  • fetch.max.wait.ms : set how much time you are gonna wait for the fetch.min.bytes to be gathered

CodePudding user response:

In theory, if you can view if messages exist you are already using processes to connect to kafka. So you might as well just do a try catch with consume with the same performance.

CodePudding user response:

    Properties props = new Properties();
    props.add("bootstrap.servers", "broker1:9092,broker2:9092");
    props.add("group.id", "CountryCounter");

    KafkaConsumer<String, String> consumer =
        new KafkaConsumer<String, String>(props);

    //Subscribing to Topics

    //The Poll Loop
        while (true)
            ConsumerRecords<String, String> records = consumer.poll(100);
            foreach (ConsumerRecord<String, String> record in records)
                Console.WriteLine(@"topic = {0}, partition = {1}, offset = {2},
                    customer = {3}, country = {4}",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());

                int updatedCount = 1;
                if (custCountryMap.countainsKey(record.value()))
                    updatedCount = custCountryMap.get(record.value())   1;
                custCountryMap.add(record.value(), updatedCount)

                JSONObject json = new JSONObject(custCountryMap);
  • Related