How to know if there is a message ready to consume

Time:03-25

I am new to Kafka and am looking for a way for the consumer to know whether a message is ready for consumption before calling the consume method.

I am doing a POC on integrating C# with Kafka. I previously did the same for RabbitMQ, which has a "MessageCount" method, but I cannot find an equivalent for Kafka.

CodePudding user response:

Kafka does not push messages to you. A consumer application typically runs an infinite loop in which it calls poll() to fetch any new records from its assigned partitions.

The max.poll.interval.ms setting specifies the maximum time allowed between calls to poll(). If it is exceeded, the consumer is considered dead and a rebalance is triggered.

So to answer your question: the consumer always calls poll() to check whether a message is available to be consumed. However, some consumer settings make the fetch wait for a minimum amount of data before returning it:

  • fetch.min.bytes : the broker holds the fetch response until at least this many bytes of messages are available
  • fetch.max.wait.ms : the maximum time the broker waits for fetch.min.bytes to accumulate before responding anyway
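
As a rough illustration of the settings above, here is a minimal sketch of how they could be put into consumer properties. The broker address, group id, and the numeric values are only example assumptions (1024 is arbitrary; 500 and 300000 are, as far as I know, the client defaults), and the class name FetchTuning is made up for this example.

```java
import java.util.Properties;

final class FetchTuning {
    // Builds consumer settings that trade a little latency for fuller fetches.
    // All values here are illustrative assumptions, not recommendations.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // hypothetical broker
        props.put("group.id", "CountryCounter");
        // Broker waits until at least 1 KiB is available...
        props.put("fetch.min.bytes", "1024");
        // ...but answers after 500 ms even if fewer bytes have arrived.
        props.put("fetch.max.wait.ms", "500");
        // poll() must be called at least every 5 minutes to stay in the group.
        props.put("max.poll.interval.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be checked before creating a consumer.
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object would be passed to the KafkaConsumer constructor. Note that neither setting tells you in advance whether a message exists; they only shape how long a single fetch may wait.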

CodePudding user response:

In theory, checking whether messages exist already requires a connection to Kafka, so you might as well just call consume inside a try/catch and handle the case where nothing comes back. The performance is the same.
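
If you still want something close to RabbitMQ's "MessageCount", one common approach is to compare each partition's end offset with the consumer's current position. The sketch below shows only the arithmetic, using plain maps so it runs without a broker. In the Java client the inputs would come from KafkaConsumer.endOffsets(...) and KafkaConsumer.position(...); the class and method names PendingMessages and pending are made up for this example, and treating a missing position as 0 is a simplifying assumption.

```java
import java.util.HashMap;
import java.util.Map;

final class PendingMessages {
    // endOffsets: partition -> offset one past the last record in the log.
    // positions:  partition -> offset of the next record the consumer will read.
    // Returns the approximate number of records not yet consumed.
    static long pending(Map<Integer, Long> endOffsets, Map<Integer, Long> positions) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            // Assumption: a partition with no known position starts at offset 0.
            long position = positions.getOrDefault(e.getKey(), 0L);
            // Clamp at zero in case the position is read after the end offset.
            total += Math.max(0L, e.getValue() - position);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, Long> endOffsets = new HashMap<>();
        endOffsets.put(0, 120L);
        endOffsets.put(1, 45L);

        Map<Integer, Long> positions = new HashMap<>();
        positions.put(0, 100L);
        positions.put(1, 45L);

        System.out.println("pending = " + pending(endOffsets, positions));
    }
}
```

Treat the result as an estimate: new records can arrive between the offset query and the next poll, so the consume call still has to handle an empty result.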

CodePudding user response:

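    // Java client example; needs kafka-clients and org.json on the classpath.
    // The C# client follows the same poll-loop pattern.
    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.json.JSONObject;

    public class CountryCounter {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092,broker2:9092");
            props.put("group.id", "CountryCounter");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // Running count of customers per country
            Map<String, Integer> custCountryMap = new HashMap<>();

            // Subscribing to topics
            consumer.subscribe(Collections.singletonList("customerCountries"));

            // The poll loop
            try {
                while (true) {
                    // Waits up to 100 ms; returns an empty batch if nothing is available
                    ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf(
                            "topic = %s, partition = %d, offset = %d, "
                                + "customer = %s, country = %s%n",
                            record.topic(), record.partition(), record.offset(),
                            record.key(), record.value());

                        int updatedCount = 1;
                        if (custCountryMap.containsKey(record.value())) {
                            updatedCount = custCountryMap.get(record.value()) + 1;
                        }
                        custCountryMap.put(record.value(), updatedCount);

                        JSONObject json = new JSONObject(custCountryMap);
                        System.out.println(json.toString());
                    }
                }
            } finally {
                // Leave the group cleanly so partitions are reassigned promptly
                consumer.close();
            }
        }
    }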