I have one thread responsible for enqueuing and one thread responsible for dequeuing. However, data is enqueued far faster than it can be dequeued and processed. When I did the following, processing fell far behind:
```csharp
public void HandleData()
{
    while (true)
    {
        try
        {
            // TryDequeue checks and removes in one step, so Count is not needed
            if (Queue.TryDequeue(out var item))
            {
                ProcessData(item);
            }
            else
            {
                Thread.Sleep(10);
            }
        }
        catch (Exception e)
        {
            //...
        }
    }
}
```
Next, I tried processing each item in a separate task, but this hurt other parts of the project: the processing consumed most of the resources available to the application and generated a high thread count.
```csharp
public void HandleData()
{
    while (true)
    {
        try
        {
            // item is declared per iteration, so each task captures its own copy
            if (Queue.TryDequeue(out var item))
            {
                Task.Run(() => ProcessData(item));
            }
            else
            {
                Thread.Sleep(10);
            }
        }
        catch (Exception e)
        {
            //
        }
    }
}
```
Next, I tried the following:
```csharp
public void HandleData()
{
    List<Task> taskList = new List<Task>();
    while (true)
    {
        try
        {
            if (Queue.TryDequeue(out var item))
            {
                if (taskList.Count <= 20)
                {
                    // Run in the background while there is room in the list
                    Task t = Task.Run(() => ProcessData(item));
                    taskList.Add(t);
                }
                else
                {
                    // Too many tasks running: process inline instead
                    ProcessData(item);
                }
            }
            else
            {
                Thread.Sleep(10);
            }
            taskList.RemoveAll(x => x.IsCompleted);
        }
        catch (Exception e)
        {
            //...
        }
    }
}
```
This seems to have solved the problem, but is there a cleaner way to do it? Is there a way to set a maximum number of concurrent threads while dequeuing?
Answer:
ConcurrentQueue isn't the proper container here, especially since it doesn't provide asynchronous operations. A better option would be an ActionBlock, or a Channel combined with Parallel.ForEachAsync.
Using an ActionBlock
An ActionBlock combines an input queue with workers that process the data asynchronously. The workers process the data as soon as it's available. Using an ActionBlock, you can create a block with a set number of workers and start posting data to it. The block will use only the configured number of worker tasks to process the data:
```csharp
using System.Threading.Tasks.Dataflow;

ActionBlock<Data> _block;

public void Initialize()
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 20
    };
    _block = new ActionBlock<Data>(ProcessData, options);
}
```
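To see the cap in action, here is a minimal self-contained sketch, assuming a .NET 6 console program with top-level statements and int items instead of Data. ProcessData here is a hypothetical stand-in that records how many calls overlap, and the cap is 4 rather than 20 so the effect is easy to observe:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

object gate = new();
int current = 0, peak = 0, processed = 0;

// Hypothetical stand-in for ProcessData: tracks concurrent calls and the peak
void ProcessData(int item)
{
    lock (gate) { current++; peak = Math.Max(peak, current); }
    Thread.Sleep(5); // simulate work
    lock (gate) { current--; processed++; }
}

var block = new ActionBlock<int>(item => ProcessData(item), new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4 // at most 4 ProcessData calls at once
});

for (int i = 0; i < 40; i++)
    block.Post(i);

block.Complete();       // no more items will be posted
await block.Completion; // waits until every queued item has been processed

Console.WriteLine($"processed={processed}, peak={peak}");
```

All 40 items get processed, but the observed peak never exceeds the configured MaxDegreeOfParallelism, with no manual task list to maintain.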
Data (messages) is posted to the block with either the Post or the SendAsync method. When there's no more data, calling Complete tells the block to shut down after processing any pending items. We can wait for the pending items to finish by awaiting the Completion property:
```csharp
public async Task Produce(CancellationToken cancel)
{
    while (!cancel.IsCancellationRequested)
    {
        var data = ProduceSomething();
        _block.Post(data);
    }
    _block.Complete();
    await _block.Completion;
}
```
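Post and SendAsync only behave differently once the block's input is bounded, which is worth considering here because the producer is faster than the consumer. The sketch below assumes a BoundedCapacity of 1 (an option the answer's code does not set) and int items: Post rejects an item immediately when the block is full, while SendAsync waits asynchronously until there is room, which gives the producer back-pressure instead of an ever-growing queue.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var gate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var processed = new List<int>();

var block = new ActionBlock<int>(async item =>
{
    await gate.Task;                  // keep the item "in progress" until released
    lock (processed) processed.Add(item);
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 1               // at most one item queued or in progress
});

bool first = block.Post(1);            // accepted: the block has room
bool second = block.Post(2);           // rejected at once: the block is full
Task<bool> third = block.SendAsync(3); // postponed until there is room
bool thirdWasPending = !third.IsCompleted;

gate.SetResult();                      // let item 1 finish
bool thirdAccepted = await third;      // item 3 is accepted once item 1 is done

block.Complete();
await block.Completion;
Console.WriteLine($"{first} {second} {thirdAccepted} [{string.Join(",", processed)}]");
```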
Using a Channel
Another option is to use a Channel<T> instead of a ConcurrentQueue. It works like an asynchronous ConcurrentQueue, and its reader exposes the items as an IAsyncEnumerable<T> stream (through ReadAllAsync()) that can be iterated with await foreach. You can create a specific number of workers that read either from the channel itself or from the IAsyncEnumerable<T> stream. In .NET 6, the latter is a lot easier with Parallel.ForEachAsync and a fixed MaxDegreeOfParallelism:
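```csharp
using System.Threading.Channels;

ChannelReader<Data> Producer(CancellationToken token)
{
    var channel = Channel.CreateUnbounded<Data>();
    var writer = channel.Writer;
    // Produce in the background so the reader can be returned immediately
    _ = Task.Run(async () =>
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                var someData = ProduceData();
                await writer.WriteAsync(someData);
            }
        }
        finally
        {
            writer.Complete(); // ends the consumer's ReadAllAsync stream
        }
    });
    return channel.Reader;
}

async Task Consumer(ChannelReader<Data> input, int dop = 20)
{
    var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = dop };
    // The body receives each item plus a CancellationToken and returns a ValueTask
    await Parallel.ForEachAsync(input.ReadAllAsync(), parallelOptions,
        (data, ct) =>
        {
            ProcessData(data);
            return ValueTask.CompletedTask;
        });
}
```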
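The other route mentioned above, a fixed number of workers reading from the channel directly, can be sketched as follows. This assumes a .NET 6 console program and int items; ProcessData is a hypothetical stand-in that only sums the items, and workerCount plays the role of MaxDegreeOfParallelism:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
long total = 0;

// Hypothetical stand-in for ProcessData: just sums the items
void ProcessData(int item) => Interlocked.Add(ref total, item);

const int workerCount = 4; // plays the role of MaxDegreeOfParallelism

// Each worker reads until the channel is completed and fully drained
Task[] workers = Enumerable.Range(0, workerCount)
    .Select(_ => Task.Run(async () =>
    {
        await foreach (var item in channel.Reader.ReadAllAsync())
            ProcessData(item);
    }))
    .ToArray();

for (int i = 1; i <= 100; i++)
    await channel.Writer.WriteAsync(i);
channel.Writer.Complete(); // lets each worker's await foreach loop end

await Task.WhenAll(workers);
Console.WriteLine(total); // 5050
```

Every item is read by exactly one worker, so no more than workerCount items are ever processed at the same time.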
Answer:
A cleaner solution is to use an ActionBlock<T> from the TPL Dataflow library, which is part of the .NET standard libraries (from .NET Core onwards). This component invokes an action on each item it receives. It has its own internal input queue, which is unbounded by default, and it processes the items with a configurable MaxDegreeOfParallelism, which is 1 by default. Here is how you could use it. First, create the block:
```csharp
var block = new ActionBlock<Item>(item =>
{
    ProcessData(item);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded // Configurable, e.g. 20 to cap concurrency
});
```
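The defaults described above can be checked directly on a fresh options object. A minimal sketch, assuming a .NET 6 console program:

```csharp
using System;
using System.Threading.Tasks.Dataflow;

var defaults = new ExecutionDataflowBlockOptions();

// One item at a time unless you raise it
Console.WriteLine(defaults.MaxDegreeOfParallelism);                            // 1
// The input queue has no limit (DataflowBlockOptions.Unbounded is -1)
Console.WriteLine(defaults.BoundedCapacity == DataflowBlockOptions.Unbounded); // True
```

So a block created without options processes items strictly one after another, which is why MaxDegreeOfParallelism has to be set explicitly to get any parallelism.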
Then feed it work at any moment you like:
```csharp
block.Post(new Item());
```
Finally, when you are about to terminate the program, mark the block as completed and then await (or Wait) its completion for a clean termination:
```csharp
block.Complete();
await block.Completion;
```
Note: If ProcessData throws even once, the ActionBlock<T> faults and stops accepting more items (the Post method returns false instead of true). If you want it to be fail-proof, catch and handle all errors inside ProcessData (or inside the Action<T> that calls ProcessData).
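The difference can be sketched in a small .NET 6 console program, assuming int items and a hypothetical ProcessData that fails on one specific item. The first block lets the exception escape and faults; the second catches it inside the delegate and keeps going:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical ProcessData that fails on item 2
void ProcessData(int item)
{
    if (item == 2) throw new InvalidOperationException($"bad item {item}");
}

// 1) Unprotected: one exception faults the whole block
var fragile = new ActionBlock<int>(item => ProcessData(item));
fragile.Post(2);
bool fragileFaulted;
try { await fragile.Completion; fragileFaulted = false; }
catch (InvalidOperationException) { fragileFaulted = true; }
bool postAfterFault = fragile.Post(3); // false: a faulted block declines new items

// 2) Protected: errors are caught inside the delegate, so the block keeps running
int ok = 0, errors = 0;
var robust = new ActionBlock<int>(item =>
{
    try { ProcessData(item); Interlocked.Increment(ref ok); }
    catch (Exception) { Interlocked.Increment(ref errors); } // log/handle here
});
for (int i = 1; i <= 5; i++) robust.Post(i);
robust.Complete();
await robust.Completion;

Console.WriteLine($"{fragileFaulted} {postAfterFault} ok={ok} errors={errors}");
```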