How to run a Parallel.ForEachAsync loop with NoBuffering?

Time:07-29

The synchronous Parallel.ForEach method has many overloads, some of which allow configuring the parallel loop with the EnumerablePartitionerOptions.NoBuffering option:

Create a partitioner that takes items from the source enumerable one at a time and does not use intermediate storage that can be accessed more efficiently by multiple threads. This option provides support for low latency (items will be processed as soon as they are available from the source) and provides partial support for dependencies between items (a thread cannot deadlock waiting for an item that the thread itself is responsible for processing).
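For reference, this is how the synchronous API can be configured with that option (a minimal sketch; the `Enumerable.Range` source and the `Console.WriteLine` body are placeholders for your own source and processing):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Wrap the source in a partitioner configured with NoBuffering, so that
// worker threads take items from the shared enumerable one at a time,
// with no intermediate per-thread buffer.
var partitioner = Partitioner.Create(
    Enumerable.Range(1, 100), EnumerablePartitionerOptions.NoBuffering);

Parallel.ForEach(partitioner, item =>
{
    Console.WriteLine($"Processing {item}");
});
```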

There is no such option or overload for the asynchronous Parallel.ForEachAsync. This is a problem for me, because I want to use this method with a Channel<T> as the source, as the consumer in a producer-consumer scenario. In my scenario it is important that the consumer bites off exactly what it can chew, and no more. I don't want the consumer aggressively pulling items from the Channel<T> and then stashing them in its own hidden buffer. I want the Channel<T> to be the only queue in the system, so that I can monitor it and have accurate statistics about the elements that are waiting to be processed/consumed.

Until recently I was under the impression that the Parallel.ForEachAsync method does not buffer by design. But to be sure, I asked Microsoft on GitHub for clarification. I got feedback very quickly, but not what I expected:

It's an implementation detail. With Parallel.ForEach, the buffering is done to handle body delegates that might be really fast, and thus it's attempting to minimize / amortize the cost of taking the lock to access the shared enumerator. With ForEachAsync, it's expected that the body delegates will be at least a bit meatier, and thus it doesn't attempt to do such amortization. At least not today.

Being dependent on an implementation detail is highly undesirable. So I have to rethink my approach.

My question is: Is it possible to configure the Parallel.ForEachAsync API so that it has guaranteed NoBuffering behavior? If so, how?

Clarification: I am not asking how to reinvent the Parallel.ForEachAsync from scratch. I am asking for some kind of thin wrapper around the existing Parallel.ForEachAsync API, that will "inject" the desirable NoBuffering behavior. Something like this:

public static Task ForEachAsync_NoBuffering<TSource>(
    IAsyncEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask> body)
{
    // Some magic here
    return Parallel.ForEachAsync(source, parallelOptions, body);
}

The wrapper should behave exactly the same as the Parallel.ForEachAsync method on .NET 6.


Update: Here is the basic layout of my scenario:

class Processor
{
    private readonly Channel<Item> _channel;
    private readonly Task _consumer;

    public Processor()
    {
        _channel = Channel.CreateUnbounded<Item>();
        _consumer = StartConsumer();
    }

    public int PendingItemsCount => _channel.Reader.Count;
    public Task Completion => _consumer;

    public void QueueItem(Item item) => _channel.Writer.TryWrite(item);

    private async Task StartConsumer()
    {
        ParallelOptions options = new() { MaxDegreeOfParallelism = 2 };
        await Parallel.ForEachAsync(_channel.Reader.ReadAllAsync(), options, async (item, _) =>
        {
            // Call async API
            // Persist the response of the API in an RDBMS
        });
    }
}
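A hypothetical usage of this class could look like the sketch below (the `Item` record is a placeholder; note that the `Processor` above never completes the channel, so `Completion` is shown but not awaited):

```csharp
using System;

// Queue some work and observe the queue depth. Because the channel is
// intended to be the only queue in the system, PendingItemsCount should
// accurately reflect the items not yet picked up by the consumer.
var processor = new Processor();
for (int i = 0; i < 10; i++) processor.QueueItem(new Item(i));

Console.WriteLine($"Pending: {processor.PendingItemsCount}");

// Placeholder for whatever the real Item type is.
record Item(int Id);
```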

There might be other tools available that could also be used for this purpose, but I prefer to use the smoking hot (.NET 6) Parallel.ForEachAsync API. This is the focus of this question.

CodePudding user response:

I think that I've found a way to implement the ForEachAsync_NoBuffering method. The idea is to feed the underlying Parallel.ForEachAsync loop with a fake infinite IEnumerable<TSource>, and do the actual enumeration of the IAsyncEnumerable<TSource> source inside the body:

/// <summary>
/// Executes a for-each operation on an asynchronous sequence, in which iterations
/// may run in parallel. Items are taken from the source sequence one at a time,
/// and no intermediate storage is used.
/// </summary>
public static Task ForEachAsync_NoBuffering<TSource>(
    IAsyncEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask> body)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(parallelOptions);
    ArgumentNullException.ThrowIfNull(body);
    bool completed = false;
    IEnumerable<TSource> Infinite()
    {
        // Yields dummy values (ignored by the body) until the source
        // sequence has been fully enumerated.
        while (!Volatile.Read(ref completed)) yield return default!;
    }
    SemaphoreSlim semaphore = new(1, 1);
    IAsyncEnumerator<TSource> enumerator = source.GetAsyncEnumerator();
    return Parallel.ForEachAsync(Infinite(), parallelOptions, async (_, ct) =>
    {
        // Take the next item in the sequence, after acquiring an exclusive lock.
        TSource item;
        await semaphore.WaitAsync(); // Continue on captured context.
        try
        {
            if (completed) return;
            if (!(await enumerator.MoveNextAsync())) // Continue on captured context.
            {
                completed = true; return;
            }
            item = enumerator.Current;
        }
        finally { semaphore.Release(); }
        // Invoke the body with the item that was taken.
        await body(item, ct).ConfigureAwait(false);
    }).ContinueWith(async t =>
    {
        // Dispose the enumerator.
        await enumerator.DisposeAsync().ConfigureAwait(false);
        semaphore.Dispose();
        return t;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default)
        .Unwrap().Unwrap();
}

The final ContinueWith is needed in order to dispose the enumerator, as well as the SemaphoreSlim that is used for serializing the operations on the enumerator. The advantage of ContinueWith over a simpler await is that it propagates all the exceptions of the parallel loop, not just the first one.
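Fed by a completed Channel<T>, the wrapper can be exercised like this (a small demo sketch; `ParallelHelpers` is an assumed static class hosting the method above):

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
for (int i = 1; i <= 10; i++) channel.Writer.TryWrite(i);
channel.Writer.Complete();

ParallelOptions options = new() { MaxDegreeOfParallelism = 2 };

await ParallelHelpers.ForEachAsync_NoBuffering(
    channel.Reader.ReadAllAsync(), options, async (item, ct) =>
{
    await Task.Delay(50, ct); // Simulate asynchronous work.
    // At most MaxDegreeOfParallelism items are "in flight" at any moment;
    // the rest stay in the channel, where they remain countable.
    Console.WriteLine($"Processed {item}, pending: {channel.Reader.Count}");
});
```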
