Buffering IAsyncEnumerable in producer/consumer scenario


I have a scenario in which I am reading some data from a database. This data is returned in the form of IAsyncEnumerable&lt;MyData&gt;. After reading the data I want to send it to a consumer. This consumer is asynchronous. Right now my code looks something like this:

// C#
IAsyncEnumerable<MyData> enumerable = this.dataSource.Read(query);

await foreach (var data in enumerable) 
{
    await this.consumer.Write(data);
}

My problem with this is that while I am enumerating the data, I am holding a lock on the database. I don't want to hold this lock for longer than I need to.

In the event that the consumer consumes data more slowly than the producer produces it, is there any way I can eagerly read from the data source without just calling ToList or ToListAsync? I want to avoid reading all the data into memory at once, which would cause the opposite problem if the producer is slower than the consumer. It is OK if the lock on the database is not held for the shortest possible time; I want a configurable trade-off between how much data is in memory at once and how long the enumeration keeps running.

My thought is that there would be some way to use a queue or channel-like datastructure to act as a buffer between the producer and consumer.

In Golang I would do something like this:

// go
queue := make(chan MyData, BUFFER_SIZE)
go dataSource.Read(query, queue)

// Read sends data on the channel, closes it when done

for data := range queue {
    consumer.Write(data)
}
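
Here is a self-contained sketch of that pattern (MyData, produce, and the item count are placeholders for my real types):

```go
package main

import "fmt"

// MyData is a hypothetical stand-in for the real row type.
type MyData struct{ ID int }

// produce plays the role of dataSource.Read: it sends items on the
// channel and closes it when done, so the consumer's range loop ends.
func produce(out chan<- MyData) {
	defer close(out) // signal "no more items" to the consumer
	for i := 1; i <= 5; i++ {
		out <- MyData{ID: i} // blocks once the buffer is full
	}
}

func main() {
	const bufferSize = 2 // at most 2 unconsumed items held in memory
	queue := make(chan MyData, bufferSize)
	go produce(queue)

	// range ends automatically when the producer closes the channel.
	for data := range queue {
		fmt.Println("consumed", data.ID)
	}
}
```

The buffered channel gives exactly the trade-off I want: the producer runs ahead of the consumer by at most bufferSize items, then blocks.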

Is there any way to get similar behavior in C#?

CodePudding user response:

Here is a more robust implementation of the ConsumeBuffered extension method in Rafael's answer. This one uses a Channel<T> as the buffer, instead of a BlockingCollection<T>. The advantage is that enumerating the two sequences, the source and the buffered one, does not block a thread each. Care has been taken to complete the enumeration of the source sequence in case the enumeration of the buffered sequence is abandoned prematurely by the consumer downstream.

public static async IAsyncEnumerable<T> ConsumeBuffered<T>(
    this IAsyncEnumerable<T> source, int capacity,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    Channel<T> channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
    {
        SingleWriter = true,
        SingleReader = true,
    });
    using CancellationTokenSource completionCts = new();

    Task producer = Task.Run(async () =>
    {
        Exception exception = null;
        try
        {
            await foreach (T item in source.WithCancellation(completionCts.Token)
                .ConfigureAwait(false))
            {
                await channel.Writer.WriteAsync(item, completionCts.Token)
                    .ConfigureAwait(false);
            }
        }
        catch (Exception ex) { exception = ex; }
        channel.Writer.Complete(exception);
    });

    try
    {
        await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken)
            .ConfigureAwait(false))
        {
            yield return item;
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
    finally // Happens when the caller disposes the output enumerator
    {
        completionCts.Cancel();
        await producer.ConfigureAwait(false);
    }
}

Setting the SingleWriter and SingleReader options of the bounded channel is a bit academic, and could be omitted. Currently (.NET 6) there is only one bounded Channel<T> implementation in the System.Threading.Channels library, regardless of the supplied options. This implementation is based on a Deque<T> (an internal .NET type similar to a Queue<T>) synchronized with a lock.

The channel is enumerated inside a try/finally block, because C# iterators execute the finally blocks as part of the Dispose/DisposeAsync method of the autogenerated IEnumerator<T>/IAsyncEnumerator<T>.

The above ConsumeBuffered method has not been tested. In theory it should be bug-free, but in practice it might not be.

CodePudding user response:

Thank you to @Evk for pointing me towards BlockingCollection<T>; this is the solution I came up with. It lets me eagerly read from an IAsyncEnumerable even if the consumer can't keep up. A similar solution could probably be built with System.Threading.Channels to mimic the Go example.

public static async IAsyncEnumerable<T> ConsumeBuffered<T>(this IAsyncEnumerable<T> enumerable, int? maxBuffer = null)
    where T : class
{
    using (BlockingCollection<T> queue = maxBuffer == null
        ? new BlockingCollection<T>()
        : new BlockingCollection<T>(maxBuffer.Value))
    {
        Task producer = Task.Run(
            async () =>
            {
                try
                {
                    await foreach (T item in enumerable.ConfigureAwait(false))
                    {
                        queue.Add(item);
                    }
                }
                finally
                {
                    // Always complete, even if the source throws;
                    // otherwise the Take below would block forever.
                    queue.CompleteAdding();
                }
            });

        while (true)
        {
            T next;
            try
            {
                next = queue.Take();
            }
            catch (InvalidOperationException)
            {
                // thrown when we try to Take after the last item
                break;
            }

            yield return next;
        }

        // Await the producer so any exception it threw is propagated.
        await producer.ConfigureAwait(false);
    }
}

Probably needs some polishing and testing for edge cases, but it seems to work in unit tests.
