KafkaConsumer hangs forever on consumer.Consume()

Time:08-25

I'm trying to consume data from a producer.

var config = new ConsumerConfig
{
    GroupId = consumerSettings.GroupId,
    BootstrapServers = consumerSettings.BootstrapServers,
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe(topic);

try
{
    while (true)
    {
        try
        {
            // Blocks here until a message arrives.
            var consumeResult = consumer.Consume();
        }
        catch (ConsumeException ex)
        {
            Console.WriteLine($"Error occurred: {ex.Error.Reason}");
        }
    }
}
finally
{
    // Leave the consumer group cleanly when the loop exits.
    consumer.Close();
}

On this line, the application waits indefinitely and does not report any errors:

var consumeResult = consumer.Consume();

What should I do? Should I add a timeout?

Answer:

That is the expected behavior of Confluent's Kafka library for .NET. Consume() is a blocking call, which is why you'll usually want to run it on its own thread, worker service, or similar context.

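Consume() without arguments only returns once it has consumed a message from the topic (or throws a ConsumeException on error). If you would rather not wait indefinitely, the overloads that take a timeout return null when no message arrives in time, and the overload that takes a CancellationToken throws OperationCanceledException when the token is cancelled.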
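On the timeout question specifically, here is a minimal sketch of the `Consume(TimeSpan)` overload. The broker address, group id, topic name, and the one-second timeout are placeholders chosen for illustration and are not from the original post.

```csharp
using System;
using Confluent.Kafka;

// Placeholder settings (assumptions); substitute your own broker, group and topic.
var config = new ConsumerConfig
{
    GroupId = "example-group",
    BootstrapServers = "localhost:9092",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("example-topic");

try
{
    while (true)
    {
        try
        {
            // Waits at most one second; returns null if no message arrived in that time.
            var consumeResult = consumer.Consume(TimeSpan.FromSeconds(1));
            if (consumeResult == null)
            {
                Console.WriteLine("No message yet, polling again.");
                continue;
            }

            Console.WriteLine($"Received: {consumeResult.Message.Value}");
        }
        catch (ConsumeException ex)
        {
            Console.WriteLine($"Error occurred: {ex.Error.Reason}");
        }
    }
}
finally
{
    // Leave the consumer group cleanly.
    consumer.Close();
}
```

A timeout does not make messages arrive any sooner. It only gives the loop a chance to do other work, such as logging or checking a shutdown flag, between polls.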

Source

Samples:

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    consumer.Subscribe(topics);

    while (!cancelled)
    {
        var consumeResult = consumer.Consume(cancellationToken);  // Blocks until a message is received from the topic or the token is cancelled.

        // handle consumed message.
        ...
    }

    consumer.Close();
}
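The sample above leaves the cancellation token undefined. One way it could be wired up in a console application is sketched below, assuming Ctrl+C should stop the loop. The broker address, group id, and topic name are placeholders, not values from the original post.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Placeholder settings (assumptions); substitute your own broker, group and topic.
var config = new ConsumerConfig
{
    GroupId = "example-group",
    BootstrapServers = "localhost:9092",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // Keep the process alive so the consumer can close cleanly.
    cts.Cancel();
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("example-topic");

try
{
    while (true)
    {
        // Blocks until a message arrives or the token is cancelled.
        var consumeResult = consumer.Consume(cts.Token);
        Console.WriteLine($"Received: {consumeResult.Message.Value}");
    }
}
catch (OperationCanceledException)
{
    // Expected when Ctrl+C is pressed; fall through to Close().
}
finally
{
    // Commits final offsets (if auto-commit is enabled) and leaves the group.
    consumer.Close();
}
```

With this shape, the blocking call is interrupted by cancellation, not by an arbitrary timeout, so the loop body only ever runs when there is a message to handle.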

If you're running an API or a Worker Service, you'll probably want to inherit from BackgroundService and do something within its execution loop similar to what you can see below:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _logger.LogInformation("Starting {serviceName}", GetType().FullName);
    KafkaConsumer = serviceProvider
        .CreateScope().ServiceProvider
        .GetRequiredService<IConsumer<string, byte[]>>();
    KafkaConsumer.Subscribe(Constants.CloudEvents.SportCreatedTopic);
    while (!stoppingToken.IsCancellationRequested)
    {
        try
        {
            // Blocks until a message arrives or the host requests shutdown.
            ConsumeResult<string, byte[]> result = KafkaConsumer.Consume(stoppingToken);
            var cloudEvent = result.Message.ToCloudEvent(cloudEventFormatter);
            if (cloudEvent.Data is Sport createdSport)
            {
                _logger.LogTrace("Attempting to update a report with the new sport");
                Report report =
                    await sender.Send(new GetReportForDay(), stoppingToken)     // Either fetch the current report, or...
                    ?? await sender.Send(new CreateReport(), stoppingToken);    // Create a new one if none exists for today

                _ = await sender.Send(new AppendCreatedSport
                {
                    AppendTo = report,
                    Append = createdSport
                }, stoppingToken);
            }
        }
        // Consumer errors should generally be ignored (or logged) unless fatal.
        catch (ConsumeException e) when (e.Error.IsFatal)
        {
            _logger.LogError(e, "A fatal consumer error happened");
            throw;
        }
        catch (Exception e)
        {
            _logger.LogError(e, "An exception happened, oh no");
        }
        finally
        {
            // Pause briefly before the next poll.
            await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
        }
    }
    KafkaConsumer.Unsubscribe();
}
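One caveat with the pattern above: depending on the .NET version, the part of ExecuteAsync that runs before its first await may execute on the thread that starts the host, so a blocking Consume() there can delay application startup. A common workaround is to move the whole loop onto the thread pool with Task.Run. The sketch below assumes an `IConsumer<string, string>` is registered in dependency injection; the class name `ExampleConsumerWorker` and the topic name are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// Hypothetical worker for illustration; names are not from the original post.
public sealed class ExampleConsumerWorker : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly ILogger<ExampleConsumerWorker> _logger;

    public ExampleConsumerWorker(
        IConsumer<string, string> consumer,
        ILogger<ExampleConsumerWorker> logger)
    {
        _consumer = consumer;
        _logger = logger;
    }

    // Run the blocking loop on a thread-pool thread so host startup is not held up.
    protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
        Task.Run(() => ConsumeLoop(stoppingToken), stoppingToken);

    private void ConsumeLoop(CancellationToken stoppingToken)
    {
        _consumer.Subscribe("example-topic"); // Placeholder topic name.
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    // Blocks until a message arrives or the host requests shutdown.
                    var result = _consumer.Consume(stoppingToken);
                    _logger.LogInformation("Received message at {Offset}", result.TopicPartitionOffset);
                }
                catch (ConsumeException e) when (!e.Error.IsFatal)
                {
                    // Non-fatal errors are logged and the loop continues.
                    _logger.LogWarning(e, "Non-fatal consume error");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the host is shutting down.
        }
        finally
        {
            // Leave the consumer group cleanly.
            _consumer.Close();
        }
    }
}
```

Such a worker would typically be registered with `services.AddHostedService<ExampleConsumerWorker>()`. Fatal ConsumeExceptions are deliberately not caught here, so they propagate out of the task and surface as a failed background service.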
