I've implemented IAsyncEnumerable
to my HttpClient
requests where there is a pagination but I also need to GroupBy
them. So I've implemented code like below;
public class Item
{
public int Id {get; set;}
public string Name {get; set;}
}
public async IAsyncEnumerable<Item> GetItems()
{
while(hasMorePage)
{
// ... get paginated items
foreach(var item in paginatedItems)
{
yield return item;
}
}
}
// should find most repeated item(by Id) and count of them.
public async Task GroupItems()
{
IAsyncEnumerable<Item> items = GetItems();
//IAsyncGrouping
await foreach(var item in items.GroupBy(i => i.Id).
OrderByDescendingAwait(i => i.CountAsync()).
Take(10))
{
Console.WriteLine(item.Key.ToString() (await item.CountAsync()).ToString());
}
}
This code works perfectly fine as I expected. But I would like to understand how GroupBy
works here, because of it should have all items to group by id is there something that I miss? or is there anything I can refactor for performance?
CodePudding user response:
First of all, the ALinq repo linked in the comments has nothing to do with .NET's IAsyncEnumerable or System.Linq.Async. It's an 8 year old repo that doesn't even target .NET Core. System.Linq.Async is maintained by the same team that built Reactive Excetions for .NET and its code is in
Notice that Rx.NET's GroupBy
actually partitions the event stream by the grouping key and emits streams not groupings. Subscribers will subscribe to those streams and process their events. This Aggregation example demonstrates this:
var source = Observable.Interval(TimeSpan.FromSeconds(0.1)).Take(10);
var group = source.GroupBy(i => i % 3);
group.Subscribe(
grp =>
grp.Min().Subscribe(
minValue =>
Console.WriteLine("{0} min value = {1}", grp.Key, minValue)),
() => Console.WriteLine("Completed"));
If you need to process a long-running IAsyncEnumerable<> stream you can use ToObservable