I am building a client for continuously consuming new records from a data source. The integration is pull-based and my client periodically queries the data source for new records. I'm using IAsyncEnumerable as a return type for this continuous stream of new records. Here is the gist of the method in question:
public async IAsyncEnumerable<Record> StreamRecords(...)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        using var response = await _httpClient.SendAsync(request, cancellationToken);
        var records = _parser.Parse(response.Content);
        foreach (var r in records) yield return r;
        await Task.Delay(waitPeriod, cancellationToken);
    }
}
Is this an appropriate use of IAsyncEnumerable? This stream should be "never ending" or "continuous" (at least until cancellation is requested or an error occurs).
CodePudding user response:
As with most things: the key point is that the intent and behaviour are clearly communicated. I see no conceptual issue with a non-ending sequence, whether that is IEnumerable<T> or IAsyncEnumerable<T>, but obviously if the consumer calls ToList[Async](), things are going to end badly. This is not in itself a problem. To give a similar scenario: some I[Async]Enumerable<T> sequences are non-repeatable. Either they can only be iterated once (with subsequent attempts failing), or they could yield different results each time (without even requiring things like list changes). This is perfectly legal, yet code exists in the wild that assumes that sequences are repeatable and will yield the same data. The same discussion applies here, and ultimately, it isn't the producer at fault, but the consumer.
So: your sequence (producer) will work perfectly fine with a reasonable consumer that understands that the data should be treated as unbounded. If that's what your application has, then: great!
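As a rough illustration of what a "reasonable consumer" of an unbounded sequence might look like, here is a minimal sketch. The Numbers producer is a hypothetical stand-in for the polling method in the question (it is not the asker's code), and TakeFirst is an invented helper name; the point is only that the consumer decides when to stop instead of trying to materialize the whole sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class UnboundedDemo
{
    // Hypothetical never-ending producer: yields 0, 1, 2, ... until cancelled.
    public static async IAsyncEnumerable<int> Numbers(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 0; ; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return i;
            await Task.Yield();
        }
    }

    // A consumer that treats the data as unbounded: it reads at most
    // `count` items and then leaves the loop, which disposes the enumerator.
    public static async Task<List<int>> TakeFirst(int count)
    {
        var result = new List<int>();
        if (count <= 0) return result;

        await foreach (var n in Numbers())
        {
            result.Add(n);
            if (result.Count == count) break;
        }
        return result;
    }
}
```

A consumer that wants to run until shutdown would instead pass a token, for example via Numbers().WithCancellation(token), and handle the resulting OperationCanceledException.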
CodePudding user response:
Your StreamRecords method essentially returns a consuming sequence, similar in nature to the sequences returned by the methods BlockingCollection<T>.GetConsumingEnumerable and ChannelReader<T>.ReadAllAsync. Consuming means that when the caller enumerates the sequence, the returned elements are permanently removed from some backing storage. In the case of these two methods the backing storage is an internal ConcurrentQueue<T>. In your case (based on this comment) the backing storage is located server-side, with some client-side code that knows what data to fetch next.
Exposing a consuming sequence comes with some challenges:
- Should the method have the word Consuming or Destructive as part of its name?
- What happens in case of cancellation? Is the sequence responsive enough when a cancellation signal arrives? Are the caller's expectations met?
- What happens if the caller abandons the enumeration prematurely, either by deliberately breaking or returning from the await foreach loop, or unwillingly by a transient exception thrown inside the loop? Are any consumed elements in danger of being lost?
Regarding the 1st challenge you can read this GitHub issue, or even better watch this video, where the various options were debated. Spoiler alert, the Microsoft engineers settled for the seemingly innocuous ReadAllAsync.
Regarding the 2nd challenge you can read this question, which shows that the (technically justified) decision made by Microsoft regarding the ChannelReader<T>.ReadAllAsync API resulted in non-intuitive/unexpected behavior.
Regarding the 3rd challenge you could consider taking advantage of the mechanics of the finally blocks inside iterator methods. Check out this answer for more details.
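To make the finally mechanics concrete, here is a small sketch (not the code from the linked answer). It assumes a hypothetical in-memory Queue<int> as the backing storage and an invented unconfirmed list for items that were handed out but never followed by a request for the next item. When the consumer leaves the await foreach loop early, the enumerator is disposed, and the finally block of the suspended iterator runs.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ConsumingDemo
{
    // Hypothetical consuming iterator: each yielded item is removed from `source`.
    // Items whose enumeration was abandoned are recorded in `unconfirmed`.
    public static async IAsyncEnumerable<int> Consume(Queue<int> source, List<int> unconfirmed)
    {
        while (source.Count > 0)
        {
            var item = source.Dequeue();
            var confirmed = false;
            try
            {
                yield return item;
                // Reached only if the consumer asked for the next element.
                confirmed = true;
            }
            finally
            {
                // Also runs when the enumerator is disposed while suspended at the yield.
                if (!confirmed) unconfirmed.Add(item);
            }
            await Task.Yield();
        }
    }

    // Abandons the enumeration after receiving the first item.
    public static async Task<List<int>> StopAfterFirst(Queue<int> source)
    {
        var unconfirmed = new List<int>();
        await foreach (var item in Consume(source, unconfirmed))
        {
            break;
        }
        return unconfirmed;
    }
}
```

Note that the flag cannot tell a deliberate break apart from an exception in the loop body: in both cases the consumer simply never asked for the next element, so what to do with the unconfirmed item (re-queue it, log it, ignore it) remains a design decision.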
Because of these nuances, it might be a good idea to give additional consuming options to the callers. Something like public Task<Record[]> TakeAllNewRecords() for example.
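One possible shape for this, sketched under assumptions: the Record class, the RecordSource class, and the poll delegate below are all hypothetical placeholders for the real HTTP request and parsing logic. The batch method performs a single fetch, and the streaming method is built on top of it, so callers can choose whichever consumption style suits them.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical record type, standing in for the real one.
public sealed class Record
{
    public Record(int id) { Id = id; }
    public int Id { get; }
}

public sealed class RecordSource
{
    // Assumed delegate performing one request and returning the new records.
    private readonly Func<CancellationToken, Task<Record[]>> _poll;
    private readonly TimeSpan _waitPeriod;

    public RecordSource(Func<CancellationToken, Task<Record[]>> poll, TimeSpan waitPeriod)
    {
        _poll = poll;
        _waitPeriod = waitPeriod;
    }

    // Batch option: one poll, and the caller receives the whole batch at once.
    public Task<Record[]> TakeAllNewRecords(CancellationToken cancellationToken = default)
        => _poll(cancellationToken);

    // Streaming option: repeated polls exposed as an unbounded sequence.
    public async IAsyncEnumerable<Record> StreamRecords(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        while (true)
        {
            foreach (var r in await TakeAllNewRecords(cancellationToken)) yield return r;
            await Task.Delay(_waitPeriod, cancellationToken);
        }
    }
}
```

With the batch method the caller holds every consumed record in hand before processing starts, which makes it easier to reason about what has and has not been handled if something fails.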