In .NET 5 we had Parallel.ForEach
which you were able to use ParallelLoopState.Break()
method to stop additional iterations from processing. Allowing current ones to complete processing.
But the new .NET 6 Parallel.ForEachAsync
does not have the ParallelLoopState
class so we can't break it like we could with Parallel.ForEach
. So is there a way to perform the same break functionality in ForEachAsync
? CancellationToken
passed to the func I don't believe is the right way since your not trying to cancel the running loop but preventing additional iterations from starting.
Something like this functionality but for the async version:
int count = 0;
Parallel.ForEach(enumerateFiles, new ParallelOptions() { CancellationToken = cancellationToken},
(file, state) =>
{
Interlocked.Increment(ref count);
if (count >= MaxFilesToProcess)
{
state.Break();
}
...
As a workaround I can probably use .Take([xx])
on the TSource
before it is passed into the parallel loop but that might not be an option for a complex condition to break on.
CodePudding user response:
The asynchronous API Parallel.ForEachAsync
does not offer the Stop
/Break
functionality of its synchronous counterpart.
One way to replicate this functionality is to use a bool
flag in combination with the TakeWhile
LINQ operator:
bool breakFlag = false;
await Parallel.ForEachAsync(
source.TakeWhile(_ => !Volatile.Read(ref breakFlag)),
async (item, ct) =>
{
// ...
if (condition) Volatile.Write(ref breakFlag, true);
// ...
});
The Parallel.ForEachAsync
does not buffer aggressively elements from the source sequence, like the Parallel.ForEach
does, so as soon as the condition is met, no more asynchronous operations are going to start.
Is case the source
is an asynchronous enumerable (IAsyncEnumerable<T>
), there is a compatible TakeWhile
operator with identical functionality in the System.Linq.Async package.