I have an Async processing pipeline. I'm implementing a constraint such that I need to limit the number of submissions to the next stage. For my component, I have:
- a single input source (items are tagged with a source id)
- a single destination that I need to propagate the inputs to in a round-robin fashion
If capacity is available for multiple clients, I'll forward a message for each (i.e. if I wake because client 3's semaphore has finally become available, I may first send a message for client 2, then 3, etc)
The processing loop is thus waiting on one or more of the following conditions to continue processing:
- more input has arrived (it might be for a client that is not at its limit)
- capacity has been released for a client that we are holding data for
Ideally, I'd thus use Task.WhenAny
with
- a task representing the input
c.Reader.WaitToReadAsync(ct).AsTask()
- N tasks representing the clients for which we are holding data, but it's not yet valid for submission (the
Wait
for theSemaphoreSlim
would fail)
SemaphoreSlim's AvailableWaitHandle
would be ideal - I want to know when it's available but I don't want to reserve it yet as I have a chain of work to process - I just want to know if one of my trigger conditions has arisen
Is there a way to await
the AvailableWaitHandle
?
My current approach is a hack derived from this answer to a similar question by @usr - posting for reference
My actual code is here - there's also some more detail about the whole problem in my self-answer below
CodePudding user response:
I want to know when it's available but I don't want to reserve it yet as I have a chain of work to process
This is very strange and it seems like SemaphoreSlim
may not be what you want to use. SemaphoreSlim
is a kind of mutual exclusion object that can allow multiple takers. It is sometimes used for throttling. But I would not want to use it as a signal.
It seems like something more like an asynchronous manual-reset event would be what you really want. Or, if you wanted to maintain a locking/concurrent-collection kind of concept, an asynchronous monitor or condition variable.
That said, it is possible to use a SemaphoreSlim
as a signal. I just strongly hesitate suggesting this as a solution, since it seems like this requirement is highlighting a mistake in the choice of synchronization primitive.
Is there a way to await the AvailableWaitHandle?
Yes. You can await
anything by using TaskCompletionSource
. For WaitHandle
s in particular, ThreadPool.RegisterWaitForSingleObject
gives you an efficient wait.
So, what you want to do is create a TCS, register the handle with the thread pool, and complete the TCS in the callback for that handle. Keep in mind that you want to be sure that the TCS is eventually completed and that everything is disposed properly.
I have support for this in my AsyncEx library (WaitHandleAsyncFactory.FromWaitHandle
); code is here.
My AsyncEx library also has support for asynchronous manual-reset events, monitors, and condition variables.
CodePudding user response:
Variation of @usr's answer which solved my problem
class SemaphoreSlimExtensions
public static Task AwaitButReleaseAsync(this SemaphoreSlim s) =>
s.WaitAsync().ContinueWith(_t -> s.Release(), TaskContinuationOptions.ExecuteSynchronously);
public static bool TryTake(this SemaphoreSlim s) =>
s.Wait(0);
In my use case, the await
is just a trigger for synchronous logic that then walks the full set - the TryTake
helper is in my case a natural way to handle the conditional acquisition of the semaphore and the processing that's contingent on that. My wait looks like this:
SemaphoreSlim[] throttled = Enumerable.Empty();
while (!ct.IsCancellationRequested)
{
var throttledClients = from s in throttled select s.AwaitButReleaseAsync();
var timeout = 3000;
var otherConditions = new[] { input.Reader.WaitToReadAsync().ToTask(), Task.Delay(ct, timeout) };
await Task.WhenAny(throttledClients.Append(otherConditions));
throttled = propagateStuff();
}
The actual code is here - I have other cases that follow the same general pattern. The bottom line is that I want to separate the concern of waiting for the availability of capacity on a SemaphoreSlim
from actually reserving that capacity.