Return IAsyncEnumerable from grpc with timeout try-catch

Time:11-29

I have a gRPC client and I want a method that simplifies its use. The method should return an IAsyncEnumerable of the items being streamed from the gRPC server. I have a configured timeout that the streaming must not exceed. If the timeout occurs, I just want to walk away with all the items I managed to fetch so far.

Here's what I tried to do:

    public async IAsyncEnumerable<Item> Search(
        SearchParameters parameters, 
        CancellationToken cancellationToken, 
        IDictionary<string, string> headers = null)
    {
        try
        {
            await _client.Search(
                    MapInput(parameters),
                    cancellationToken: cancellationToken,
                    deadline: DateTime.UtcNow.Add(_configuration.Timeout),
                    headers: MapHeaders(headers))
                .ResponseStream.ForEachAsync(item =>
                {
                    yield return MapSingleItem(item); // compilation error
                });
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.DeadlineExceeded)
        {
            _logger.LogWarning("Stream finished due to timeout, a limited number of items has been returned");
        }
    }

Logically, that should work. However, the yield keyword is not supported within lambdas, so it does not compile. Is there any other way I could write it?

CodePudding user response:

This is available out of the box in gRPC for .NET (grpc-dotnet), which is the recommended gRPC client for .NET. The old gRPC for C# project is in maintenance mode, which ends in May 2023. The initial plan was to end even that in May 2022, but the maintenance period was extended to give .NET Framework applications time to migrate.

ResponseStream can return results as an IAsyncEnumerable<T> through the ReadAllAsync method.

The Server Streaming Call example shows this:

var client = new Greet.GreeterClient(channel);
using var call = client.SayHellos(new HelloRequest { Name = "World" });

await foreach (var response in call.ResponseStream.ReadAllAsync())
{
    Console.WriteLine("Greeting: " + response.Message);
    // "Greeting: Hello World" is written multiple times
}

Even without ReadAllAsync you can use MoveNext to retrieve the next item. You can use this in an asynchronous iterator to produce an IAsyncEnumerable result:

async IAsyncEnumerable<HelloReply> GetRepliesAsync()
{
    var client = new Greet.GreeterClient(channel);
    using var call = client.SayHellos(new HelloRequest { Name = "World" });

    while (await call.ResponseStream.MoveNext())
    {
        yield return call.ResponseStream.Current;
    }
}

In fact, that's how ReadAllAsync is implemented:

private static async IAsyncEnumerable<T> ReadAllAsyncCore<T>(IAsyncStreamReader<T> streamReader, [EnumeratorCancellation]CancellationToken cancellationToken)
{
    while (await streamReader.MoveNext(cancellationToken).ConfigureAwait(false))
    {
        yield return streamReader.Current;
    }
}

You can add the timeout as a CancellationTokenSource in both cases. For example:

using var cts = new CancellationTokenSource(60000); // cancels after 60 seconds

var client = new Greet.GreeterClient(channel);
using var call = client.SayHellos(new HelloRequest { Name = "World" });

await foreach (var response in call.ResponseStream.ReadAllAsync(cts.Token))
{
    Console.WriteLine("Greeting: " + response.Message);
    // "Greeting: Hello World" is written multiple times
}

Or

async IAsyncEnumerable<HelloReply> GetRepliesAsync(int timeout)
{
    using var cts = new CancellationTokenSource(timeout); // timeout in milliseconds

    var client = new Greet.GreeterClient(channel);
    using var call = client.SayHellos(new HelloRequest { Name = "World" });

    while (await call.ResponseStream.MoveNext(cts.Token))
    {
        yield return call.ResponseStream.Current;
    }
}
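
Neither snippet catches the exception raised when the timeout fires, so the enumeration would end with an error instead of quietly returning what was fetched so far. C# does not allow yield return inside a try block that has a catch clause, so one workaround is to wrap only the MoveNext call in try/catch and yield outside it. Below is a minimal sketch of that idea on a plain IAsyncEnumerable<T>. The names TakeUntilTimeout and SlowSource are hypothetical and not part of gRPC. With a real gRPC stream you would probably also need to catch RpcException with StatusCode.Cancelled or StatusCode.DeadlineExceeded, because as far as I know grpc-dotnet reports cancellation as an RpcException unless GrpcChannelOptions.ThrowOperationCanceledOnCancellation is enabled; treat that as an assumption to verify.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutStreamSketch
{
    // Hypothetical helper: yields items from `source` until it completes
    // or `timeout` elapses, then ends the sequence without throwing.
    public static async IAsyncEnumerable<T> TakeUntilTimeout<T>(
        IAsyncEnumerable<T> source,
        TimeSpan timeout,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        cts.CancelAfter(timeout);

        await using var enumerator = source.GetAsyncEnumerator(cts.Token);
        while (true)
        {
            bool hasNext;
            try
            {
                // Only the await is guarded; the yield stays outside the try/catch.
                hasNext = await enumerator.MoveNextAsync();
            }
            catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
            {
                // The caller did not cancel, so this is treated as our own timeout.
                hasNext = false;
            }

            if (!hasNext) yield break;
            yield return enumerator.Current;
        }
    }

    // Stand-in for a server stream: three quick items, then a long stall.
    public static async IAsyncEnumerable<int> SlowSource(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 0; i < 3; i++) yield return i;
        await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
        yield return 3;
    }
}
```

Enumerating TakeUntilTimeout(SlowSource(), TimeSpan.FromMilliseconds(300)) with await foreach should produce the three quick items and then complete normally. Cancellation requested by the caller still propagates as an exception, which keeps the two cases distinguishable.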

CodePudding user response:

You need an intermediate buffer to hold the items, because the consumer of the IAsyncEnumerable<Item> can enumerate it at its own pace. An excellent asynchronous buffer for this purpose is the Channel<T> class.

Another thing that you might want to consider is what happens if the consumer abandons the enumeration of the IAsyncEnumerable<Item> prematurely, either deliberately by breaking or returning, or unwillingly because it suffered an exception. You need to watch for this occurrence, and the best way to do it is to cancel a linked CancellationTokenSource in the finally block of your iterator.

Putting everything together:

public async IAsyncEnumerable<Item> Search(
    SearchParameters parameters, 
    [EnumeratorCancellation] CancellationToken cancellationToken = default,
    IDictionary<string, string> headers = null)
{
    Channel<Item> channel = Channel.CreateUnbounded<Item>();
    using var linkedCTS = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);

    Task producer = Task.Run(async () =>
    {
        try
        {
            await _client.Search(
                    MapInput(parameters),
                    cancellationToken: linkedCTS.Token,
                    deadline: DateTime.UtcNow.Add(_configuration.Timeout),
                    headers: MapHeaders(headers))
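                .ResponseStream.ForEachAsync(item =>
                {
                    // Map to Item before buffering; an unbounded channel
                    // accepts writes until it is completed.
                    channel.Writer.TryWrite(MapSingleItem(item));
                    return Task.CompletedTask; // ForEachAsync expects a Func<T, Task>
                }).ConfigureAwait(false);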
            channel.Writer.Complete();
        }
        catch (Exception ex) { channel.Writer.Complete(ex); }
    });

    try
    {
        await foreach (var item in channel.Reader.ReadAllAsync()
            .ConfigureAwait(false))
        {
            yield return item;
        }
    }
    finally
    {
        linkedCTS.Cancel();
        await producer.ConfigureAwait(false);
    }
}

Most likely the resulting IAsyncEnumerable<Item> will complete with an OperationCanceledException when the token is canceled. If you prefer your token to have stopping semantics, first rename it to stoppingToken, and then handle the OperationCanceledException inside the producer task accordingly.
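
To illustrate the stopping semantics, here is a minimal sketch in which the producer treats cancellation as a normal end of the sequence. It uses a hypothetical Produce method that emits integers in place of the gRPC call, so the names and the 10 ms delay are assumptions for demonstration only.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class StoppingTokenSketch
{
    // Hypothetical stand-in for Search: emits 0, 1, 2, ... until asked to stop.
    public static async IAsyncEnumerable<int> Produce(
        [EnumeratorCancellation] CancellationToken stoppingToken = default)
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);

        Task producer = Task.Run(async () =>
        {
            try
            {
                for (var i = 0; ; i++)
                {
                    channel.Writer.TryWrite(i);
                    await Task.Delay(10, linkedCts.Token);
                }
            }
            catch (OperationCanceledException)
            {
                // Stop requested: complete without an error so the
                // consumer sees a normal end of the sequence.
                channel.Writer.Complete();
            }
            catch (Exception ex)
            {
                // Any other failure is still surfaced to the consumer.
                channel.Writer.Complete(ex);
            }
        });

        try
        {
            // No token here: the reader drains buffered items and then
            // finishes when the producer completes the channel.
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                yield return item;
            }
        }
        finally
        {
            linkedCts.Cancel();
            await producer;
        }
    }
}
```

With this shape, canceling the token ends the await foreach loop normally and the consumer keeps every item that was buffered before the stop.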

CodePudding user response:

With Rx.NET you can do this with the Throttle operator (Rx.NET's name for debounce) and the TakeUntil operator.

var inputObservable = input.ToObservable()
    .Publish()
    .RefCount();

// Emits once 10 seconds pass without a new item after the latest one.
var timeout = inputObservable
    .Throttle(TimeSpan.FromSeconds(10));
var outputObs = inputObservable
    .TakeUntil(timeout);

return outputObs
    .ToAsyncEnumerable()
    .ToListAsync();