I'm trying to write kafka consumer on .NET 6 I've created a basic consumer class like this.
public List<KafkaCdrModel> Consume(string topic)
{
List<KafkaCdrModel> result = new();
var config = new ConsumerConfig
{
GroupId = consumerSettings.GroupId,
BootstrapServers = consumerSettings.BootstrapServers,
AutoOffsetReset = AutoOffsetReset.Earliest,
};
using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe(topic);
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume();
var serializedResult = JsonConvert.DeserializeObject<KafkaCdrModel>(consumeResult.Message.Value);
result.Add(serializedResult);
}
catch (ConsumeException ex)
{
Console.WriteLine($"Error occured: {ex.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
consumer.Close();
}
return result;
}
When I first consumed, the records came, but when there were no more records, it got stuck in the while loop. When I changed the Group Id and tried, I thought that the records would come again, so I would try to serialize it to my custom model. At this moment. I've 2 questions.
First, How can I pull the same records over and over for testing?
Second, How can I get it to exit while loop when it can't find a record?
CodePudding user response:
- How can I pull the same records over and over for testing?
R. You need to use a different GroupId
. Each KafkaConsumer will consume once every message, but if you use another GroupId
it means that are different consumers, and then you can read/consume the same message again.
- How can I get it to exit while loop when it can't find a record?
R. Basically, you can use break
, example:
int index = 0;
while(true)
{
index ;
if (index >= 2)
break;
}
You can also see more details in this another question: https://stackoverflow.com/a/6719647/14477325