Parallel.ForEach with Priority Queue in .NET 6


I'm trying to run Parallel.ForEach on my Priority Queue but I am getting the following error:

Error CS0411: The type arguments for method 'Parallel.ForEach<TSource>(OrderablePartitioner<TSource>, ParallelOptions, Action<TSource, ParallelLoopState, long>)' cannot be inferred from the usage. Try specifying the type arguments explicitly. (Project: TPL_POC.PL)

I know how to use Parallel.ForEach with an IEnumerable or a List, but I'm having no luck with the following.

private void ProcessTasksParallely()
{
    PriorityQueue<string, int> activeTasksPriority = new PriorityQueue<string, int>();
    foreach (var task in this.tasks)
    {
        activeTasksPriority.Enqueue(task.Task, task.Id);
    }
    Console.WriteLine("Processing");

    var options = new ParallelOptions { MaxDegreeOfParallelism = (Environment.ProcessorCount / 2) * 10 };

    Parallel.ForEach(activeTasksPriority.TryDequeue(out string t, out int priority),
        options,
        (t, priority) =>
        {
            Console.WriteLine($" task {priority}, task = {t}, thread = {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(100);
        });
}

I am trying this because I need to process the tasks in parallel, but according to the priority with which they were scheduled.

CodePudding user response:

The PriorityQueue<TElement, TPriority> class does not offer a way to consume it as an IEnumerable out of the box. It only has an UnorderedItems property, which is not what you want: this property yields the contents of the queue without consuming them, and in no particular order. It is easy, though, to implement a custom GetConsumingEnumerable extension method for the PriorityQueue<TElement, TPriority> class (declared in a static class), like this:

/// <summary>
/// Gets an enumerable sequence that consumes the elements of the queue
/// in an ordered manner.
/// </summary>
public static IEnumerable<(TElement Element, TPriority Priority)>
    GetConsumingEnumerable<TElement, TPriority>(
    this PriorityQueue<TElement, TPriority> source)
{
    while (source.TryDequeue(out TElement element, out TPriority priority))
    {
        yield return (element, priority);
    }
}

Usage example:

Parallel.ForEach(activeTasksPriority.GetConsumingEnumerable(), options, entry =>
{
    var (t, priority) = entry; // deconstruct the (Element, Priority) tuple
    Console.WriteLine($"Priority: {priority}, Task: {t}");
    Thread.Sleep(100);
});

CodePudding user response:

If you want to implement priority in a pub/sub scenario, both Parallel.ForEach and PriorityQueue<T> are bad choices.

  • Parallel.ForEach is built for data parallelism - processing a ton of in-memory data by partitioning it and using roughly one worker task per core to process each partition with minimal synchronization. A PriorityQueue isn't needed here - if you want a specific order you can impose it using e.g. PLINQ and OrderBy.
  • Priorities inevitably mean that consumers reorder items and mutate shared queue state, which is a big no-no for concurrency.
  • Priorities can get inverted. All worker tasks may be busy processing low-priority items while a new high-priority item is waiting. Worse, the default partitioner used by Parallel.ForEach buffers items, which means that a new high-priority item may have to wait behind multiple low-priority items. You'd have to use Partitioner.Create with an option to disable buffering, as shown in the sketch after this list.
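
As a hedged sketch of that last point, assuming the GetConsumingEnumerable extension method from the previous answer is available: Partitioner.Create with EnumerablePartitionerOptions.NoBuffering (both in System.Collections.Concurrent) hands items to the workers one at a time instead of in chunks. This only addresses the buffering issue; the other caveats above still apply.

// Sketch only: wrap the consuming enumerable in a non-buffering partitioner,
// so a freshly dequeued high-priority item is not stuck behind a worker's
// locally buffered chunk of low-priority items.
var unbuffered = Partitioner.Create(
    activeTasksPriority.GetConsumingEnumerable(),
    EnumerablePartitionerOptions.NoBuffering);

Parallel.ForEach(unbuffered, options, entry =>
{
    var (t, priority) = entry;
    Console.WriteLine($"Priority: {priority}, Task: {t}");
});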

In high-throughput networking and messaging, priority processing is performed through multiple queues, not a single priority queue. Higher-priority queues get more resources or are processed before lower-priority queues.

One queue per priority class

This is how highly scalable messaging systems work, because it doesn't require any synchronization to determine which item to process next.

One way to implement this strategy would be to use multiple ActionBlock instances (from the TPL Dataflow library), each with a different number of worker tasks:

// Requires the System.Threading.Tasks.Dataflow package.
ActionBlock<string> _highQueue, _midQueue, _lowQueue;

async Task ProcessMessage(string msg) {...}

ExecutionDataflowBlockOptions WithDop(int dop) => new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = dop
};

void BuildQueues()
{
    // Higher-priority queues get more worker tasks.
    _highQueue = new ActionBlock<string>(ProcessMessage, WithDop(4));
    _midQueue = new ActionBlock<string>(ProcessMessage, WithDop(2));
    _lowQueue = new ActionBlock<string>(ProcessMessage, WithDop(1));
}

public void Process(string msg, int priority)
{
    // Route the message to the block that handles its priority class.
    var queue = priority switch
    {
        <= 0 => _highQueue,
           1 => _midQueue,
           _ => _lowQueue
    };
    queue.Post(msg);
}

async Task Complete()
{
    // Signal that no more messages will be posted, then wait for all
    // queues to drain.
    _highQueue.Complete();
    _midQueue.Complete();
    _lowQueue.Complete();
    await Task.WhenAll(
        _highQueue.Completion,
        _midQueue.Completion,
        _lowQueue.Completion);
}

In this case, Process uses a switch expression with pattern matching to route each message to the appropriate ActionBlock.
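
A minimal usage sketch, assuming the members above live on a hypothetical MessageProcessor class and are accessible to the caller (the class name and message payloads are made up for illustration):

// Hypothetical usage: build the queues once, post messages with their
// priorities, then complete and wait for everything to drain.
var processor = new MessageProcessor();      // hypothetical host class for the members above
processor.BuildQueues();
processor.Process("urgent message", 0);      // routed to _highQueue
processor.Process("normal message", 1);      // routed to _midQueue
processor.Process("background message", 5);  // routed to _lowQueue
await processor.Complete();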
