Consider the following setup:
public class WorkItem
{
public string Name { get; set; }
}
public class Worker
{
public async Task DoWork(WorkItem workItem)
{
await Task.Delay(1000); //run task
}
}
public class Engine
{
private ConcurrentQueue<WorkItem> _workItems = new();
private List<Worker> _workers = new();
public Engine(int workers, int threads)
{
ConcurrentThreadsForEachWorker = threads;
_workers = new();
for (int i = 0; i < workers; i )
{
_workers.Add(new());
}
}
public int ConcurrentThreadsForEachWorker { get; private set; }
public async Task RunAsync(CancellationToken token)
{
while(!token.IsCancellationRequested)
{
//distribute work amongst workers here
}
}
}
Let's say on the constructor for the Engine
class, I get 2 for workers and 4 for threads. I want to implement the RunAsync
method so that the workers have equal load as follows:
WorkItem 1 -> Worker 1 (running 1)
WorkItem 2 -> Worker 2 (running 1)
WorkItem 3 -> Worker 1 (running 2)
WorkItem 4 -> Worker 2 (running 2)
WorkItem 5 -> Worker 1 (running 3)
WorkItem 6 -> Worker 2 (running 3)
WorkItem 7 -> Worker 1 (running 4 - full)
WorkItem 8 -> Worker 2 (running 4 - full)
WorkItem 9 -> Both workers are full, so wait until one of them is free
CodePudding user response:
A simple solution might be to store the workers in a Queue<T>
instead of a List<T>
, dequeue a worker every time you need one, and enqueue it back immediately:
Queue<Worker> _workers = new();
for (int i = 0; i < workersCount; i ) _workers.Enqueue(new());
ParallelOptions options = new() { MaxDegreeOfParallelism = 10 };
await Parallel.ForEachAsync(workItems, options, async (workItem, ct) =>
{
Worker worker;
lock (_workers)
{
worker = _workers.Dequeue();
_workers.Enqueue(worker);
}
await worker.DoWork(workItem);
});
This way the workers will be used in a round robin fashion, as an unlimited resource. The MaxConcurrencyPerWorker
policy will not be enforced.
If you want to enforce this policy, then you must use them as a limited resource, so enqueue them back in the queue only after the completion of the DoWork
operation. You must also enqueue each Worker
multiple times in the queue (MaxConcurrencyPerWorker
times), in an interleaving manner. You must also deal with the case that the pool of workers has been exhausted, in which case the execution flow will have to be suspended until a worker becomes available. A Queue<T>
doesn't offer this functionality. You will need a Channel<T>
:
Channel<Worker> workerPool = Channel.CreateUnbounded<Worker>();
for (int i = 0; i < MaxConcurrencyPerWorker; i )
foreach (Worker worker in _workers)
workerPool.Writer.TryWrite(worker);
ParallelOptions options = new() { MaxDegreeOfParallelism = workerPool.Reader.Count };
await Parallel.ForEachAsync(workItems, options, async (workItem, ct) =>
{
Worker worker = await workerPool.Reader.ReadAsync();
try
{
await worker.DoWork(workItem);
}
finally { workerPool.Writer.TryWrite(worker); }
});
The Channel<T>
is an asynchronous version of the BlockingCollection<T>
. The ChannelReader.ReadAsync
method returns a worker synchronously if one is stored currently in the channel, or asynchronously if the channel is currently empty. In the above example the ReadAsync
will always return synchronously a worker, because the degree of parallelism of the Parallel.ForEachAsync
loop has been limited to the number of the total (not distinct) available workers.