Distribute ConcurrentQueue<> amongst workers equally


Consider the following setup:

public class WorkItem
{
    public string Name { get; set; }
}

public class Worker
{
    public async Task DoWork(WorkItem workItem)
    {
        await Task.Delay(1000); // simulate the actual work
    }
}


public class Engine
{
    private ConcurrentQueue<WorkItem> _workItems = new();
    private List<Worker> _workers = new();

    public Engine(int workers, int threads)
    {
        ConcurrentThreadsForEachWorker = threads;
        _workers = new();
        for (int i = 0; i < workers; i++)
        {
            _workers.Add(new());
        }
    }

    public int ConcurrentThreadsForEachWorker { get; private set; }    

    public async Task RunAsync(CancellationToken token)
    {
        while(!token.IsCancellationRequested)
        {
            //distribute work amongst workers here
        }
    }
}

Let's say the Engine constructor receives 2 for workers and 4 for threads. I want to implement the RunAsync method so that the workers get an equal load, as follows:

WorkItem 1 -> Worker 1 (running 1)
WorkItem 2 -> Worker 2 (running 1)
WorkItem 3 -> Worker 1 (running 2)
WorkItem 4 -> Worker 2 (running 2)
WorkItem 5 -> Worker 1 (running 3)
WorkItem 6 -> Worker 2 (running 3)
WorkItem 7 -> Worker 1 (running 4 - full)
WorkItem 8 -> Worker 2 (running 4 - full)
WorkItem 9 -> Both workers are full, so wait until one of them is free

CodePudding user response:

A simple solution might be to store the workers in a Queue<T> instead of a List<T>, dequeue a worker every time you need one, and enqueue it back immediately:

Queue<Worker> _workers = new();
for (int i = 0; i < workersCount; i++) _workers.Enqueue(new());

ParallelOptions options = new() { MaxDegreeOfParallelism = 10 };
await Parallel.ForEachAsync(workItems, options, async (workItem, ct) =>
{
    Worker worker;
    lock (_workers)
    {
        worker = _workers.Dequeue();
        _workers.Enqueue(worker);
    }
    await worker.DoWork(workItem);
});

This way the workers will be used in a round-robin fashion, as an unlimited resource. The per-worker concurrency limit (MaxConcurrencyPerWorker) will not be enforced.

If you want to enforce this policy, you must treat the workers as a limited resource: enqueue each one back only after its DoWork operation has completed. You must also enqueue each Worker in the queue multiple times (MaxConcurrencyPerWorker times), in an interleaved manner. Finally, you must handle the case where the pool of workers is exhausted, at which point execution has to be suspended until a worker becomes available. A Queue<T> doesn't offer this functionality; you will need a Channel<T>:

Channel<Worker> workerPool = Channel.CreateUnbounded<Worker>();
for (int i = 0; i < MaxConcurrencyPerWorker; i++)
    foreach (Worker worker in _workers)
        workerPool.Writer.TryWrite(worker);

ParallelOptions options = new() { MaxDegreeOfParallelism = workerPool.Reader.Count };
await Parallel.ForEachAsync(workItems, options, async (workItem, ct) =>
{
    Worker worker = await workerPool.Reader.ReadAsync();
    try
    {
        await worker.DoWork(workItem);
    }
    finally { workerPool.Writer.TryWrite(worker); }
});

The Channel<T> is an asynchronous version of the BlockingCollection<T>. The ChannelReader.ReadAsync method returns a worker synchronously if one is currently stored in the channel, or asynchronously if the channel is currently empty. In the above example the ReadAsync will always return a worker synchronously, because the degree of parallelism of the Parallel.ForEachAsync loop has been limited to the number of total (not distinct) available workers.
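To illustrate the suspension behavior described above, here is a minimal self-contained sketch (the ChannelDemo class and "worker-1" value are made up for illustration). The first ReadAsync completes synchronously because an item is available; the second suspends until the worker is written back into the channel, which is exactly how the worker pool above blocks when all workers are busy:

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

static class ChannelDemo
{
    // Returns (first worker read, whether the second read had completed
    // before the worker was returned, second worker read).
    public static async Task<(string, bool, string)> Run()
    {
        Channel<string> pool = Channel.CreateUnbounded<string>();
        pool.Writer.TryWrite("worker-1"); // pool with a single worker

        string first = await pool.Reader.ReadAsync();        // completes synchronously
        ValueTask<string> pending = pool.Reader.ReadAsync(); // channel empty: suspends
        bool completedEarly = pending.IsCompleted;           // false: no worker available yet
        pool.Writer.TryWrite(first);                         // return the worker to the pool
        string second = await pending;                       // now completes
        return (first, completedEarly, second);
    }
}
```

Calling `ChannelDemo.Run()` yields `("worker-1", false, "worker-1")`: the pending read observes the worker only after it is returned, with no polling or locking involved.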
