Home > database >  How to Pause/Resume an asynchronous worker, without incurring memory allocation overhead?
How to Pause/Resume an asynchronous worker, without incurring memory allocation overhead?

Time:07-20

I have an asynchronous method that contains an endless while loop. This loop is paused and resumed frequently by the main thread of the program. For this purpose I am using currently the PauseTokenSource class from the Nito.AsyncEx package:

private readonly PauseTokenSource _pausation = new();

private async Task StartWorker()
{
    while (true)
    {
        await _pausation.Token.WaitWhilePausedAsync();
        DoSomething();
    }
}

This works pretty well, but I noticed that around 100 bytes are allocated each time the PauseTokenSource is paused and resumed. Since this happens many times per second, I am searching for a way to eliminate this overhead. My idea is to replace the PauseTokenSource with a home-made pausation mechanism that has a WaitWhilePausedAsync method that returns a ValueTask instead of Task. To make the implementation allocation-free, the ValueTask should be backed by something that implements the IValueTaskSource interface. Here is my current (failed) attempt to implement this mechanism:

public class Pausation : IValueTaskSource
{
    private ManualResetValueTaskSourceCore<bool> _source;
    private bool _paused;

    public Pausation() => _source.RunContinuationsAsynchronously = true;

    public void Pause()
    {
        _paused = true;
        _source.Reset();
    }

    public void Resume()
    {
        _paused = false;
        _source.SetResult(default);
    }

    public ValueTask WaitWhilePausedAsync()
    {
        if (!_paused) return ValueTask.CompletedTask;
        return new ValueTask(this, _source.Version);
    }

    void IValueTaskSource.GetResult(short token)
    {
        _source.GetResult(token);
    }

    ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
    {
        return _source.GetStatus(token);
    }

    void IValueTaskSource.OnCompleted(Action<object> continuation, object state,
        short token, ValueTaskSourceOnCompletedFlags flags)
    {
        _source.OnCompleted(continuation, state, token, flags);
    }
}

My Pausation class is based on the ManualResetValueTaskSourceCore<T> struct, which is supposed to simplify the most common IValueTaskSource implementations. Apparently I am doing something wrong, because when my worker awaits the WaitWhilePausedAsync method, it crashes with an InvalidOperationException.

My question is: How can I fix the Pausation class, so that it works correctly? Should I add more state beyond the _paused field? Am I calling the ManualResetValueTaskSourceCore<T> methods in the wrong places? I am asking either for detailed fixing instructions, or for a complete and working implementation.

Specifics: The Pausation class is intended to be used in a single worker - single controller scenario. There is only one asynchronous worker (the StartWorker method), and only one controller thread issues Pause and Resume commands. Also there is no need for cancellation support. The termination of the worker is handled independently by a CancellationTokenSource (removed from the above snippet for brevity). The only functionality that is needed is the Pause, Resume and WaitWhilePausedAsync methods. The only requirement is that it works correctly, and it doesn't allocate memory.

A runnable online demo of my worker - controller scenario can be found here.

Output with the Nito.AsyncEx.PauseTokenSource class:

Controller loops: 112,748
Worker loops: 84, paused: 36 times

Output with my Pausation class:

Controller loops: 117,397
Unhandled exception. System.InvalidOperationException: Operation is not valid due to the current state of the object.
   at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1.GetStatus(Int16 token)
   at Program.Pausation.System.Threading.Tasks.Sources.IValueTaskSource.GetStatus(Int16 token)
   at Program.<>c__DisplayClass0_0.<<Main>g__StartWorker|0>d.MoveNext()

CodePudding user response:

I believe you're misunderstanding Reset. ManualResetValueTaskSourceCore<T>.Reset is nothing at all like ManualResetEvent.Reset. For ManualResetValueTaskSourceCore<T>, Reset means "the previous operation has completed, and now I want to reuse the value task, so change the version". So, it should be called after GetResult (i.e., after the paused code is done awaiting), not within Pause. The easiest way to encapsulate this IMO is to Reset the state immediately before returning the ValueTask.

Similarly, your code shouldn't call SetResult unless there's a value task already returned. ManualResetValueTaskSourceCore<T> is really designed around sequential operations pretty strictly, and having a separate "controller" complicates things. You could probably get it working by keeping track of whether or not the consumer is waiting, and only attempting to complete if there is one:

public class Pausation : IValueTaskSource
{
    private ManualResetValueTaskSourceCore<bool> _source;
    private readonly object _mutex = new();
    private bool _paused;
    private bool _waiting;

    public Pausation() => _source.RunContinuationsAsynchronously = true;

    public void Pause()
    {
        lock (_mutex)
            _paused = true;
    }

    public void Resume()
    {
        var wasWaiting = false;
        lock (_mutex)
        {
            wasWaiting = _waiting;
            _paused = _waiting = false;
        }

        if (wasWaiting)
            _source.SetResult(default);
    }

    public ValueTask WaitWhilePausedAsync()
    {
        lock (_mutex)
        {
            if (!_paused) return ValueTask.CompletedTask;
            _waiting = true;
            _source.Reset();
            return new ValueTask(this, _source.Version);
        }
    }

    void IValueTaskSource.GetResult(short token)
    {
        _source.GetResult(token);
    }

    ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
    {
        return _source.GetStatus(token);
    }

    void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
    {
        _source.OnCompleted(continuation, state, token, flags);
    }
}

I fixed some of the obvious race conditions using a mutex, but I can't guarantee there aren't more subtle ones remaining. It will certainly be limited to a single controller and single consumer, at least.

CodePudding user response:

You may have misunderstood the purpose of ValueTask. Stephen Toub breaks down why it was introduced here: https://devblogs.microsoft.com/dotnet/understanding-the-whys-whats-and-whens-of-valuetask/

But the most relevant bit of information is that ValueTask does not allocate iff the asynchronous method completes synchronously and successfully.

If it does need to complete asynchronously, there is no getting around needing to allocate a Task class due to the state machine in the background. So this may need another solution entirely.

  • Related