I have a gRPC client and I want a method that simplifies its use. The method should return an IAsyncEnumerable of the items being streamed from the gRPC server.
The streaming must not exceed a configured timeout. If the timeout occurs, I just want to walk away with all the items I managed to fetch so far.
Here's what I tried:
public async IAsyncEnumerable<Item> Search(
    SearchParameters parameters,
    CancellationToken cancellationToken,
    IDictionary<string, string> headers = null)
{
    try
    {
        await _client.Search(
                MapInput(parameters),
                cancellationToken: cancellationToken,
                deadline: DateTime.UtcNow.Add(_configuration.Timeout),
                headers: MapHeaders(headers))
            .ResponseStream.ForEachAsync(item =>
            {
                yield return MapSingleItem(item); // compilation error
            });
    }
    catch (RpcException ex) when (ex.StatusCode == StatusCode.DeadlineExceeded)
    {
        _logger.LogWarning("Stream finished due to timeout, a limited number of items has been returned");
    }
}
Logically, that should work. However, the yield keyword is not supported within lambdas, so it does not compile. Is there any other way I could write it?
CodePudding user response:
This is available out of the box in gRPC for .NET (grpc-dotnet), which is the recommended client for .NET. The old gRPC for C# project is in maintenance mode, which ends in May 2023. The initial plan was to end even that in May 2022, but the maintenance period was extended to allow .NET Framework applications to migrate.
ResponseStream can return results as an IAsyncEnumerable<T> through the ReadAllAsync method. The Server Streaming Call example shows this:
var client = new Greet.GreeterClient(channel);
using var call = client.SayHellos(new HelloRequest { Name = "World" });
await foreach (var response in call.ResponseStream.ReadAllAsync())
{
    Console.WriteLine("Greeting: " + response.Message);
    // "Greeting: Hello World" is written multiple times
}
Even without ReadAllAsync you can use MoveNext to retrieve the next item. You can use this in an asynchronous iterator to produce an IAsyncEnumerable result:
// The async modifier is required for a method that uses both await and yield.
async IAsyncEnumerable<HelloReply> GetRepliesAsync()
{
    var client = new Greet.GreeterClient(channel);
    using var call = client.SayHellos(new HelloRequest { Name = "World" });
    while (await call.ResponseStream.MoveNext())
    {
        yield return call.ResponseStream.Current;
    }
}
In fact, that's how ReadAllAsync is implemented:
private static async IAsyncEnumerable<T> ReadAllAsyncCore<T>(
    IAsyncStreamReader<T> streamReader,
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    while (await streamReader.MoveNext(cancellationToken).ConfigureAwait(false))
    {
        yield return streamReader.Current;
    }
}
You can add the timeout as a CancellationTokenSource in both cases. For example:
// Cancels the token automatically after 60 seconds.
using var cts = new CancellationTokenSource(60000);
var client = new Greet.GreeterClient(channel);
using var call = client.SayHellos(new HelloRequest { Name = "World" });
await foreach (var response in call.ResponseStream.ReadAllAsync(cts.Token))
{
    Console.WriteLine("Greeting: " + response.Message);
    // "Greeting: Hello World" is written multiple times
}
Or
// timeout is in milliseconds
async IAsyncEnumerable<HelloReply> GetRepliesAsync(int timeout)
{
    using var cts = new CancellationTokenSource(timeout);
    var client = new Greet.GreeterClient(channel);
    using var call = client.SayHellos(new HelloRequest { Name = "World" });
    while (await call.ResponseStream.MoveNext(cts.Token))
    {
        yield return call.ResponseStream.Current;
    }
}
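Note that when the token is canceled, or when the deadline from the question passes, the loops above end with an exception rather than completing quietly. C# does not allow yield return inside a try block that has a catch clause, so one way to get the "keep what I have so far" behavior is to wrap only the MoveNextAsync call in try/catch and yield outside of it. The following is a minimal sketch of that idea using only the base class library; the names AsyncEnumerableSketch and StopOn are made up for illustration and are not part of gRPC or .NET.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableSketch
{
    // Hypothetical helper: re-yields the items of `source` and ends the
    // sequence normally when fetching the next item throws a TException
    // accepted by `shouldStop`. Any other exception propagates to the caller.
    public static async IAsyncEnumerable<T> StopOn<T, TException>(
        this IAsyncEnumerable<T> source,
        Func<TException, bool> shouldStop,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
        where TException : Exception
    {
        await using var enumerator = source.GetAsyncEnumerator(cancellationToken);
        while (true)
        {
            bool hasNext;
            try
            {
                // Only the fetch is guarded; no yield happens inside try/catch.
                hasNext = await enumerator.MoveNextAsync().ConfigureAwait(false);
            }
            catch (TException ex) when (shouldStop(ex))
            {
                hasNext = false; // treat the failure as the end of the stream
            }

            if (!hasNext)
            {
                yield break;
            }

            yield return enumerator.Current;
        }
    }
}
```

Assuming the gRPC types from the question, it could be applied as call.ResponseStream.ReadAllAsync(cancellationToken).StopOn((RpcException ex) => ex.StatusCode == StatusCode.DeadlineExceeded), with the warning logged inside the predicate or after the loop.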
CodePudding user response:
You need an intermediate buffer to hold the items, because the consumer of the IAsyncEnumerable<Item> can enumerate it at its own pace. An excellent asynchronous buffer for this purpose is the Channel<T> class.
Another thing that you might want to consider is what happens if the consumer abandons the enumeration of the IAsyncEnumerable<Item> prematurely, either deliberately by breaking or returning, or unwillingly because it suffered an exception. You need to watch for this occurrence, and the best way to do it is to cancel a linked CancellationTokenSource in the finally block of your iterator.
Putting everything together:
public async IAsyncEnumerable<Item> Search(
    SearchParameters parameters,
    [EnumeratorCancellation] CancellationToken cancellationToken = default,
    IDictionary<string, string> headers = null)
{
    Channel<Item> channel = Channel.CreateUnbounded<Item>();
    using var linkedCTS = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);

    // The producer pumps the gRPC stream into the channel in the background.
    Task producer = Task.Run(async () =>
    {
        try
        {
            await _client.Search(
                    MapInput(parameters),
                    cancellationToken: linkedCTS.Token,
                    deadline: DateTime.UtcNow.Add(_configuration.Timeout),
                    headers: MapHeaders(headers))
                .ResponseStream.ForEachAsync(item =>
                {
                    // Map as in the question, so the value fits Channel<Item>.
                    channel.Writer.TryWrite(MapSingleItem(item));
                    return Task.CompletedTask; // ForEachAsync expects a Func<T, Task>
                }).ConfigureAwait(false);
            channel.Writer.Complete();
        }
        catch (Exception ex) { channel.Writer.Complete(ex); }
    });

    try
    {
        await foreach (var item in channel.Reader.ReadAllAsync()
            .ConfigureAwait(false))
        {
            yield return item;
        }
    }
    finally
    {
        // Runs when the stream ends or the consumer abandons the enumeration.
        linkedCTS.Cancel();
        await producer.ConfigureAwait(false);
    }
}
Most likely the resulting IAsyncEnumerable<Item> will complete with an OperationCanceledException when the token is canceled. If you prefer your token to have stopping semantics, you should first rename it to stoppingToken, and then handle the OperationCanceledException accordingly inside the producer task.
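The stopping-semantics variant could look roughly like the sketch below. It is a gRPC-free illustration built only on System.Threading.Channels; the names StoppingTokenSketch, BufferUntilStopped and the produce delegate are assumptions made for the example. The only difference from the Search method above is the extra catch clause that completes the channel without an error when the producer is canceled. Depending on the gRPC client and its configuration, cancellation may surface as an RpcException with StatusCode.Cancelled instead, in which case that exception would need the same treatment.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class StoppingTokenSketch
{
    // Hypothetical helper: runs `produce` in the background, buffers what it
    // writes, and treats cancellation as a normal end of the sequence.
    public static async IAsyncEnumerable<T> BufferUntilStopped<T>(
        Func<ChannelWriter<T>, CancellationToken, Task> produce,
        [EnumeratorCancellation] CancellationToken stoppingToken = default)
    {
        Channel<T> channel = Channel.CreateUnbounded<T>();
        using var linkedCts = CancellationTokenSource
            .CreateLinkedTokenSource(stoppingToken);

        Task producer = Task.Run(async () =>
        {
            try
            {
                await produce(channel.Writer, linkedCts.Token).ConfigureAwait(false);
                channel.Writer.Complete();
            }
            catch (OperationCanceledException)
            {
                // Stopping semantics: the consumer keeps the buffered items
                // and the enumeration ends without an exception.
                channel.Writer.Complete();
            }
            catch (Exception ex)
            {
                // Any other failure is still surfaced to the consumer.
                channel.Writer.Complete(ex);
            }
        });

        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync()
                .ConfigureAwait(false))
            {
                yield return item;
            }
        }
        finally
        {
            // Stop the producer if the consumer left early, then wait for it.
            linkedCts.Cancel();
            await producer.ConfigureAwait(false);
        }
    }
}
```

In the Search method, the produce delegate would correspond to the _client.Search(...).ResponseStream.ForEachAsync(...) call.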
CodePudding user response:
With Rx.NET you can do this with the Throttle operator (Rx.NET's name for debounce) and the TakeUntil operator. Here the stream is cut off once no item has arrived for 10 seconds:
var inputObservable = input.ToObservable()
    .Publish()
    .RefCount();
// Fires after an item has been followed by 10 seconds of silence.
var timeout = inputObservable
    .Throttle(TimeSpan.FromSeconds(10));
var outputObservable = inputObservable
    .TakeUntil(timeout);
return outputObservable
    .ToAsyncEnumerable()
    .ToListAsync();