Home > Mobile >  Is SemaphoreSlim needed when Channel's SingleReader is set to true
Is SemaphoreSlim needed when Channel's SingleReader is set to true

Time:04-08

When sending data fast enough, InvalidOperationException is being thrown: 'There is already one outstanding 'SendAsync' call for this WebSocket instance. ClientWebSocket.ReceiveAsync and ClientWebSocket.SendAsync can be called simultaneously, but at most one outstanding operation for each of them is allowed at the same time.

The first example uses SemaphoreSlim which allows only one message at a time that prevents that issue from happening.

The question is do I need to do the same SemaphoreSlim workaround in the second example since SingleReader = true is specified in the channel options? It should basically be the same, but I would like someone to confirm it, so there are no surprises.

Without System.Threading.Channels

private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1);

public async Task<bool> SendMessageAsync(ReadOnlyMemory<byte> message)
{
    if (_webSocket.State != WebSocketState.Open)
    {
        return false;
    }
    
    // Only one message can be sent at a time. Wait until a message can be sent.
    await sendAsyncSemaphore.WaitAsync().ConfigureAwait(false);
    
    // We can now send a message. Our thread has entered sendAsyncSemaphore, so we should be the one to release it.
    try
    {
        await _Client.SendAsync(message, WebSocketMessageType.Text, true, connectionCancellation.Token).ConfigureAwait(true);
    }
    catch (TaskCanceledException)
    {
        sendAsyncSemaphore.Release();
        return false;
    }
    
    // Our thread can now release the sendAsyncSemaphore so another message can be sent.
    sendAsyncSemaphore.Release();
    return true;
}

public void Dispose()
{
    sendAsyncSemaphore.Dispose();
}

With System.Threading.Channels

public class WebSocketClient
{
    private readonly WebSocket _webSocket;
    private readonly Channel<string> _input;
    private readonly Channel<string> _output;

    public WebSocketClient(WebSocket webSocket)
    {
        _webSocket = webSocket;

        _input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleWriter = true
        });

        _output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleReader = true
        });
    }

    private async Task SendLoopAsync(CancellationToken cancellationToken)
    {
        await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
        {
            if (_webSocket.State != WebSocketState.Open)
            {
                return;
            }

            var bytesToSend = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
            await _webSocket.SendAsync(bytesToSend, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
        }
    }
}

CodePudding user response:

Short answer yes! You still need something to limit the number of outstanding read calls when SingleReader = true. The documentation clearly states, you set that property to true 'if readers to the channel guarantee that there will only ever be at most one read operation at a time'. So the property does not provide limitation on the senders.

Using a semaphore or any other synchronization primitive is not a work around in my opinion. If you take a look at the code, you'll see that setting the SingleReader option gets you a SingleConsumerUnboundedChannel. If you don't limit the readers yourself you'll end up unintentionally cancelling read operations.

CodePudding user response:

If you set SingleReader = true you unnecessary use SemaphoreSlim because that only allows true readers from the channel to guarantee that there will only ever be at most one read operation at a time.

true readers from the channel guarantee that there will only ever be at most one read operation at a time; if no such constraint is guaranteed.false

BTW you are only defining initialCount but didn't limit the maxCount which means you will unlimited signal count, here is the SemaphoreSlim source code, NO_MAXIMUM is a const value int.MaxValue.

// No maximum constant
private const int NO_MAXIMUM = int.MaxValue;


public SemaphoreSlim(int initialCount)
    : this(initialCount, NO_MAXIMUM)
{
}

you can try to use another construct method which provides 2 parameters, the first one is initialCount second one is setting maxCount

private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1,1);
  • Related