Home > Enterprise >  Simple .NET method of a parallel work queue with controllable concurrency
Simple .NET method of a parallel work queue with controllable concurrency

Time:09-30

This sounds like an overly trivial question, and I think I am overcomplicating it because I haven't been able to find the answer for months. There are easy ways of doing this in Golang, Scala/Akka, etc but I can't seem to find anything in .NET.

What I need is an ability to have a list of Tasks that are all independent of each other, and the ability to execute them concurrently on a specified (and easily changeable) number of threads.

Basically something like:


int numberOfParallelThreads = 3;                // changeable
Queue<Task> pendingTasks = GetPendingTasks();   // returns 80 items

await SomeBuiltInDotNetParallelExecutableManager.RunAllTasksWithSpecifiedConcurrency(pendingTasks, numberOfParallelThreads);

And that SomeBuiltInDotNetParallelExecutableManager would execute 80 tasks three at a time; i.e. when one finishes it draws the next one from the queue, until the queue is exhausted.

There is Task.WhenAll and Task.WaitAll, but you can't specify the max number of parallel threads in them.

Is there a built in, simple way to do this?

CodePudding user response:

Parallel.ForEachAsync (or depending on actual workload it's sync counterpart - Parallel.ForEach, but it will not handle functions returning Task correctly):

IEnumerable<int> x = ...;

await Parallel.ForEachAsync(x, new ParallelOptions
{
    MaxDegreeOfParallelism = 3
}, async (i, token) => await Task.Delay(i * 1000, token));

Also it is highly recommended that methods in C# return so called "hot", i.e. started tasks, so "idiomatically" Queue<Task> should be a collection of already started tasks, so you will have no control over number of them executing in parallel cause it will be controlled by ThreadPool/TaskScheduler.

And there is port of Akka to .NET - Akka.NET if you want to go down that route.

CodePudding user response:

Microsoft's Reactive Framework makes this easy too:

IEnumerable<int> values = ...;

IDisposable subscription =
    values
        .ToObservable()
        .Select(v => Observable.Defer(() => Observable.Start(() => { /* do work on each value */ })))
        .Merge(3)
        .Subscribe();
  • Related