Is it possible to throttle the data when consuming IAsyncEnumerable<T>?
I have a stream of data coming in rapidly, and I'm only interested in the last element every N seconds.
Thanks.
CodePudding user response:
It would seem to make sense to have a different endpoint that returns the most recent event, rather than dealing with the stream.
If you have to deal with a queue/stream, you could consume the events, assign each incoming one to a variable such as latest, and read that variable at the desired interval.
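The "keep the latest, read it on a timer" idea could be sketched roughly as follows. SampleLatest is a hypothetical helper name, not part of any library, and the sketch assumes it is acceptable to drain the source on a background task and drop every element except the newest one per interval.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class LatestValueSampling
{
    // Hypothetical helper: yields the most recent element seen in each interval.
    public static async IAsyncEnumerable<T> SampleLatest<T>(
        this IAsyncEnumerable<T> source,
        TimeSpan interval,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var gate = new object();
        T latest = default!;
        bool hasValue = false;

        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        // Background consumer: drains the source and overwrites "latest".
        Task consumer = Task.Run(async () =>
        {
            await foreach (var item in source.WithCancellation(cts.Token))
            {
                lock (gate) { latest = item; hasValue = true; }
            }
        });

        try
        {
            while (true)
            {
                Task delay = Task.Delay(interval, cancellationToken);
                Task finished = await Task.WhenAny(consumer, delay);

                // Take a snapshot; emit only if something new arrived.
                T snapshot;
                bool emit;
                lock (gate) { snapshot = latest; emit = hasValue; hasValue = false; }
                if (emit) yield return snapshot;

                if (finished == consumer)
                {
                    await consumer; // rethrows if the source failed
                    yield break;
                }

                await delay; // throws if the caller cancelled
            }
        }
        finally
        {
            cts.Cancel(); // stop the background consumer when enumeration ends
        }
    }
}
```

It would be consumed like any other async sequence, for example: await foreach (var item in stream.SampleLatest(TimeSpan.FromSeconds(5))) { ... }. If the source ends, the last unread element is emitted before the sequence completes.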
CodePudding user response:
One solution is to convert IAsyncEnumerable<T> to IObservable<T> and leverage the power of System.Reactive.
First, you need a converter. I couldn't find a built-in one, so I wrote my own:
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public class AsyncEnumerableToObservable<T> : IObservable<T>
{
    private readonly IAsyncEnumerable<T> _source;
    private readonly Subject<T> _subject = new();

    public AsyncEnumerableToObservable(IAsyncEnumerable<T> source)
    {
        _source = source;
        // Consumption starts immediately, so items produced before
        // the first Subscribe call are not replayed to observers.
        BeginConsume();
    }

    public IDisposable Subscribe(IObserver<T> observer) => _subject.Subscribe(observer);

    // async void is fire-and-forget; every exception is caught below
    // and forwarded to the observers through OnError.
    private async void BeginConsume()
    {
        try
        {
            await foreach (var item in _source)
            {
                _subject.OnNext(item);
            }
            _subject.OnCompleted();
        }
        catch (Exception e)
        {
            _subject.OnError(e);
        }
    }
}

public static class AsyncEnumerableExtensions
{
    public static IObservable<T> ToObservable<T>(this IAsyncEnumerable<T> source)
    {
        return new AsyncEnumerableToObservable<T>(source);
    }
}
With this converter you can call myEnumerable.ToObservable() and then use the Sample method from System.Reactive for throttling:
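using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class Program
{
    static async Task Main(string[] args)
    {
        IAsyncEnumerable<int> seq = CreateSeq();
        // Sample emits the most recent value once per second.
        using var subscription = seq.ToObservable()
            .Sample(TimeSpan.FromSeconds(1))
            .Subscribe(Console.WriteLine);
        // Keep the process alive long enough to see a few samples.
        await Task.Delay(TimeSpan.FromSeconds(10));
    }

    // Endless, fast-moving sequence used as a stand-in for the real stream.
    private static async IAsyncEnumerable<int> CreateSeq()
    {
        int i = 0;
        while (true)
        {
            await Task.Yield();
            i++;
            yield return i;
        }
    }
}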
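As a possible variation, the converter can also be written with Observable.Create, so that enumeration starts only when someone subscribes and stops when the subscription is disposed. This is a sketch, not a drop-in replacement: ToColdObservable is a hypothetical name chosen here, and each subscriber triggers its own enumeration of the source, which may or may not be what you want.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ColdAsyncEnumerableExtensions
{
    // Hypothetical alternative converter built on Observable.Create.
    public static IObservable<T> ToColdObservable<T>(this IAsyncEnumerable<T> source)
    {
        return Observable.Create<T>(async (observer, cancellationToken) =>
        {
            // The token is cancelled when the subscription is disposed.
            await foreach (var item in source.WithCancellation(cancellationToken))
            {
                observer.OnNext(item);
            }
            // Rx signals OnCompleted when this task finishes,
            // or OnError if it throws.
        });
    }
}
```

Usage would look the same as before, for example seq.ToColdObservable().Sample(TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine). Disposing the subscription then also stops the underlying enumeration.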