Does ChannelReader<T>.ReadAllAsync throw any exceptions when being canceled by a CancellationT

Time:04-19

Does ChannelReader<T>.ReadAllAsync throw any exceptions when being canceled by a CancellationToken? It doesn't seem to be throwing OperationCanceledException/TaskCanceledException?

I know if these two methods were called in a fire and forget manner, i.e. _ = SendLoopAsync(); _ = ReceiveLoopAsync();, it would've crashed the task with no displayed message/exception because they were not awaited, meaning that we're losing the exceptions.

I don't want that task to crash without letting me know that it has actually crashed or been cancelled, which means I should probably wrap the whole of SendLoopAsync in a try/catch instead of only what's between the braces of the ReadAllAsync loop.

A small example representing its behavior will be appreciated.

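using System;
using System.Buffers;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var clientWebSocket = new ClientWebSocket();
await clientWebSocket.ConnectAsync(new Uri("wss://www.deribit.com/ws/api/v2"), CancellationToken.None).ConfigureAwait(false);

// The class declared below is named ChannelExample, so that is the type constructed here.
var client = new ChannelExample(clientWebSocket);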

for (var i = 1; i <= 10; i++)
{
    client.Output.TryWrite($"Item: {i}");
}

var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(1));
await client.StartAsync(cts.Token).ConfigureAwait(false); // blocks the UI

Console.ReadLine();

public class ChannelExample
{
    private readonly WebSocket _webSocket;
    private readonly Channel<string> _input;
    private readonly Channel<string> _output;

    public ChannelExample(WebSocket webSocket)
    {
        _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));

        _input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleWriter = true
        });

        _output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleReader = true
        });
    }

    public ChannelReader<string> Input => _input.Reader;
    public ChannelWriter<string> Output => _output.Writer;

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var receiving = ReceiveLoopAsync(cancellationToken);
        var sending = SendLoopAsync(cancellationToken);

        var completedTask = await Task.WhenAny(receiving, sending).ConfigureAwait(false);

        if (completedTask.Exception != null)
        {
            Console.WriteLine("Exception");
        }
    }

    private async Task SendLoopAsync(CancellationToken cancellationToken)
    {
        await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
        {
            Console.WriteLine($"Sending: {message}");
            await Task.Delay(5000, cancellationToken).ConfigureAwait(false);
        }
    }

    private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
    {
        using var buffer = MemoryPool<byte>.Shared.Rent();

        while (_webSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested)
        {
            ValueWebSocketReceiveResult receiveResult;
            do
            {
                receiveResult = await _webSocket.ReceiveAsync(buffer.Memory, cancellationToken).ConfigureAwait(false);

                if (receiveResult.MessageType == WebSocketMessageType.Close)
                {
                    return;
                }
            } while (!receiveResult.EndOfMessage);
        }
    }
}

CodePudding user response:

I suspect that it would throw. You can always test that, but throwing on cancellation is the generally expected pattern in this scenario. So you would wrap it with:

try
{
   // ...
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
    // treat as completion; swallow
}
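
A minimal sketch for testing that, assuming an empty unbounded channel that is never completed and a 200 ms timeout (both chosen only for illustration, they are not from the question's code):

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

var observed = "completed normally";
try
{
    // The channel stays empty and is never completed,
    // so the loop keeps waiting until the token fires.
    await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
    {
        Console.WriteLine(message);
    }
}
catch (OperationCanceledException) when (cts.IsCancellationRequested)
{
    // Cancellation surfaces here as an exception from the await foreach.
    observed = "canceled";
}

Console.WriteLine(observed); // prints "canceled"
```

In the question's SendLoopAsync, the Task.Delay(5000, cancellationToken) call inside the loop body is likely to be the first thing to observe the cancellation, but it ends up in the same catch block.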

Alternatively: you could pass CancellationToken.None into the channel read API, and just use the writer's completion to signify exit (making sure that you call .Complete(...) on the writer when exiting).
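
A rough sketch of that alternative, assuming shutdown is signalled by completing the writer when the token fires (the registration, the 200 ms timeout and the sample messages are illustrative assumptions, not part of the question's code):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();
channel.Writer.TryWrite("first");
channel.Writer.TryWrite("second");

using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

// Assumption: cancellation is translated into "no more items" by completing the writer.
using var registration = cts.Token.Register(() => channel.Writer.TryComplete());

var received = new List<string>();

// No token is passed to the reader, so the loop only ends when the writer completes.
await foreach (var message in channel.Reader.ReadAllAsync(CancellationToken.None))
{
    received.Add(message);
}

Console.WriteLine($"Loop ended normally after {received.Count} messages");
```

Items written before the writer is completed are still delivered, so the loop drains the channel and then exits without throwing.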

That said: ReadAllAsync probably isn't the preferred API here, since you don't really need it as IAsyncEnumerable<T> - so it may be preferable to use the native channel API, i.e.

while (await _output.Reader.WaitToReadAsync(cancellationToken))
{
    while (_output.Reader.TryRead(out var message))
    {
        // ...
    }
}

CodePudding user response:

I am not sure what the Task returned by StartAsync represents:

public async Task StartAsync(CancellationToken cancellationToken)
{
    var receiving = ReceiveLoopAsync(cancellationToken);
    var sending = SendLoopAsync(cancellationToken);

    var completedTask = await Task.WhenAny(receiving, sending).ConfigureAwait(false);

    if (completedTask.Exception != null)
    {
        Console.WriteLine("Exception");
    }
}

It seems to represent the completion of whichever of the receiving and sending tasks finishes first, which is weird. Most probably this is an unintended consequence of trying to log the exceptions of the tasks. There are better ways to log task exceptions than this, the simplest being to enclose all the code inside the asynchronous method in a try/catch block. Beyond that, the Exception property of a Task is non-null only when the task IsFaulted, not when it IsCanceled.
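
A small sketch of that last point, using a hypothetical LoopAsync as a stand-in for SendLoopAsync (it only waits on the token, and the 100 ms timeout is an arbitrary choice for illustration):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMilliseconds(100));

// Hypothetical stand-in for SendLoopAsync: it just waits until the token is canceled.
static async Task LoopAsync(CancellationToken token)
{
    await Task.Delay(5000, token).ConfigureAwait(false);
}

var completed = await Task.WhenAny(LoopAsync(cts.Token));

// A canceled task is not faulted, so its Exception property stays null.
Console.WriteLine($"IsCanceled: {completed.IsCanceled}");
Console.WriteLine($"IsFaulted: {completed.IsFaulted}");
Console.WriteLine($"Exception is null: {completed.Exception is null}");

var rethrown = false;
try
{
    // Awaiting the completed task is what surfaces the cancellation as an exception.
    await completed;
}
catch (OperationCanceledException)
{
    rethrown = true;
}

Console.WriteLine($"Rethrown on await: {rethrown}");
```

This is why the check on completedTask.Exception in StartAsync never prints anything on cancellation: awaiting the task returned by Task.WhenAny, or checking IsCanceled, would make the cancellation visible.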
