I have an asynchronous method that contains an endless while loop. This loop is paused and resumed frequently by the main thread of the program. For this purpose I am currently using the PauseTokenSource class from the Nito.AsyncEx package:
private readonly PauseTokenSource _pausation = new();

private async Task StartWorker()
{
    while (true)
    {
        await _pausation.Token.WaitWhilePausedAsync();
        DoSomething();
    }
}
This works pretty well, but I noticed that around 100 bytes are allocated each time the PauseTokenSource is paused and resumed. Since this happens many times per second, I am searching for a way to eliminate this overhead. My idea is to replace the PauseTokenSource with a home-made pausation mechanism that has a WaitWhilePausedAsync method that returns a ValueTask instead of a Task. To make the implementation allocation-free, the ValueTask should be backed by something that implements the IValueTaskSource interface. Here is my current (failed) attempt to implement this mechanism:
public class Pausation : IValueTaskSource
{
    private ManualResetValueTaskSourceCore<bool> _source;
    private bool _paused;

    public Pausation() => _source.RunContinuationsAsynchronously = true;

    public void Pause()
    {
        _paused = true;
        _source.Reset();
    }

    public void Resume()
    {
        _paused = false;
        _source.SetResult(default);
    }

    public ValueTask WaitWhilePausedAsync()
    {
        if (!_paused) return ValueTask.CompletedTask;
        return new ValueTask(this, _source.Version);
    }

    void IValueTaskSource.GetResult(short token)
    {
        _source.GetResult(token);
    }

    ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
    {
        return _source.GetStatus(token);
    }

    void IValueTaskSource.OnCompleted(Action<object> continuation, object state,
        short token, ValueTaskSourceOnCompletedFlags flags)
    {
        _source.OnCompleted(continuation, state, token, flags);
    }
}
My Pausation class is based on the ManualResetValueTaskSourceCore<T> struct, which is supposed to simplify the most common IValueTaskSource implementations. Apparently I am doing something wrong, because when my worker awaits the WaitWhilePausedAsync method, it crashes with an InvalidOperationException.
My question is: How can I fix the Pausation class so that it works correctly? Should I add more state beyond the _paused field? Am I calling the ManualResetValueTaskSourceCore<T> methods in the wrong places? I am asking either for detailed fixing instructions, or for a complete and working implementation.
Specifics: The Pausation class is intended to be used in a single worker - single controller scenario. There is only one asynchronous worker (the StartWorker method), and only one controller thread issues Pause and Resume commands. Also there is no need for cancellation support. The termination of the worker is handled independently by a CancellationTokenSource (removed from the above snippet for brevity). The only functionality that is needed is the Pause, Resume and WaitWhilePausedAsync methods. The only requirement is that it works correctly and doesn't allocate memory.
A runnable online demo of my worker - controller scenario can be found here.
Output with the Nito.AsyncEx.PauseTokenSource class:
Controller loops: 112,748
Worker loops: 84, paused: 36 times
Output with my Pausation class:
Controller loops: 117,397
Unhandled exception. System.InvalidOperationException: Operation is not valid due to the current state of the object.
   at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1.GetStatus(Int16 token)
   at Program.Pausation.System.Threading.Tasks.Sources.IValueTaskSource.GetStatus(Int16 token)
   at Program.<>c__DisplayClass0_0.<<Main>g__StartWorker|0>d.MoveNext()
Answer:
I believe you're misunderstanding Reset. ManualResetValueTaskSourceCore<T>.Reset is nothing at all like ManualResetEvent.Reset. For ManualResetValueTaskSourceCore<T>, Reset means "the previous operation has completed, and now I want to reuse the value task, so change the version". So, it should be called after GetResult (i.e., after the paused code is done awaiting), not within Pause. The easiest way to encapsulate this IMO is to Reset the state immediately before returning the ValueTask.
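To make the version behavior concrete, here is a minimal sketch of the intended lifecycle: Reset, hand out a ValueTask for the current version, SetResult, await. The ReusableSignal and LifecycleDemo names are hypothetical and exist only for this illustration. The last part deliberately misuses a stale ValueTask to show where the InvalidOperationException from GetStatus comes from once Reset has changed the version.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

// Hypothetical helper, for illustration only: one pending operation at a time.
public sealed class ReusableSignal : IValueTaskSource
{
    // Mutable struct: the field must not be readonly.
    private ManualResetValueTaskSourceCore<bool> _core;

    // Start a new operation: bump the version, then bind a ValueTask to it.
    public ValueTask NextAsync()
    {
        _core.Reset();
        return new ValueTask(this, _core.Version);
    }

    // Complete the operation most recently handed out by NextAsync.
    public void Signal() => _core.SetResult(true);

    void IValueTaskSource.GetResult(short token) => _core.GetResult(token);

    ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _core.GetStatus(token);

    void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state,
        short token, ValueTaskSourceOnCompletedFlags flags)
        => _core.OnCompleted(continuation, state, token, flags);
}

public static class LifecycleDemo
{
    // Returns true if touching a ValueTask from an older version throws.
    public static async Task<bool> RunAsync()
    {
        var signal = new ReusableSignal();

        ValueTask first = signal.NextAsync();  // Reset, then ValueTask for that version
        signal.Signal();                       // SetResult completes it
        await first;                           // GetResult: the operation is over

        ValueTask second = signal.NextAsync(); // Reset again: 'first' is now stale

        bool staleThrows = false;
        try
        {
            // Deliberate misuse: this calls GetStatus with the old token.
            _ = first.IsCompleted;
        }
        catch (InvalidOperationException)
        {
            staleThrows = true;
        }

        signal.Signal();
        await second;
        return staleThrows;
    }
}
```

In the original Pausation, Pause calls Reset while the worker may still hold a ValueTask from the previous version, which is the same stale-token situation.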
Similarly, your code shouldn't call SetResult unless there's a value task already returned. ManualResetValueTaskSourceCore<T> is really designed around sequential operations pretty strictly, and having a separate "controller" complicates things. You could probably get it working by keeping track of whether or not the consumer is waiting, and only attempting to complete if there is one:
public class Pausation : IValueTaskSource
{
    private ManualResetValueTaskSourceCore<bool> _source;
    private readonly object _mutex = new();
    private bool _paused;
    private bool _waiting; // true while the worker holds an incomplete ValueTask

    public Pausation() => _source.RunContinuationsAsynchronously = true;

    public void Pause()
    {
        lock (_mutex)
            _paused = true;
    }

    public void Resume()
    {
        var wasWaiting = false;
        lock (_mutex)
        {
            wasWaiting = _waiting;
            _paused = _waiting = false;
        }
        // Complete the pending operation only if a ValueTask was handed out.
        if (wasWaiting)
            _source.SetResult(default);
    }

    public ValueTask WaitWhilePausedAsync()
    {
        lock (_mutex)
        {
            if (!_paused) return ValueTask.CompletedTask;
            _waiting = true;
            // Start a new operation (new version) right before returning the ValueTask.
            _source.Reset();
            return new ValueTask(this, _source.Version);
        }
    }

    void IValueTaskSource.GetResult(short token)
    {
        _source.GetResult(token);
    }

    ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
    {
        return _source.GetStatus(token);
    }

    void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
    {
        _source.OnCompleted(continuation, state, token, flags);
    }
}
I fixed some of the obvious race conditions using a mutex, but I can't guarantee there aren't more subtle ones remaining. It will certainly be limited to a single controller and single consumer, at least.
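For anyone who wants to exercise this, below is a rough sketch of a single worker - single controller harness. PauseHarness is a hypothetical name, and the harness takes the three operations as delegates so that it does not depend on any particular implementation. It only checks that the worker keeps running across pause/resume cycles without throwing; it does not measure allocations, and the delegates and Task.Delay calls allocate themselves.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical harness: drives any pause/resume mechanism through delegates.
public static class PauseHarness
{
    // Runs one worker and one controller; returns the number of worker loops.
    public static async Task<int> RunAsync(
        Action pause, Action resume, Func<ValueTask> waitWhilePaused, int cycles)
    {
        using var cts = new CancellationTokenSource();
        int workerLoops = 0;

        // The single asynchronous worker.
        Task worker = Task.Run(async () =>
        {
            while (!cts.IsCancellationRequested)
            {
                await waitWhilePaused();
                Interlocked.Increment(ref workerLoops); // stands in for DoSomething()
                await Task.Yield();
            }
        });

        // The single controller: pause and resume repeatedly.
        for (int i = 0; i < cycles; i++)
        {
            pause();
            await Task.Delay(1);
            resume();
            await Task.Delay(1);
        }

        cts.Cancel();
        resume(); // make sure the worker is not left paused
        await worker; // rethrows anything the worker threw
        return Volatile.Read(ref workerLoops);
    }
}
```

Assuming the Pausation class above is in scope and p is an instance of it, a call might look like PauseHarness.RunAsync(p.Pause, p.Resume, p.WaitWhilePausedAsync, cycles: 50).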
Answer:
You may have misunderstood the purpose of ValueTask. Stephen Toub breaks down why it was introduced here: https://devblogs.microsoft.com/dotnet/understanding-the-whys-whats-and-whens-of-valuetask/
But the most relevant bit of information is that a ValueTask that is not backed by an IValueTaskSource avoids an allocation only if the asynchronous method completes synchronously and successfully.
If it does need to complete asynchronously, a Task has to be allocated unless the ValueTask is backed by a reusable IValueTaskSource, which is considerably harder to get right. So this may need another solution entirely.