Home > Blockchain >  KafkaConsumer hangs forever on consumer.Consume()
KafkaConsumer hangs forever on consumer.Consume()


I'm trying to consume data from producer.

 var config = new ConsumerConfig
                GroupId = consumerSettings.GroupId,
                BootstrapServers = consumerSettings.BootstrapServers,
                AutoOffsetReset = AutoOffsetReset.Earliest,

            using var consumer = new ConsumerBuilder<string, string>(config).Build();

                while (true)
                        var consumeResult = consumer.Consume();
                    catch (ConsumeException ex)
                        Console.WriteLine($"Error occured: {ex.Error.Reason}");


In this line, the application waits for an infinite amount of time and does not give any errors. makes you wait forever.

var consumeResult = consumer.Consume();

What should i do? Should i add timeout ?

CodePudding user response:

That is actually the expected behavior from Confluent's Kafka library for .NET. The Consume() invocation is a blocking operation and that's why, usually, you'll want to have it run on its own Thread / WorkerService / Context.

Consume() only returns when it successfully consumers something from that Topic.



using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())

    while (!cancelled)
        var consumeResult = consumer.Consume(cancellationToken);  // This hangs until a result is received from the topic.

        // handle consumed message.


If you're running an API or a Worker Service you'll probably want to inherit from a BackgroundService and do something within it's execution loop similar to what you can see below:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            _logger.LogInformation("Starting {serviceName}", GetType().FullName);
            KafkaConsumer = serviceProvider
                .GetRequiredService<IConsumer<string, byte[]>>();
            while (!cancellationToken.IsCancellationRequested)
                    ConsumeResult<string, byte[]> result = KafkaConsumer.Consume(cancellationToken);
                    var cloudEvent = result.Message.ToCloudEvent(cloudEventFormatter);
                    if (cloudEvent.Data is Sport createdSport)
                        _logger.LogTrace("Attempting to update a report with the new sport");
                        Report report =
                            await sender.Send(new GetReportForDay(), cancellationToken)     // Either fetch the current report, or...
                            ?? await sender.Send(new CreateReport(), cancellationToken);    // Create a new one if none exists for today

                        _ = await sender.Send(new AppendCreatedSport
                            AppendTo = report,
                            Append = createdSport
                        }, cancellationToken);
                // Consumer errors should generally be ignored (or logged) unless fatal.
                catch (ConsumeException e) when (e.Error.IsFatal)
                    _logger.LogError(e, "A fatal consumer error happened");
                catch (Exception e)
                    _logger.LogError(e, "An exception happened, oh no");
                    await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);



  • Related