How to break the Parallel.ForEachAsync loop, not cancel it?

Time:02-12

In .NET 5 we had Parallel.ForEach, where you could call the ParallelLoopState.Break() method to stop additional iterations from starting while allowing the current ones to complete.

But the new .NET 6 Parallel.ForEachAsync does not expose a ParallelLoopState, so we can't break out of it the way we could with Parallel.ForEach. Is there a way to get the same break functionality in ForEachAsync? I don't believe the CancellationToken passed to the func is the right tool, since you're not trying to cancel the running loop, only to prevent additional iterations from starting.

Something like this functionality but for the async version:

int count = 0;
Parallel.ForEach(enumerateFiles, new ParallelOptions() { CancellationToken = cancellationToken},
    (file, state) =>
    {
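        // Compare the value returned by Interlocked.Increment;
        // re-reading count afterwards would race with other threads.
        if (Interlocked.Increment(ref count) >= MaxFilesToProcess)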
        {
            state.Break();
        }
...

As a workaround I can probably call .Take([xx]) on the source before it is passed into the parallel loop, but that might not be an option when the break condition is complex.

Answer:

The asynchronous API Parallel.ForEachAsync does not offer the Stop/Break functionality of its synchronous counterpart.

One way to replicate this functionality is to use a bool flag in combination with the TakeWhile LINQ operator:

bool breakFlag = false;
await Parallel.ForEachAsync(
    source.TakeWhile(_ => !Volatile.Read(ref breakFlag)),
    async (item, ct) =>
{
    // ...
    if (condition) Volatile.Write(ref breakFlag, true);
    // ...
});

Parallel.ForEachAsync does not aggressively buffer elements from the source sequence the way Parallel.ForEach does, so as soon as the flag is set, no more elements are pulled from the source and no more asynchronous operations are going to start. Operations that are already running complete normally.
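For anyone who wants to try this out, here is a minimal self-contained sketch of the flag-plus-TakeWhile approach. The BreakDemo class, the ProcessUntilAsync helper, its parameters, and the 10 ms delay are all hypothetical names and values made up for illustration; only Parallel.ForEachAsync, TakeWhile, and Volatile come from the answer above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BreakDemo
{
    // Hypothetical helper: processes 1..itemCount in parallel and sets the
    // break flag once the item equal to stopAt has been handled.
    public static async Task<int[]> ProcessUntilAsync(
        int itemCount, int stopAt, int maxDegreeOfParallelism)
    {
        bool breakFlag = false;
        var processed = new ConcurrentQueue<int>();

        await Parallel.ForEachAsync(
            // TakeWhile ends the sequence on the first pull after the flag is set.
            Enumerable.Range(1, itemCount).TakeWhile(_ => !Volatile.Read(ref breakFlag)),
            new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
            async (item, ct) =>
            {
                await Task.Delay(10, ct); // stand-in for real asynchronous work
                processed.Enqueue(item);
                if (item == stopAt) Volatile.Write(ref breakFlag, true);
            });

        // Sort because completion order is not guaranteed when running in parallel.
        return processed.OrderBy(x => x).ToArray();
    }
}
```

With maxDegreeOfParallelism set to 1 the helper should process exactly the items 1 through stopAt. With a higher degree of parallelism a few items past stopAt can also show up, because they were already in flight when the flag was set, which matches what Break() allows in the synchronous API.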

In case the source is an asynchronous enumerable (IAsyncEnumerable<T>), the System.Linq.Async package provides a compatible TakeWhile operator with identical functionality.
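To show the shape of the asynchronous variant without depending on the NuGet package, the sketch below uses a hand-written TakeWhileAsync iterator as a stand-in for the System.Linq.Async operator. It is not that package's implementation, and NumbersAsync, TakeWhileAsync, AsyncBreakDemo, and ProcessUntilAsync are hypothetical names introduced only for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncBreakDemo
{
    // Hypothetical asynchronous source standing in for a real IAsyncEnumerable<T>.
    private static async IAsyncEnumerable<int> NumbersAsync(int count)
    {
        for (int i = 1; i <= count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Hand-written stand-in for a TakeWhile operator over IAsyncEnumerable<T>:
    // yields items until the predicate first returns false, then stops.
    private static async IAsyncEnumerable<T> TakeWhileAsync<T>(
        IAsyncEnumerable<T> source, Func<T, bool> predicate)
    {
        await foreach (var item in source)
        {
            if (!predicate(item)) yield break;
            yield return item;
        }
    }

    public static async Task<int[]> ProcessUntilAsync(
        int itemCount, int stopAt, int maxDegreeOfParallelism)
    {
        bool breakFlag = false;
        var processed = new ConcurrentQueue<int>();

        await Parallel.ForEachAsync(
            TakeWhileAsync(NumbersAsync(itemCount), _ => !Volatile.Read(ref breakFlag)),
            new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
            async (item, ct) =>
            {
                await Task.Delay(10, ct); // stand-in for real asynchronous work
                processed.Enqueue(item);
                if (item == stopAt) Volatile.Write(ref breakFlag, true);
            });

        // Sort because completion order is not guaranteed when running in parallel.
        return processed.OrderBy(x => x).ToArray();
    }
}
```

If the package is referenced, the TakeWhileAsync call can presumably be swapped for source.TakeWhile(...) exactly as in the synchronous snippet.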
