Using Task.WhenAny to await capacity on a SemaphoreSlim

Time:08-04

I have an async processing pipeline, and I need to limit the number of submissions to the next stage. My component has:

  • a single input source (items are tagged with a source id)
  • a single destination that I need to propagate the inputs to in a round-robin fashion

If capacity is available for multiple clients, I'll forward a message for each (e.g. if I wake because client 3's semaphore has finally become available, I may first send a message for client 2, then 3, etc.).

The processing loop is thus waiting on one or more of the following conditions to continue processing:

  • more input has arrived (it might be for a client that is not at its limit)
  • capacity has been released for a client that we are holding data for

Ideally, I'd thus use Task.WhenAny with

  • a task representing the input: c.Reader.WaitToReadAsync(ct).AsTask()
  • N tasks representing the clients for which we are holding data, but it's not yet valid for submission (the Wait for the SemaphoreSlim would fail)

SemaphoreSlim's AvailableWaitHandle would be ideal - I want to know when it's available but I don't want to reserve it yet as I have a chain of work to process - I just want to know if one of my trigger conditions has arisen

Is there a way to await the AvailableWaitHandle?

My current approach is a hack derived from this answer to a similar question by @usr - posting for reference

My actual code is here - there's also some more detail about the whole problem in my self-answer below

CodePudding user response:

"I want to know when it's available but I don't want to reserve it yet as I have a chain of work to process"

This is very strange and it seems like SemaphoreSlim may not be what you want to use. SemaphoreSlim is a kind of mutual exclusion object that can allow multiple takers. It is sometimes used for throttling. But I would not want to use it as a signal.

It seems like something more like an asynchronous manual-reset event would be what you really want. Or, if you wanted to maintain a locking/concurrent-collection kind of concept, an asynchronous monitor or condition variable.

That said, it is possible to use a SemaphoreSlim as a signal. I just strongly hesitate to suggest this as a solution, since this requirement seems to highlight a mistake in the choice of synchronization primitive.

"Is there a way to await the AvailableWaitHandle?"

Yes. You can await anything by using TaskCompletionSource. For WaitHandles in particular, ThreadPool.RegisterWaitForSingleObject gives you an efficient wait.

So, what you want to do is create a TCS, register the handle with the thread pool, and complete the TCS in the callback for that handle. Keep in mind that you want to be sure that the TCS is eventually completed and that everything is disposed properly.

I have support for this in my AsyncEx library (WaitHandleAsyncFactory.FromWaitHandle); code is here.
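
The steps described above (create a TCS, register the handle with the thread pool, complete the TCS in the callback, clean up afterwards) could be sketched roughly as follows. This is a minimal illustration, not the AsyncEx implementation; the names WaitHandleExtensions and WaitOneAsync are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class WaitHandleExtensions
{
    // Hypothetical helper: returns a task that completes when the handle is
    // signaled, or is canceled when the token fires.
    public static Task WaitOneAsync(this WaitHandle handle, CancellationToken ct = default)
    {
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Ask the thread pool to watch the handle; no thread blocks while waiting.
        RegisteredWaitHandle registration = ThreadPool.RegisterWaitForSingleObject(
            handle,
            (state, timedOut) => tcs.TrySetResult(true),
            null,
            Timeout.Infinite,
            true); // executeOnlyOnce

        // Complete the TCS on cancellation too, so it is always eventually completed.
        CancellationTokenRegistration ctr = ct.Register(() => tcs.TrySetCanceled(ct));

        // Whichever way the task finishes, release both registrations.
        tcs.Task.ContinueWith(_ =>
        {
            registration.Unregister(null);
            ctr.Dispose();
        }, TaskScheduler.Default);

        return tcs.Task;
    }
}
```

With such a helper, the question's wait could be written as await semaphore.AvailableWaitHandle.WaitOneAsync(ct). Completion only means the count was non-zero at some point; it reserves nothing, so a later Wait(0) can still fail if another consumer got there first.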

My AsyncEx library also has support for asynchronous manual-reset events, monitors, and condition variables.
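
For comparison, an asynchronous manual-reset event of the kind suggested above can be built on TaskCompletionSource alone. The sketch below is a deliberately small, hypothetical type written for illustration; it is not the AsyncEx class and omits cancellation support.

```csharp
using System.Threading.Tasks;

// Hypothetical minimal type for illustration only.
sealed class AsyncManualResetEvent
{
    private readonly object _gate = new object();
    private TaskCompletionSource<bool> _tcs =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Completes once the event is set; stays completed until Reset is called.
    public Task WaitAsync()
    {
        lock (_gate) return _tcs.Task;
    }

    // Signals all current and future waiters.
    public void Set()
    {
        lock (_gate) _tcs.TrySetResult(true);
    }

    // Returns the event to the unsignaled state; a no-op if it is not set.
    public void Reset()
    {
        lock (_gate)
        {
            if (_tcs.Task.IsCompleted)
                _tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        }
    }
}
```

In the scenario from the question, each client could own one such event that is Set when capacity is released and Reset when the client hits its limit, so the loop awaits a pure signal rather than a lock.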

CodePudding user response:

Variation of @usr's answer which solved my problem

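using System.Threading;
using System.Threading.Tasks;
static class SemaphoreSlimExtensions
{
    // Completes once capacity is available, then immediately hands the slot back
    public static Task AwaitButReleaseAsync(this SemaphoreSlim s) =>
        s.WaitAsync().ContinueWith(_ => s.Release(), TaskContinuationOptions.ExecuteSynchronously);

    // Non-blocking attempt to acquire; returns false if no capacity is available
    public static bool TryTake(this SemaphoreSlim s) =>
        s.Wait(0);
}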

In my use case, the await is just a trigger for synchronous logic that then walks the full set - the TryTake helper is in my case a natural way to handle the conditional acquisition of the semaphore and the processing that's contingent on that. My wait looks like this:

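SemaphoreSlim[] throttled = Array.Empty<SemaphoreSlim>();
while (!ct.IsCancellationRequested)
{
    // One task per client we are holding data for; each completes when that client has capacity
    var throttledClients = from s in throttled select s.AwaitButReleaseAsync();
    var timeout = 3000; // milliseconds
    var otherConditions = new[] { input.Reader.WaitToReadAsync(ct).AsTask(), Task.Delay(timeout, ct) };

    // Wake on new input, released capacity, or the timeout, whichever comes first
    await Task.WhenAny(throttledClients.Concat(otherConditions));
    throttled = propagateStuff();
}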

The actual code is here - I have other cases that follow the same general pattern. The bottom line is that I want to separate the concern of waiting for the availability of capacity on a SemaphoreSlim from actually reserving that capacity.
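
To make that separation concrete, here is a small self-contained sketch with a single client. The Demo class and RunAsync method are hypothetical names used only for this illustration; the helper is inlined as a plain static method so the block stands alone.

```csharp
using System.Threading;
using System.Threading.Tasks;

static class Demo
{
    // Same idea as AwaitButReleaseAsync above: acquire, then give the slot straight back.
    static Task AwaitButReleaseAsync(SemaphoreSlim s) =>
        s.WaitAsync().ContinueWith(_ => s.Release(), TaskContinuationOptions.ExecuteSynchronously);

    public static async Task<bool> RunAsync()
    {
        var client = new SemaphoreSlim(0, 1);          // the client starts at its limit
        Task trigger = AwaitButReleaseAsync(client);   // wake-up signal only

        client.Release();                              // downstream frees one slot
        await trigger;                                 // the loop wakes up here

        // The trigger did not keep the slot, so the real reservation happens now.
        // With competing consumers this could return false, hence the try-style call.
        return client.Wait(0);
    }
}
```

The wake-up is only a hint that capacity was available at some point; the reservation is a separate, conditional step, which is why the processing code uses TryTake rather than assuming the slot is still free.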
