I'm trying to consume data from producer.
var config = new ConsumerConfig
{
GroupId = consumerSettings.GroupId,
BootstrapServers = consumerSettings.BootstrapServers,
AutoOffsetReset = AutoOffsetReset.Earliest,
};
using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe(topic);
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume();
}
catch (ConsumeException ex)
{
Console.WriteLine($"Error occured: {ex.Error.Reason}");
}
}
}
In this line, the application waits for an infinite amount of time and does not give any errors. makes you wait forever.
var consumeResult = consumer.Consume();
What should i do? Should i add timeout ?
CodePudding user response:
That is actually the expected behavior from Confluent's Kafka library for .NET. The Consume()
invocation is a blocking operation and that's why, usually, you'll want to have it run on its own Thread / WorkerService / Context.
Consume()
only returns when it successfully consumers something from that Topic.
Samples:
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe(topics);
while (!cancelled)
{
var consumeResult = consumer.Consume(cancellationToken); // This hangs until a result is received from the topic.
// handle consumed message.
...
}
consumer.Close();
}
If you're running an API or a Worker Service you'll probably want to inherit from a BackgroundService
and do something within it's execution loop similar to what you can see below:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Starting {serviceName}", GetType().FullName);
KafkaConsumer = serviceProvider
.CreateScope().ServiceProvider
.GetRequiredService<IConsumer<string, byte[]>>();
KafkaConsumer.Subscribe(Constants.CloudEvents.SportCreatedTopic);
while (!cancellationToken.IsCancellationRequested)
{
try
{
ConsumeResult<string, byte[]> result = KafkaConsumer.Consume(cancellationToken);
var cloudEvent = result.Message.ToCloudEvent(cloudEventFormatter);
if (cloudEvent.Data is Sport createdSport)
{
_logger.LogTrace("Attempting to update a report with the new sport");
Report report =
await sender.Send(new GetReportForDay(), cancellationToken) // Either fetch the current report, or...
?? await sender.Send(new CreateReport(), cancellationToken); // Create a new one if none exists for today
_ = await sender.Send(new AppendCreatedSport
{
AppendTo = report,
Append = createdSport
}, cancellationToken);
}
}
// Consumer errors should generally be ignored (or logged) unless fatal.
catch (ConsumeException e) when (e.Error.IsFatal)
{
_logger.LogError(e, "A fatal consumer error happened");
throw;
}
catch (Exception e)
{
_logger.LogError(e, "An exception happened, oh no");
}
finally
{
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
}
}
KafkaConsumer.Unsubscribe();
}