How to make multiple concurrent calls to async function collapse into 1 task?

Time:08-30

I have a C# async function like SlowRewriteFolder(), and I have multiple calls of this function coming in asynchronously.

If a call to this function is already processing, I want subsequent callers to not kick off the work that this function does again and instead wait on the same result (especially while the first one is still in progress).

How can I make it so that the Task created for the first call is shared among subsequent callers while it is still in progress?

I have considered caching the Task instance and returning that if it is available and clearing it when the work is complete, but is that the best approach?

CodePudding user response:

"I have considered caching the Task instance and returning that if it is available and clearing it when the work is complete, but is that the best approach?"

Yes-ish. You'll need to ensure that the "return or restart" logic is thread-safe.

Something like this...

  • My simplistic approach uses a basic lock() inside a non-async-but-Task-returning method to do the job of swapping/resetting Task instances stored in a class field.

    • Note that the volatile keyword isn't needed here at all, because every read and write of the field happens inside the lock.

      Note that it gets far more gnarly if you want to use a CancellationToken with SlowRewriteFolderAsync: SlowRewriteFolderImplAsync only has access to the CancellationToken of the first invocation, so subsequent invocations cannot cancel the shared work.
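The code that these notes describe is not shown above, so here is a minimal sketch of the lock-based approach under stated assumptions. The class name FolderRewriter, the ImplInvocations counter, and the Task.Delay stand-in for the real work are hypothetical; only SlowRewriteFolderAsync and SlowRewriteFolderImplAsync are names taken from the answer. The CancellationToken overload assumes .NET 6 or later (Task.WaitAsync) and only lets a caller abandon its own wait; it does not cancel the shared work.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class FolderRewriter
{
    private readonly object _gate = new object();
    private Task _inFlight;        // only touched inside lock (_gate), so no volatile
    private int _implInvocations;  // hypothetical counter, makes the behaviour observable

    public int ImplInvocations => Volatile.Read(ref _implInvocations);

    // Non-async but Task-returning: the lock is held only while the field
    // is read or swapped, never across an await.
    public Task SlowRewriteFolderAsync()
    {
        lock (_gate)
        {
            if (_inFlight is null)
            {
                _inFlight = RunAndResetAsync();
            }
            return _inFlight;
        }
    }

    // Assumption: .NET 6+. Cancels this caller's wait only; the shared
    // operation keeps running for everyone else.
    public Task SlowRewriteFolderAsync(CancellationToken cancellationToken)
        => SlowRewriteFolderAsync().WaitAsync(cancellationToken);

    private async Task RunAndResetAsync()
    {
        try
        {
            // Yield first so this method returns to the caller (which then
            // stores the task in _inFlight) before any work can complete.
            await Task.Yield();
            await SlowRewriteFolderImplAsync();
        }
        finally
        {
            // Clear the cached task before it completes, so the next call
            // after completion starts a fresh run.
            lock (_gate)
            {
                _inFlight = null;
            }
        }
    }

    // Hypothetical stand-in for the real slow work.
    private async Task SlowRewriteFolderImplAsync()
    {
        Interlocked.Increment(ref _implInvocations);
        await Task.Delay(200);
    }
}
```

With this shape, two calls made while the work is running return the same Task instance, and a call made after completion starts a new run. A failed run is also cleared, so the next caller retries rather than observing the old exception.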

      CodePudding user response:

      Here is a component similar in shape to the AsyncLazy<T> type (also available in the Nito.AsyncEx library by Stephen Cleary) that has a behavior tailored to your needs:

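      using System;
      using System.Runtime.CompilerServices; // TaskAwaiter, ConfiguredTaskAwaitable
      using System.Threading;
      using System.Threading.Tasks;

      /// <summary>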
      /// Represents an asynchronous operation that is invoked lazily on demand, can be
      /// invoked multiple times, and is subject to a non-concurrent execution policy.
      /// Concurrent observers receive the result of the same operation.
      /// </summary>
      public class AsyncCollapseConcurrent
      {
          private readonly Func<Task> _taskFactory;
          private volatile Task _task;
      
          public AsyncCollapseConcurrent(Func<Task> taskFactory)
          {
              ArgumentNullException.ThrowIfNull(taskFactory);
              _taskFactory = taskFactory;
          }
      
          public Task Task
          {
              get
              {
                  Task capturedTask = _task;
                  if (capturedTask is not null) return capturedTask;
                  Task<Task> newTaskTask = new(_taskFactory);
                  Task newTask = newTaskTask.Unwrap().ContinueWith(t =>
                  {
                      _task = null;
                      return t;
                  }, default, TaskContinuationOptions.DenyChildAttach |
                      TaskContinuationOptions.ExecuteSynchronously,
                      TaskScheduler.Default).Unwrap();
                  capturedTask = Interlocked
                      .CompareExchange(ref _task, newTask, null) ?? newTask;
                  if (ReferenceEquals(capturedTask, newTask))
                      newTaskTask.RunSynchronously(TaskScheduler.Default);
                  return capturedTask;
              }
          }
      
          public TaskAwaiter GetAwaiter() => Task.GetAwaiter();
      
          public ConfiguredTaskAwaitable ConfigureAwait(bool continueOnCapturedContext)
              => Task.ConfigureAwait(continueOnCapturedContext);
      }
      

      Usage example:

      private readonly AsyncCollapseConcurrent _asyncLazy;
      
      //...
      
      _asyncLazy = new(() => SlowRewriteFolderAsync());
      
      //...
      
      await _asyncLazy;
      

      The AsyncCollapseConcurrent ensures that the taskFactory will not be invoked concurrently, by creating a cold nested Task<Task> using the Task<T> constructor, and starting this task only in case the atomic Interlocked.CompareExchange operation succeeds. Otherwise, in case the race to update the _task field is won by another thread, the current thread discards the cold Task<Task> without starting it.
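To make the "cold nested Task<Task>" idea concrete, here is a small sketch that isolates just that mechanism. The ColdTaskDemo class and its RunAsync method are hypothetical names used for illustration; the calls themselves (the Task<TResult> constructor, Unwrap, RunSynchronously) are the same ones the class above relies on.

```csharp
using System;
using System.Threading.Tasks;

public static class ColdTaskDemo
{
    // Returns whether the factory had been invoked before and after
    // the cold task was started.
    public static async Task<(bool Before, bool After)> RunAsync()
    {
        bool invoked = false;
        Func<Task> factory = async () =>
        {
            invoked = true;
            await Task.Yield();
        };

        // The constructor creates a cold task: the factory is not called yet.
        Task<Task> cold = new Task<Task>(factory);

        // Unwrap can be applied to a task that has not been started;
        // the proxy completes once the inner task completes.
        Task unwrapped = cold.Unwrap();
        bool before = invoked;

        // Starting the cold task invokes the factory on the current thread.
        cold.RunSynchronously(TaskScheduler.Default);
        bool after = invoked;

        await unwrapped;
        return (before, after);
    }
}
```

A thread that loses the CompareExchange race simply never starts its cold task, so its factory is never invoked.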

      I have used this technique for implementing various AsyncLazy<T> variants, like this (with retry) or this (with expiration).

      In case your SlowRewriteFolderAsync method returns a generic Task<TResult>, you can find a compatible generic AsyncCollapseConcurrent<TResult> class here.

      CodePudding user response:

      If you only ever want the task to run once with multiple callers then the easy way is with Lazy<T>.

      Try this:

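      // Assigned once in the constructor; a "=>" property would create a new Lazy on every access.
      public Lazy<Task<List<String>>> SlowRewriteFolderAsyncLazy { get; }
      // Constructor body:
      //   SlowRewriteFolderAsyncLazy = new Lazy<Task<List<String>>>(() => SlowRewriteFolderAsync());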
      

      You then call it like this:

      Lazy<Task<List<String>>> lazy = SlowRewriteFolderAsyncLazy;
      Task<List<String>> task = lazy.Value;
      List<String> value = await task;
      

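      The task within the Lazy<> type doesn't begin to run until the first caller reads the .Value property, so it is safe to expose the Lazy itself as a property, provided that every caller reads the same Lazy instance. A property that constructs a new Lazy on each access would start the work again for every caller.

      All subsequent callers get the same task, whether it is still running or already completed, so the work is never restarted.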
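As a rough illustration of that run-once behaviour, here is a self-contained sketch. The LazyFolderRewriter class, the Invocations counter, and the body of SlowRewriteFolderAsync (a Task.Delay returning two made-up file names) are assumptions for demonstration only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class LazyFolderRewriter
{
    private int _invocations; // hypothetical counter, makes the behaviour observable

    public int Invocations => Volatile.Read(ref _invocations);

    // Stored once, so every caller reads the same Lazy instance.
    public Lazy<Task<List<string>>> SlowRewriteFolderAsyncLazy { get; }

    public LazyFolderRewriter()
    {
        // The lambda is not invoked here; it runs on the first .Value access.
        SlowRewriteFolderAsyncLazy =
            new Lazy<Task<List<string>>>(() => SlowRewriteFolderAsync());
    }

    // Hypothetical stand-in for the real slow work.
    private async Task<List<string>> SlowRewriteFolderAsync()
    {
        Interlocked.Increment(ref _invocations);
        await Task.Delay(100);
        return new List<string> { "a.txt", "b.txt" };
    }
}
```

Note the trade-off compared with the other answers: the Lazy keeps returning the same task after it completes, including a faulted one, so this fits only when the work should run once for the lifetime of the object.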
