I am new to Kafka and am looking for a way to check whether a message is ready for the consumer before calling the consume method.
I am doing a POC on integrating C# with Kafka. I previously did the same with RabbitMQ, which has a "MessageCount" method, but I cannot find an equivalent for Kafka.
CodePudding user response:
A Kafka consumer normally runs an infinite loop in which it calls the poll() function to fetch any new records from its assigned partitions.
The max.poll.interval.ms setting specifies the maximum time allowed between two calls to poll(); if it is exceeded, the consumer is considered dead and a rebalance is triggered.
So to answer your question: the consumer keeps calling poll() to check whether a message is available to be consumed. However, some consumer settings make the broker wait for a minimum amount of data before returning it:
- fetch.min.bytes : the broker waits until at least x bytes of messages are available before answering the fetch request
- fetch.max.wait.ms : the maximum time the broker waits for fetch.min.bytes to accumulate before answering anyway
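As a rough illustration of the settings above, here is a minimal Java sketch that collects them into a Properties object of the kind passed to a consumer. The class name ConsumerFetchConfig, the broker address, the group id and the chosen values are assumptions made for the example, not recommendations.

```java
import java.util.Properties;

public class ConsumerFetchConfig {

    // Builds consumer settings. All values are illustrative assumptions.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder address
        props.put("group.id", "example-group");         // hypothetical group name
        // Broker holds the fetch response until at least 1 KiB is available...
        props.put("fetch.min.bytes", "1024");
        // ...or until 500 ms have passed, whichever comes first.
        props.put("fetch.max.wait.ms", "500");
        // Maximum gap between poll() calls before the consumer is
        // considered dead and a rebalance is triggered (5 minutes here).
        props.put("max.poll.interval.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings to show what would be handed to the consumer.
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties would be passed to the consumer constructor together with the deserializer settings. Raising fetch.min.bytes trades latency for fewer, larger fetches.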
CodePudding user response:
In theory, if you can check whether messages exist, you are already running a process that connects to Kafka. So you might as well call consume inside a try/catch; the performance is the same.
CodePudding user response:
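// Java consumer example (kafka-clients library, org.json for the JSON output).
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.json.JSONObject;

public class CountryCounter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("group.id", "CountryCounter");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Running count of customers per country
        Map<String, Integer> custCountryMap = new HashMap<>();
        // Subscribing to topics
        consumer.subscribe(Collections.singletonList("customerCountries"));
        // The poll loop
        try {
            while (true) {
                // Returns an empty batch if no records arrive within 100 ms
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic = %s, partition = %d, offset = %d, "
                            + "customer = %s, country = %s%n",
                            record.topic(), record.partition(), record.offset(),
                            record.key(), record.value());
                    int updatedCount = 1;
                    if (custCountryMap.containsKey(record.value())) {
                        updatedCount = custCountryMap.get(record.value()) + 1;
                    }
                    custCountryMap.put(record.value(), updatedCount);
                    JSONObject json = new JSONObject(custCountryMap);
                    System.out.println(json.toString());
                }
            }
        } finally {
            consumer.close();
        }
    }
}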