I have this code:
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    stoppingToken.ThrowIfCancellationRequested();
    SetConnection();
    SetChannel();
    var consumer = new EventingBasicConsumer(_channel);
    consumer.Received += async (_, ea) =>
    {
        using var scope = _serviceProvider.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<TestContext>();
        try
        {
            var content = Encoding.UTF8.GetString(ea.Body.ToArray());
            var newCommentModel = JsonConvert.DeserializeObject<Comment>(content);
            await dbContext.AddMessageToDb(newCommentModel);
            _channel.BasicAck(ea.DeliveryTag, false);
        }
        catch (Exception e)
        {
            _channel.BasicNack(ea.DeliveryTag, false, true);
            Console.WriteLine(e);
            throw;
        }
        finally
        {
            scope.Dispose();
        }
    };
    _channel.BasicConsume(_queueName, false, consumer);
    return Task.CompletedTask;
}
There are over 100k entries in my RabbitMQ queue. After about 50-70 saves to the database, I get this error:
An exception has been raised that is likely due to a transient failure.
 ---> Npgsql.PostgresException (0x80004005): 53300: remaining connection slots are reserved for non-replication superuser connections
   at Npgsql.Internal.NpgsqlConnector.g__ReadMessageLong|226_0(NpgsqlConnector connector, Boolean async, DataRowLoadingMode dataRowLoadingMode, Boolean readingNotifications, Boolean isReadingPrependedMessage)
   at Npgsql.Internal.NpgsqlConnector.g__OpenCore|208_1(NpgsqlConnector conn, SslMode sslMode, NpgsqlTimeout timeout, Boolean async, CancellationToken cancellationToken, Boolean isFirstAttempt)
   at Npgsql.Internal.NpgsqlConnector.Open(NpgsqlTimeout timeout, Boolean async, CancellationToken cancellationToken)
   at Npgsql.PoolingDataSource.OpenNewConnector(NpgsqlConnection conn, NpgsqlTimeout timeout, Boolean async, CancellationToken cancellationToken)
   at Npgsql.PoolingDataSource.g__RentAsync|28_0(NpgsqlConnection conn, NpgsqlTimeout timeout, Boolean async, CancellationToken cancellationToken)
   at Npgsql.NpgsqlConnection.g__OpenAsync|45_0(Boolean async, CancellationToken cancellationToken)
   at Microsoft.EntityFrameworkCore.Storage.RelationalConnection.OpenInternalAsync(Boolean errorsExpected, CancellationToken cancellationToken)
   at Microsoft.EntityFrameworkCore.Storage.RelationalConnection.OpenAsync(CancellationToken cancellationToken, Boolean errorsExpected)
   at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
   at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.StateManager.SaveChangesAsync(IList`1 entriesToSave, CancellationToken cancellationToken)
   at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.StateManager.SaveChangesAsync(StateManager stateManager, Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
   at Npgsql.EntityFrameworkCore.PostgreSQL.Storage.Internal.NpgsqlExecutionStrategy.ExecuteAsync[TState,TResult](TState state, Func`4 operation, Func`4 verifySucceeded, CancellationToken cancellationToken)
 Exception data: Severity: FATAL, SqlState: 53300
In my Postgres settings max_connections = 100. Most likely I get the error because I don't close the database connection in time. Is that the case, and how can I fix it?
I tried adding
finally
{
    scope.Dispose();
}
Do I need to limit how ExecuteAsync runs in the background somehow? How can I limit the number of parallel operations to e.g. 100?
CodePudding user response:
The EventingBasicConsumer doesn't understand asynchronous event handlers, so it doesn't check whether the event handler completed. In EventingBasicConsumer.Received, the handler async (_, ea) results in an async void delegate that can't be awaited:
async void OnReceived(object sender, BasicDeliverEventArgs ea)
{
    ...
    await dbContext.AddMessageToDb(newCommentModel);
    ...
}
async methods return to the caller when the first await is encountered. It's up to the caller to await the async operation in turn. With async void though, there's no task to await. The client assumes the event was processed and pops the next message in the queue.
As a result, the current code will spawn DbContexts and by extension connections, without waiting for the INSERT operations to complete, quickly exhausting the database's connection limit.
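To see the effect in isolation, here is a small standalone sketch (not the RabbitMQ client, just an illustration): a loop "dispatches" messages to an async void handler, can't wait for any of them, and ends up with all of them in flight at once.

using System;
using System.Threading;
using System.Threading.Tasks;

class AsyncVoidDemo
{
    static int _inFlight;

    // async void: control returns to the caller at the first await,
    // and there is no Task the caller could wait on
    static async void Handle(int message)
    {
        Interlocked.Increment(ref _inFlight);
        await Task.Delay(1000);    // stands in for the database INSERT
        Interlocked.Decrement(ref _inFlight);
    }

    static void Main()
    {
        for (var i = 0; i < 1000; i++)
        {
            Handle(i);             // "dispatch" a message; nothing to await
        }

        // roughly 1000 handlers are still running their INSERT stand-ins
        Console.WriteLine($"Handlers in flight: {Volatile.Read(ref _inFlight)}");
    }
}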
According to the docs you need to use AsyncEventingBasicConsumer instead, with a special setting on the connection factory:
ConnectionFactory factory = new ConnectionFactory();
// ...
// use the async-oriented consumer dispatcher. Only compatible with IAsyncBasicConsumer implementations
factory.DispatchConsumersAsync = true;

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += async (ch, ea) =>
{
    var body = ea.Body.ToArray();
    // copy or deserialise the payload
    // and process the message
    // ...
    channel.BasicAck(ea.DeliveryTag, false);
    await Task.Yield();
};

// this consumer tag identifies the subscription
// when it has to be cancelled
string consumerTag = channel.BasicConsume(queueName, false, consumer);
This receives messages one by one and waits for DbContext (or Dapper) to INSERT each one before receiving the next.
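Applied to the question's code, a minimal sketch could look like this (assuming the same _channel, _serviceProvider and _queueName fields, and that SetConnection() creates the ConnectionFactory with DispatchConsumersAsync = true):

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    stoppingToken.ThrowIfCancellationRequested();
    SetConnection();   // the ConnectionFactory must have DispatchConsumersAsync = true
    SetChannel();

    var consumer = new AsyncEventingBasicConsumer(_channel);
    consumer.Received += async (_, ea) =>
    {
        using var scope = _serviceProvider.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<TestContext>();
        try
        {
            var content = Encoding.UTF8.GetString(ea.Body.ToArray());
            var newCommentModel = JsonConvert.DeserializeObject<Comment>(content);
            await dbContext.AddMessageToDb(newCommentModel);
            _channel.BasicAck(ea.DeliveryTag, false);
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            _channel.BasicNack(ea.DeliveryTag, false, true);
        }
    };
    _channel.BasicConsume(_queueName, false, consumer);
    return Task.CompletedTask;
}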
This will solve the immediate problem, but it's probably too slow. Instead of executing 100K inserts one by one, a better idea would be to batch them and insert eg 100 at once.
One way to do that is to use eg a BatchBlock<Comment> to batch messages together and an ActionBlock<Comment[]> to insert them into the database. The Received event handler posts comments to the BatchBlock without waiting for the entire batch to complete:
// requires the System.Threading.Tasks.Dataflow package
private readonly BatchBlock<Comment> _batch;
private readonly ActionBlock<Comment[]> _importer;

public MyService(...)
{
    _batch = new BatchBlock<Comment>(100);
    _importer = new ActionBlock<Comment[]>(ImportAsync);
    _batch.LinkTo(_importer, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });
    ...
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    ...
    consumer.Received += async (ch, ea) =>
    {
        ...
        await _batch.SendAsync(comment);
    };
    ...
}
async Task ImportAsync(Comment[] comments)
{
    using var scope = _serviceProvider.CreateScope();
    var dbContext = scope.ServiceProvider.GetRequiredService<TestContext>();
    try
    {
        dbContext.Comments.AddRange(comments);
        await dbContext.SaveChangesAsync();
    }
    catch(Exception exc)
    {
        ...
    }
}
When the BackgroundService needs to exit, _batch.Complete() should be called to tell the blocks they can complete, eg in StopAsync. The code should then wait for _importer to finish with await _importer.Completion;
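A minimal sketch, assuming the _batch and _importer fields from the constructor above:

public override async Task StopAsync(CancellationToken cancellationToken)
{
    // stop accepting new comments; any partially filled batch is flushed downstream
    _batch.Complete();
    // wait until the last batch has been written to the database
    await _importer.Completion;
    await base.StopAsync(cancellationToken);
}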
This leaves acknowledgements as an exercise to the reader. Acknowledging messages before batching risks losing messages in case of error; acknowledging them inside ImportAsync risks losing acknowledgements if an entire batch is lost.
Assuming Comment somehow carries the delivery tag, ImportAsync could ACK or NACK an entire batch at once:
async Task ImportAsync(Comment[] comments)
{
    using var scope = _serviceProvider.CreateScope();
    var dbContext = scope.ServiceProvider.GetRequiredService<TestContext>();
    try
    {
        dbContext.Comments.AddRange(comments);
        await dbContext.SaveChangesAsync();
        foreach(var c in comments)
        {
            _channel.BasicAck(c.DeliveryTag, false);
        }
    }
    catch(Exception exc)
    {
        ...
        foreach(var c in comments)
        {
            _channel.BasicNack(c.DeliveryTag, false, true);
        }
    }
}
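For completeness, one hypothetical way for Comment to carry the tag is an extra property that the Received handler fills from ea.DeliveryTag before calling SendAsync, and that stays out of the INSERT:

// needs: using System.ComponentModel.DataAnnotations.Schema;
public class Comment
{
    // ... the real entity properties ...

    // Hypothetical: set from ea.DeliveryTag in the Received handler;
    // [NotMapped] keeps it out of the generated INSERT.
    [NotMapped]
    public ulong DeliveryTag { get; set; }
}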