We are using BlockingCollection to implement the producer-consumer pattern in a real-time application, i.e.
BlockingCollection<T> collection = new BlockingCollection<T>();
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
// Starting up consumer
Task.Run(() => consumer(this.cancellationTokenSource.Token));
…
void Producer(T item)
{
    collection.Add(item);
}
…
void consumer(CancellationToken token)
{
    while (true)
    {
        // Take blocks until an item is available and throws OperationCanceledException on cancellation
        var item = this.collection.Take(token);
        process(item);
    }
}
To be sure, this is a very simplified version of the actual production code. Sometimes, when the application is under heavy load, we observe that the consuming part lags behind the producing part. The application logic is very complex: it involves interaction with other applications over the network, as well as with SQL databases. Delays could be occurring in many places, including the calls to process(), which might in principle explain why the consuming part can be slow.
All the above considerations aside, is there something inherent in using BlockingCollection that could explain this phenomenon? Are there more efficient options in .NET to implement the producer-consumer pattern?
CodePudding user response:
First of all, BlockingCollection isn't the best choice for producer/consumer scenarios. There are at least two better options (Dataflow, Channels) and the choice depends on the actual application scenario - which is missing from the question.
It's also possible to create a producer/consumer pipeline without a buffer, by using async streams and IAsyncEnumerable.
Async Streams
In this case, the producer can be an async iterator. The consumer will receive the IAsyncEnumerable and iterate over it until it completes. It could also produce its own IAsyncEnumerable output, which can be passed to the next method in the pipeline.
The producer can be:
public static async IAsyncEnumerable<Message> ProducerAsync(CancellationToken token)
{
while(!token.IsCancellationRequested)
{
var msg=await Task.Run(()=>SomeHeavyWork());
yield return msg;
}
}
And the consumer:
async Task ConsumeAsync(IAsyncEnumerable<Message> source)
{
await foreach(var msg in source)
{
await consumeMessage(msg);
}
}
There's no buffering in this case, and the producer can't emit a new message until the consumer consumes the current one. The consumer can be parallelized with Parallel.ForEachAsync. Finally, the System.Linq.Async package provides LINQ operators for async streams, allowing us to write, e.g.:
List<OtherMsg> results=await ProducerAsync(cts.Token)
.Select(msg=>consumeAndReturn(msg))
.ToListAsync();
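The parallelized consumer mentioned above could look roughly like the following. This is only a minimal sketch written as a .NET 6 top-level program: the ProduceAsync iterator, the int message type and the Task.Delay calls are placeholders standing in for the real producer and for consumeMessage, not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var results = new ConcurrentBag<int>();

// Up to 4 messages are processed concurrently; processing order is not preserved.
await Parallel.ForEachAsync(
    ProduceAsync(),
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    async (msg, ct) =>
    {
        await Task.Delay(20, ct);   // placeholder for consumeMessage(msg)
        results.Add(msg * 2);       // ConcurrentBag is safe for concurrent adds
    });

Console.WriteLine($"Processed {results.Count} messages");   // prints "Processed 20 messages"

// Hypothetical producer: emits the integers 1..20 as an async stream.
static async IAsyncEnumerable<int> ProduceAsync()
{
    for (int i = 1; i <= 20; i++)
    {
        await Task.Delay(5);        // placeholder for SomeHeavyWork()
        yield return i;
    }
}
```

Note that the body delegate takes both the item and a CancellationToken; results are collected in a thread-safe collection because several bodies run at the same time.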
Dataflow - ActionBlock
Dataflow blocks can be used to construct entire processing pipelines, with each block receiving a message (data) from the previous one, processing it and passing it to the next block. Most blocks have input and where appropriate output buffers. Each block uses a single worker task but can be configured to use more. The application code doesn't have to handle the tasks though.
In the simplest case, a single ActionBlock can process messages posted to it by one or more producers, acting as a consumer:
async Task ConsumeAsync(Message message)
{
    //Do something with the message
}
...
ExecutionDataflowBlockOptions _options= new () {
    MaxDegreeOfParallelism=4,
    BoundedCapacity=5
};
ActionBlock<Message> _block=new ActionBlock<Message>(ConsumeAsync,_options);
async Task ProduceAsync(CancellationToken token)
{
while(!token.IsCancellationRequested)
{
var msg=await produceNewMessageAsync();
await _block.SendAsync(msg);
}
_block.Complete();
await _block.Completion;
}
In this example the block uses up to 4 worker tasks and has a bounded capacity of 5 messages. Once the block is full, SendAsync doesn't block the calling thread; the awaiting producer simply pauses until the block has room again.
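To illustrate the earlier point that blocks can be linked into a pipeline, here is a small sketch with a TransformBlock feeding an ActionBlock. The squaring and summing steps and the int message type are invented for illustration; depending on the target framework, the System.Threading.Tasks.Dataflow NuGet package may need to be referenced.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4,
    BoundedCapacity = 5
};

int total = 0;

// First stage: transforms each input (here it simply squares the number).
var square = new TransformBlock<int, int>(n => n * n, options);

// Second stage: uses the default single worker, so the plain += is safe.
var sum = new ActionBlock<int>(n => { total += n; });

// PropagateCompletion forwards Complete() (and faults) to the next block.
square.LinkTo(sum, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 1; i <= 10; i++)
{
    await square.SendAsync(i);   // waits asynchronously while the block is full
}

square.Complete();
await sum.Completion;            // completes once every message has been processed

Console.WriteLine(total);        // prints 385
```

Without PropagateCompletion, the second block would never be told to finish and awaiting its Completion would hang.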
BufferBlock as a producer/consumer queue
A BufferBlock is an inactive block that's used as a buffer by other blocks. It can be used as an asynchronous producer/consumer collection, as shown in How to: Implement a producer-consumer dataflow pattern. In this case the code needs to receive messages explicitly, and threading is up to the developer:
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; i++)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
Parallelized consumer
In .NET 6 the consumer can be simplified by using await foreach and ReceiveAllAsync:
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
await foreach(var data in source.ReceiveAllAsync())
{
bytesProcessed += data.Length;
}
return bytesProcessed;
}
And processed concurrently using Parallel.ForEachAsync:
static async Task ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
var msgs=source.ReceiveAllAsync();
await Parallel.ForEachAsync(msgs,
    new ParallelOptions { MaxDegreeOfParallelism = 4},
    // the body receives the item and a CancellationToken and returns a ValueTask
    async (msg,ct)=>await ConsumeMsgAsync(msg));
}
By default, Parallel.ForEachAsync will use as many worker tasks as there are cores.
Channels
Channels are similar to Go's channels. They are built specifically for producer/consumer scenarios and allow creating pipelines at a lower level than the Dataflow library. If the Dataflow library was built today, it would be built on top of Channels.
A channel can't be accessed directly, only through its Reader or Writer interfaces. This is intentional, and allows easy pipelining of methods. A very common pattern is for a producer method to create a channel it owns and return only a ChannelReader. Consuming methods accept that reader as input. This way, the producer can control the channel's lifetime without worrying whether other producers are writing to it.
With channels, a producer would look like this :
ChannelReader<Message> Producer(CancellationToken token)
{
    var channel=Channel.CreateBounded<Message>(5);
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        while(!token.IsCancellationRequested)
        {
            ...
            await writer.WriteAsync(msg,token);
        }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));
    return channel.Reader;
}
The unusual .ContinueWith(t=>writer.TryComplete(t.Exception)); is used to signal completion to the writer. This will signal readers to complete as well. This way completion propagates from one method to the next. Any exceptions are propagated as well.
writer.TryComplete(t.Exception) doesn't block or perform any significant work, so it doesn't matter what thread it executes on. This means there's no need to use await on the worker task, which would complicate the code by rethrowing any exceptions.
A consuming method only needs the ChannelReader as source.
async Task ConsumerAsync(ChannelReader<Message> source)
{
await Parallel.ForEachAsync(source.ReadAllAsync(),
    new ParallelOptions { MaxDegreeOfParallelism = 4},
    // the body receives the item and a CancellationToken and returns a ValueTask
    async (msg,ct)=>await consumeMessageAsync(msg)
);
}
A method may read from one channel and publish new data to another using the producer pattern :
ChannelReader<OtherMessage> ConsumerAsync(ChannelReader<Message> source)
{
    // CreateBounded requires a capacity; 5 mirrors the producer example
    var channel=Channel.CreateBounded<OtherMessage>(5);
    var writer=channel.Writer;
    // Not awaited: the reader is returned immediately while the work runs in the background
    _ = Parallel.ForEachAsync(source.ReadAllAsync(),
            new ParallelOptions { MaxDegreeOfParallelism = 4},
            async (msg,ct)=>{
                var newMsg=await consumeMessageAsync(msg);
                await writer.WriteAsync(newMsg,ct);
            })
        .ContinueWith(t=>writer.TryComplete(t.Exception));
    return channel.Reader;
}
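Putting the pieces together, the stages can be chained by passing each returned reader to the next method. The following is a minimal end-to-end sketch written as a top-level program; the Produce and Transform names, the int and string message types and the capacity of 5 are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Final stage: drain the last reader until the channel completes.
var results = new List<string>();
await foreach (var text in Transform(Produce(10)).ReadAllAsync())
{
    results.Add(text);
}
Console.WriteLine($"Received {results.Count} items");   // prints "Received 10 items"

// Stage 1 (hypothetical): owns its channel and exposes only the reader.
static ChannelReader<int> Produce(int count, CancellationToken token = default)
{
    var channel = Channel.CreateBounded<int>(5);
    var writer = channel.Writer;
    _ = Task.Run(async () =>
    {
        for (int i = 1; i <= count; i++)
        {
            await writer.WriteAsync(i, token);   // waits while the channel is full
        }
    }, token)
    .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}

// Stage 2 (hypothetical): reads from one channel and writes to a new one.
static ChannelReader<string> Transform(ChannelReader<int> source, CancellationToken token = default)
{
    var channel = Channel.CreateBounded<string>(5);
    var writer = channel.Writer;
    _ = Parallel.ForEachAsync(
            source.ReadAllAsync(token),
            new ParallelOptions { MaxDegreeOfParallelism = 4, CancellationToken = token },
            (n, ct) => writer.WriteAsync($"item {n}", ct))
        .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}
```

Because the middle stage runs up to 4 bodies concurrently, the items may arrive out of order; only the count is deterministic here. The bounded channels provide backpressure: a fast producer pauses in WriteAsync until the slower stage catches up.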
CodePudding user response:
You could look at using the Dataflow library. I'm not sure if it is more performant than a BlockingCollection. As others have said, there is no guarantee that you can consume faster than produce, so it is always possible to fall behind.