I'm trying to run Parallel.ForEach
on my Priority Queue
but I am getting the following error:
Severity Code Description Project File Line Suppression State Error CS0411 The type arguments for method 'Parallel.ForEach(OrderablePartitioner, ParallelOptions, Action<TSource, ParallelLoopState, long>)' cannot be inferred from the usage. Try specifying the type arguments explicitly. TPL_POC.PL
I know how to execute Parallel.ForEach
with IEnumerable
and List
s but there's no luck with the following.
private void ProcessTasksParallely()
{
PriorityQueue<string, int> activeTasksPriority = new PriorityQueue<string, int>();
foreach (var task in this.tasks)
{
activeTasksPriority.Enqueue(task.Task, task.Id);
}
Console.WriteLine("Processing");
var options = new ParallelOptions { MaxDegreeOfParallelism = (Environment.ProcessorCount / 2) * 10 };
Parallel.ForEach(activeTasksPriority.TryDequeue(out string t, out int priority),
options,
(t, priority) =>
{
Console.WriteLine($" task {priority}, task = {t}, thread = {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(100);
});
}
I am trying this because I need to process tasks parallel but according to the priority they were scheduled.
CodePudding user response:
The PriorityQueue<TElement, TPriority>
class does not offer a way to consume it as an IEnumerable
out of the box. It only has an UnorderedItems
property, which is not what you want. This property yields the contents of the queue without consuming them, and in no particular order. It is easy though to implement a custom GetConsumingEnumerable
method for the PriorityQueue<TElement, TPriority>
class, like this:
/// <summary>
/// Gets an enumerable sequence that consumes the elements of the queue
/// in an ordered manner.
/// </summary>
public static IEnumerable<(TElement Element, TPriority Priority)>
GetConsumingEnumerable<TElement, TPriority>(
this PriorityQueue<TElement, TPriority> source)
{
while (source.TryDequeue(out TElement element, out TPriority priority))
{
yield return (element, priority);
}
}
Usage example:
Parallel.ForEach(activeTasksPriority.GetConsumingEnumerable(), options,
(t, priority) =>
{
Console.WriteLine($"Priority: {priority}, Task: {t}");
Thread.Sleep(100);
});
CodePudding user response:
If you want to implement priority in a pub/sub scenario, both Parallel.ForEach
and PriorityQueue<T>
are bad choices.
Parallel.ForEach
is built for data parallelism - processing a ton of in-memory data by partitioning it and using roughly one worker task per core to process each partition with minimal synchronization. A PriorityQueue isn't needed here - if you want a specific order you can impose it using eg PLINQ andOrderBy
.- Priorities inevitably change the perceived order of items and the queue state, which is a big no-no for concurrency.
- Priorities can get inverted. All worker tasks may be busy processing low-priority items while a new high priority item is waiting. Worse, the default partitioner used by
Parallel.ForEach
buffers items. This means that a new high priority item may have to wait for multiple low priority items. You'd have to usePartitioner.Create
with an option to disable buffering
In high-throughput networking and messaging, priority processing is performed through multiple queues not a single priority queue. Higher-priority queues get more resources or are processed before lower priority queues.
One queue per priority class
This is how highly-scaleable messaging systems work, because it doesn't require any synchronization to determine which item to process next.
One way to implement this strategy would be to use multiple ActionBlock instances, each with a different number of worker tasks :
async Task ProcessMessage(string msg) {...}
ExecutionDataflowBlockOptions WithDop(int dop)=>new ExecutionDataflowBlockOptions{
MaxDegreeOfParallelism = dop
};
void BuildQueues()
{
_highQueue=new ActionBlock<string>(ProcessMessage,WithDop(4));
_midQueue=new ActionBlock<string>(ProcessMessage,WithDop(2));
_lowQueue=new ActionBlock<string>(ProcessMessage,WithDop(1));
}
public void Process(string msg,int priority)
{
var queue= priority switch {
<=0 => _highQueue,
1 => _midQueue,
_ => _lowQueue
}
queue.Post(msg);
}
async Task Complete()
{
_highQueue.Complete();
_midQueue.Complete();
_lowQueue.Complete();
await Task.WhenAll(
_hiqhQueue.Completion,
_midQueue.Completion,
_lowQueue.Completion
);
}
In this case Process
uses pattern matching to route the message to the appropriate ActionBlock