When sending data fast enough, InvalidOperationException is being thrown: 'There is already one outstanding 'SendAsync' call for this WebSocket instance. ClientWebSocket.ReceiveAsync
and ClientWebSocket.SendAsync
can be called simultaneously, but at most one outstanding operation for each of them is allowed at the same time.
The first example uses SemaphoreSlim which allows only one message at a time that prevents that issue from happening.
The question is do I need to do the same SemaphoreSlim
workaround in the second example since SingleReader = true
is specified in the channel options? It should basically be the same, but I would like someone to confirm it, so there are no surprises.
Without System.Threading.Channels
private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1);
public async Task<bool> SendMessageAsync(ReadOnlyMemory<byte> message)
{
if (_webSocket.State != WebSocketState.Open)
{
return false;
}
// Only one message can be sent at a time. Wait until a message can be sent.
await sendAsyncSemaphore.WaitAsync().ConfigureAwait(false);
// We can now send a message. Our thread has entered sendAsyncSemaphore, so we should be the one to release it.
try
{
await _Client.SendAsync(message, WebSocketMessageType.Text, true, connectionCancellation.Token).ConfigureAwait(true);
}
catch (TaskCanceledException)
{
sendAsyncSemaphore.Release();
return false;
}
// Our thread can now release the sendAsyncSemaphore so another message can be sent.
sendAsyncSemaphore.Release();
return true;
}
public void Dispose()
{
sendAsyncSemaphore.Dispose();
}
With System.Threading.Channels
public class WebSocketClient
{
private readonly WebSocket _webSocket;
private readonly Channel<string> _input;
private readonly Channel<string> _output;
public WebSocketClient(WebSocket webSocket)
{
_webSocket = webSocket;
_input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleWriter = true
});
_output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleReader = true
});
}
private async Task SendLoopAsync(CancellationToken cancellationToken)
{
await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
{
if (_webSocket.State != WebSocketState.Open)
{
return;
}
var bytesToSend = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
await _webSocket.SendAsync(bytesToSend, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
}
}
}
CodePudding user response:
Short answer yes! You still need something to limit the number of outstanding read calls when SingleReader = true
. The documentation clearly states, you set that property to true 'if readers to the channel guarantee that there will only ever be at most one read operation at a time'. So the property does not provide limitation on the senders.
Using a semaphore or any other synchronization primitive is not a work around in my opinion. If you take a look at the code, you'll see that setting the SingleReader
option gets you a SingleConsumerUnboundedChannel
. If you don't limit the readers yourself you'll end up unintentionally cancelling read operations.
CodePudding user response:
If you set SingleReader = true
you unnecessary use SemaphoreSlim
because that only allows true
readers from the channel to guarantee that there will only ever be at most one read operation at a time.
true
readers from the channel guarantee that there will only ever be at most one read operation at a time; if no such constraint is guaranteed.false
BTW you are only defining initialCount
but didn't limit the maxCount
which means you will unlimited signal count, here is the SemaphoreSlim source code, NO_MAXIMUM
is a const
value int.MaxValue
.
// No maximum constant
private const int NO_MAXIMUM = int.MaxValue;
public SemaphoreSlim(int initialCount)
: this(initialCount, NO_MAXIMUM)
{
}
you can try to use another construct method which provides 2 parameters, the first one is initialCount
second one is setting maxCount
private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1,1);