How does GroupBy work with IAsyncEnumerable?

Time:07-29

I've used IAsyncEnumerable for my paginated HttpClient requests, but I also need to GroupBy the results. So I've implemented code like the following:

public class Item 
{
   public int Id {get; set;}
   public string Name {get; set;}
}

public async IAsyncEnumerable<Item> GetItems()
{
   while(hasMorePage)
   {
       // ... get paginated items

       foreach(var item in paginatedItems)
       {
         yield return item;
       }
   }
}

// should find most repeated item(by Id) and count of them.
public async Task GroupItems()
{
  IAsyncEnumerable<Item> items = GetItems();
  
  //IAsyncGrouping
  await foreach(var item in items.GroupBy(i => i.Id).
                                  OrderByDescendingAwait(i => i.CountAsync()).
                                  Take(10))
  {
     Console.WriteLine(item.Key.ToString() + (await item.CountAsync()).ToString());
  }
}

This code works exactly as I expected. But I would like to understand how GroupBy works here, because it needs all of the items before it can group them by Id. Is there something I'm missing? Or is there anything I can refactor for performance?

CodePudding user response:

First of all, the ALinq repo linked in the comments has nothing to do with .NET's IAsyncEnumerable or System.Linq.Async. It's an 8-year-old repo that doesn't even target .NET Core. System.Linq.Async is maintained by the same team that built Reactive Extensions for .NET, and its code lives in the same repository.

[Image: Reactive Extensions GroupBy illustration]

Notice that Rx.NET's GroupBy actually partitions the event stream by the grouping key and emits streams, not groupings. Subscribers subscribe to those streams and process their events. This Aggregation example demonstrates the behavior:

var source = Observable.Interval(TimeSpan.FromSeconds(0.1)).Take(10);
var group = source.GroupBy(i => i % 3);
group.Subscribe(
  grp => 
    grp.Min().Subscribe(
      minValue => 
        Console.WriteLine("{0} min value = {1}", grp.Key, minValue)),
  () => Console.WriteLine("Completed"));

If you need to process a long-running IAsyncEnumerable<> stream, you can use ToObservable to convert it to an IObservable<> and apply Rx.NET's operators to it.
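
As a rough sketch of how that could look for the question's "top 10 Ids by count" case: the code below bridges the async stream into Rx with ToObservable and lets Rx's GroupBy count each key as items arrive, so only the per-key counters stay in memory rather than every item. It assumes the System.Linq.Async and System.Reactive NuGet packages are referenced. The Demo class, the TopIdsAsync helper and the hard-coded sample data are hypothetical stand-ins for the question's paginated GetItems(), not part of either library.

```csharp
// Sketch only. Assumes the System.Linq.Async and System.Reactive NuGet packages.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public record Item(int Id, string Name);

public static class Demo
{
    // Hypothetical stand-in for the paginated GetItems() from the question.
    public static async IAsyncEnumerable<Item> GetItems()
    {
        foreach (var id in new[] { 1, 2, 1, 3, 1, 2 })
        {
            await Task.Yield(); // simulate an asynchronous page fetch
            yield return new Item(id, $"item-{id}");
        }
    }

    // Counts items per Id as they flow through; only the counters are kept in memory.
    public static async Task<List<(int Id, int Count)>> TopIdsAsync(int take)
    {
        IList<(int Id, int Count)> counts = await GetItems()
            .ToObservable()             // IAsyncEnumerable<Item> -> IObservable<Item>
            .GroupBy(i => i.Id)         // one inner stream per Id
            .SelectMany(g => g.Count()  // each inner stream emits its count when it completes
                              .Select(c => (Id: g.Key, Count: c)))
            .ToList();                  // collect the per-key results once the source ends

        return counts.OrderByDescending(x => x.Count).Take(take).ToList();
    }

    public static async Task Main()
    {
        foreach (var (id, count) in await TopIdsAsync(10))
            Console.WriteLine($"{id}: {count}");
    }
}
```

Note that the final ranking still only becomes available once the source completes, because a count cannot be final while more items may arrive. What changes is what is held in the meantime: one counter per key instead of the grouped items themselves.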
