Processing a queue quickly with async/await

Time:11-03

I often have scenarios where I want to queue data to be processed, and then process it in the background so that the main program can continue. I've written the following class to try and help with this:

public class ProcessQueue<T>
{
    #region Properties

    public Task ProcessingComplete => tcs.Task;

    #endregion

    #region Variables

    private ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    private bool processingQueue = false;
    private bool stopped = false;

    private Func<T, Task> processItemFunc;
    private TaskCompletionSource tcs = new TaskCompletionSource();

    #endregion

    public ProcessQueue(Func<T, Task> processItemFunc)
    {
        this.processItemFunc = processItemFunc;
    }

    public void EnqueueAndProcess(T data)
    {
        if (stopped)
        {
            return;
        }

        queue.Enqueue(data);

        ProcessQueuedItems();
    }

    public void Stop()
    {
        stopped = true;

        queue.Clear();
        tcs.SetResult();
    }

    private void ProcessQueuedItems()
    {
        if (processingQueue)
        {
            return;
        }

        processingQueue = true;
        tcs = new TaskCompletionSource();

        new TaskFactory().StartNew(async () =>
        {
            while (queue.Count > 0)
            {
                if (queue.TryDequeue(out var item))
                {
                    await processItemFunc(item);
                }
            }

            processingQueue = false;
            tcs.SetResult();
        }, TaskCreationOptions.LongRunning);
    }
}

It seems fairly straightforward so I wrote some unit tests for it, such as the following:

[TestClass]
public class ProcessQueueTests
{
    private ProcessQueue<int> sut;
    private List<int> processedOutput = new();

    [TestInitialize]
    public void Initialise()
    {
        sut = new ProcessQueue<int>(ProcessAsync);
    }

    [TestMethod]
    public async Task Test1()
    {
        for (int i = 0; i < 1000; i++)
        {
            sut.EnqueueAndProcess(i);
        }

        await sut.ProcessingComplete;

        Assert.AreEqual(1000, processedOutput.Count);

        for (int i = 0; i < processedOutput.Count; i++)
        {
            Assert.AreEqual(i, processedOutput[i]);
        }
    }

    private async Task ProcessAsync(int value)
    {
        await Task.Delay(1);

        processedOutput.Add(value);
    }
}

I've put a 1 ms delay in the processing function to see how it affects throughput, and the above test consistently takes nearly 16 seconds to complete, when I would have expected slightly over 1 second (1000 items at 1 ms each).

Why is this taking so long? How can it be faster?

CodePudding user response:

Task.Delay just has fairly low resolution. As the documentation states:

This method depends on the system clock. This means that the time delay will approximately equal the resolution of the system clock if the millisecondsDelay argument is less than the resolution of the system clock, which is approximately 15 milliseconds on Windows systems.

Your argument (1) is less than 15 milliseconds, so each delay actually lasts roughly 15 ms. You can just do:

var watch = Stopwatch.StartNew();
for (int i = 0; i < 1000; i++) {
    await Task.Delay(1);
}
watch.Stop();
Console.WriteLine($"Took {watch.ElapsedMilliseconds}ms");

to reproduce that. 1000 * 15ms = 15 seconds, which is about what you report.
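
As for making the test faster: if the delay is only there to simulate asynchronous work, one option is to avoid the timer altogether. The following is a minimal sketch, not a definitive fix. DelayTiming and MeasureAsync are hypothetical names introduced here for illustration, and using Task.Yield as a stand-in for the simulated work is an assumption about what the test is meant to exercise. It times the same sequential loop with Task.Delay(1) and with Task.Yield(), which still forces an asynchronous continuation but does not wait on the system timer.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original question or answer.
public static class DelayTiming
{
    // Awaits `step` sequentially `iterations` times and returns the total elapsed time.
    public static async Task<TimeSpan> MeasureAsync(int iterations, Func<Task> step)
    {
        var watch = Stopwatch.StartNew();
        for (int i = 0; i < iterations; i++)
        {
            await step();
        }
        watch.Stop();
        return watch.Elapsed;
    }

    public static async Task Main()
    {
        // Timer-based: every await has to wait for a timer tick.
        var delayed = await MeasureAsync(100, () => Task.Delay(1));

        // Yield-based: still completes asynchronously, but no timer is involved.
        var yielded = await MeasureAsync(100, async () => await Task.Yield());

        Console.WriteLine($"Task.Delay(1): {delayed.TotalMilliseconds:F0}ms");
        Console.WriteLine($"Task.Yield():  {yielded.TotalMilliseconds:F0}ms");
    }
}
```

The actual numbers depend on the operating system and its timer resolution, so the gap may be much smaller outside Windows. Either way, the slow part of the original test is the simulated work in ProcessAsync, not the queue itself; swapping `await Task.Delay(1)` for `await Task.Yield()` there should remove most of the 16 seconds.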
