IAsyncEnumerable and throttling

Time:06-16

Is it possible to throttle the data when consuming IAsyncEnumerable<T>? I have a stream of data coming in rapidly, and I'm only interested in the last element every N seconds.

Thanks.

CodePudding user response:

It would seem to make sense to have a different endpoint that returns the most recent event, rather than dealing with the stream.

If you have to deal with a queue/stream, you could consume the events in the background, assign each incoming one to a variable such as latest, and read that variable at the desired interval.
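
The "keep only the latest item and read it on a timer" idea could be sketched roughly as follows. This is only a minimal illustration, not a tested library: the names LatestSampler and SampleLatest are hypothetical, and it assumes the source honors the cancellation token passed through WithCancellation (otherwise the background consumer may keep running after the reader stops).

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class LatestSampler
{
    // Hypothetical helper: yields the most recent item seen during each interval.
    public static async IAsyncEnumerable<T> SampleLatest<T>(
        this IAsyncEnumerable<T> source,
        TimeSpan interval,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var gate = new object();
        T latest = default!;
        bool hasValue = false;

        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        CancellationToken token = cts.Token;

        // Background consumer: overwrite the single slot with every incoming item.
        Task consumer = Task.Run(async () =>
        {
            await foreach (var item in source.WithCancellation(token).ConfigureAwait(false))
            {
                lock (gate) { latest = item; hasValue = true; }
            }
        });

        try
        {
            while (true)
            {
                // Wake up when the interval elapses or the source finishes.
                Task delay = Task.Delay(interval, token);
                Task finished = await Task.WhenAny(consumer, delay).ConfigureAwait(false);
                token.ThrowIfCancellationRequested();

                T snapshot;
                bool emit;
                lock (gate) { emit = hasValue; snapshot = latest; hasValue = false; }
                if (emit) yield return snapshot;

                if (finished == consumer)
                {
                    // Rethrows any exception raised by the source.
                    await consumer.ConfigureAwait(false);
                    yield break;
                }
            }
        }
        finally
        {
            // Stop the background consumer when the reader stops early.
            cts.Cancel();
        }
    }
}
```

Under these assumptions, a caller would write something like: await foreach (var x in seq.SampleLatest(TimeSpan.FromSeconds(1))) Console.WriteLine(x); Intervals in which no new item arrived produce no output.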

CodePudding user response:

One solution is to convert the IAsyncEnumerable<T> to an IObservable<T> and leverage the power of System.Reactive.

First, you need a converter. I couldn't find a built-in one, so I wrote my own:

using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public class AsyncEnumerableToObservable<T> : IObservable<T>
{
    private readonly IAsyncEnumerable<T> _source;
    private readonly Subject<T> _subject = new();

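    public AsyncEnumerableToObservable(IAsyncEnumerable<T> source)
    {
        _source = source;
        // Consumption starts immediately; the Subject does not replay,
        // so items produced before the first Subscribe call are lost.
        BeginConsume();
    }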

    public IDisposable Subscribe(IObserver<T> observer) => _subject.Subscribe(observer);

    // Fire-and-forget: exceptions from the source are forwarded to subscribers via OnError.
    private async void BeginConsume()
    {
        try
        {
            await foreach (var item in _source)
            {
                _subject.OnNext(item);
            }

            _subject.OnCompleted();
        }
        catch (Exception e)
        {
            _subject.OnError(e);
        }
    }
}

public static class AsyncEnumerableExtensions
{
    public static IObservable<T> ToObservable<T>(this IAsyncEnumerable<T> source)
    {
        return new AsyncEnumerableToObservable<T>(source);
    }
}

With this converter, you can call myEnumerable.ToObservable() and then throttle with the Sample method from System.Reactive, which emits the most recent element in each interval:

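using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class Program
{
    static async Task Main(string[] args)
    {
        IAsyncEnumerable<int> seq = CreateSeq();

        // Print the most recent value once per second.
        using var subscription = seq.ToObservable()
            .Sample(TimeSpan.FromSeconds(1))
            .Subscribe(Console.WriteLine);

        // Keep the process alive long enough to see some output.
        await Task.Delay(TimeSpan.FromSeconds(10));
    }

    // Endless, fast-producing sequence used for the demo.
    private static async IAsyncEnumerable<int> CreateSeq()
    {
        int i = 0;
        while (true)
        {
            await Task.Yield();
            i++;
            yield return i;
        }
    }
}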