Best way to order parallel processing of commands with the same attribute

Time:05-31

I have a queue of commands. Some of them share the same attribute value, say a "document id". I need to process them all in parallel, with one restriction: commands with the same attribute value must be processed in the order in which they appear in the queue.

For example, my queue is [n, a, s, j, a, l, v, g, a, f, f], where the letters are DocumentIds. I need parallel processing, but the 'a's must be processed in the order they appear in the queue, i.e. [1, 4, 8], where the numbers are the zero-based positions of the items in the queue. The relative order of everything else doesn't matter, as long as [8] is processed after [4], which is processed after [1] (with any number of other items in between).

First, I tried locking on the DocumentId with a SemaphoreSlim: when a thread takes an item to process, other threads that pick up an item with the same DocumentId are blocked until it finishes. This didn't work, because SemaphoreSlim doesn't guarantee that waiters are unblocked in FIFO order.

Then I made a wrapper around SemaphoreSlim to enforce FIFO unblocking:

public class FifoAsyncLock : IDisposable
{
    private readonly SemaphoreSlim _sem = new (1, 1);
    private readonly ConcurrentQueue<TaskCompletionSource> _queue = new ();

    public async Task WaitAsync()
    {
        var tcsE = new TaskCompletionSource();
        _queue.Enqueue(tcsE);

        await _sem.WaitAsync();
        if (_queue.TryDequeue(out var tcsD))
            tcsD.SetResult();

        await tcsE.Task;
    }

    public void Release()
    {
        _sem.Release();
    }

    public void Dispose()
    {
        _sem.Dispose();
    }
}

I used it in a class that stores one lock per DocumentId and also keeps count of how many users of each lock are waiting to be unblocked. When the last user releases a lock, the lock is deleted (to free memory):

public class DocIdLocker : IDisposable
{
    private readonly ConcurrentDictionary<Guid, FifoAsyncLock> _docIdLocks = new ();
    private readonly ConcurrentDictionary<Guid, int> _users = new ();

    private bool _disposed;

    public async Task<IAsyncDisposable> AquireLockAsync(Guid docId)
    {
        var userCount = _users.AddOrUpdate(docId, 1, (_, o) => o + 1);
        await _docIdLocks.GetOrAdd(docId, new FifoAsyncLock()).WaitAsync();
        return new Lock(this, docId);
    }

    private async Task Release(Guid docId)
    {
        if (!_docIdLocks.ContainsKey(docId))
            throw new KeyNotFoundException($"Key not found: '{docId}'");

        _docIdLocks[docId].Release();

        if (!_users.ContainsKey(docId))
            throw new KeyNotFoundException($"Key not found: '{docId}'");

        if (--_users[docId] == 0)
        {
            _docIdLocks.TryRemove(docId, out _);
            _users.TryRemove(docId, out _);
        }
    }

    private class Lock : IAsyncDisposable
    {
        private readonly DocIdLocker _parent;
        private readonly Guid _docId;

        public Lock(DocIdLocker parent, Guid docId)
        {
            _parent = parent;
            _docId = docId;
        }

        public ValueTask DisposeAsync() => new (_parent.Release(_docId));
    }

    public void Dispose()
    {
        if (_disposed)
            return;

        foreach (var item in _docIdLocks.Values)
            item.Dispose();
        _users.Clear();

        _disposed = true;
    }
}

But my tests still showed me that the order of the 'a's is not preserved.

I thought that maybe some threads take elements while the lock doesn't exist yet, and process them out of order. It all became very hard to reason about, and everything is getting mixed up in my head right now.

Is there a simple and elegant way to achieve what I'm trying to achieve?

CodePudding user response:

The easiest way to solve this problem is probably to group the commands by the attribute that correlates them, and then parallelize the processing of the groups instead of the individual documents. Then for each group perform a sequential foreach loop, and process the related documents one by one. Example:

string[] documents = new[] { 'n', 'a', 's', 'j', 'a', 'l', 'v', 'g', 'a', 'f', 'f' }
    .Select((item, index) => $"{item}-{index}")
    .ToArray();
Console.WriteLine($"Documents: [{String.Join(", ", documents)}]");

var grouped = documents.GroupBy(item => item[0]); // Group by the first char

ParallelOptions options = new()
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.ForEach(grouped, options, grouping =>
{
    foreach (var document in grouping)
    {
        Console.WriteLine($"Processing: {document}");
        Thread.Sleep(500); // Simulate a CPU-bound or blocking operation
    }
});

Output:

Documents: [n-0, a-1, s-2, j-3, a-4, l-5, v-6, g-7, a-8, f-9, f-10]
Processing: a-1
Processing: n-0
Processing: s-2
Processing: j-3
Processing: a-4
Processing: l-5
Processing: v-6
Processing: g-7
Processing: f-9
Processing: a-8
Processing: f-10


The ordering behavior of the GroupBy LINQ operator is well defined. According to the documentation:

The IGrouping<TKey,TElement> objects are yielded in an order based on the order of the elements in source that produced the first key of each IGrouping<TKey,TElement>. Elements in a grouping are yielded in the order that the elements that produced them appear in source.
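
The guarantee quoted above can be checked with a few lines. This is only a minimal sketch: the names GroupByOrderingDemo and GroupInSourceOrder are made up for illustration, and the input is assumed to have the same "letter-index" shape as the documents array in the earlier example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GroupByOrderingDemo
{
    // Hypothetical helper (not part of LINQ): groups "letter-index" strings by
    // their first character and materializes each group, keeping the order in
    // which GroupBy yields the groups and the elements inside them.
    public static List<KeyValuePair<char, string[]>> GroupInSourceOrder(
        IEnumerable<string> documents)
    {
        return documents
            .GroupBy(item => item[0]) // Group by the first char
            .Select(g => new KeyValuePair<char, string[]>(g.Key, g.ToArray()))
            .ToList();
    }
}
```

For the sample queue, the keys come out in the order n, a, s, j, l, v, g, f (first appearance of each key), and the 'a' group contains a-1, a-4, a-8 in that order.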

This approach has a few drawbacks:

  1. The source sequence must be enumerated fully before the parallel processing starts. This can be an issue if the source sequence is a deferred enumerable, for example a BlockingCollection<T> whose items arrive in real time from a parallel producer.
  2. The order of processing is dominated by the order in which each unique key first appears in the source sequence, and not by the order of the items themselves. For example, if the source is (A, B, A, A, A, A, A) and the degree of parallelism is 1, the B item will be processed last.
  3. The resulting partitioning scheme might not be well balanced. If some keys have a large number of elements, and these keys first appear late in the source sequence, the parallel processing might suffer from a reduced degree of parallelism towards the end of the operation. To mitigate this issue, it might be a good idea to reorder the groups by the number of items they contain, in descending order.
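
The mitigation mentioned in the third point could look like the sketch below. The names GroupBalancingDemo and LargestGroupsFirst are hypothetical. OrderByDescending is a stable sort in LINQ to Objects, so groups of equal size keep their first-appearance order, and the order of the elements inside each group is not touched. Counting the groups requires the source to be fully enumerated, so this does nothing for the first drawback.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GroupBalancingDemo
{
    // Hypothetical helper: groups the source by key and yields the largest
    // groups first, so that the long-running groups start as early as possible.
    public static IEnumerable<IGrouping<TKey, TSource>> LargestGroupsFirst<TSource, TKey>(
        IEnumerable<TSource> source, Func<TSource, TKey> keySelector)
    {
        return source
            .GroupBy(keySelector)
            .OrderByDescending(g => g.Count()); // Stable: ties keep GroupBy order
    }
}
```

The result can be passed to Parallel.ForEach in place of the grouped variable of the earlier example, e.g. GroupBalancingDemo.LargestGroupsFirst(documents, item => item[0]).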

Below is a custom LINQ operator, ToConsumableGroupings, that might be better suited for this scenario than the standard GroupBy operator. It solves most of the previously mentioned issues, because it enumerates the source sequence lazily and emits groupings on the go. Its signature is identical to that of the GroupBy operator:

/// <summary>
/// Groups the elements of a sequence into consumable groupings, according to
/// a specified key selector function.
/// </summary>
/// <remarks>
/// For each key, more than one grouping can be emitted. A new grouping can be emitted
/// if the previously emitted grouping for the same key has been fully consumed.
/// </remarks>
public static IEnumerable<IGrouping<TKey, TSource>>
    ToConsumableGroupings<TKey, TSource>(
    this IEnumerable<TSource> source,
    Func<TSource, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = default)
{
    var perKey = new Dictionary<TKey, Queue<TSource>>(keyComparer);
    foreach (var item in source)
    {
        var key = keySelector(item);
        lock (perKey)
        {
            if (perKey.TryGetValue(key, out var queue))
            {
                queue.Enqueue(item); continue;
            }
            queue = perKey[key] = new Queue<TSource>();
            queue.Enqueue(item);
        }
        yield return new Grouping<TKey, TSource>(key, GetGroup(key));
    }

    IEnumerable<TSource> GetGroup(TKey key)
    {
        while (true)
        {
            TSource item;
            lock (perKey)
            {
                var queue = perKey[key];
                if (queue.Count == 0) { perKey.Remove(key); break; }
                item = queue.Dequeue();
            }
            yield return item;
        }
    }
}

private class Grouping<TKey, TSource> : IGrouping<TKey, TSource>
{
    private readonly TKey _key;
    private readonly IEnumerable<TSource> _sequence;
    public Grouping(TKey key, IEnumerable<TSource> sequence)
    {
        _key = key;
        _sequence = sequence;
    }
    public TKey Key => _key;
    public IEnumerator<TSource> GetEnumerator() => _sequence.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

Usage example:

var grouped = documents.ToConsumableGroupings(item => item[0]);

Unlike the GroupBy operator, the ToConsumableGroupings operator emits non-materialized groupings that are expected to be enumerated (consumed) only once.
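
To make the usage concrete, here is a rough end-to-end sketch that feeds the groupings to Parallel.ForEach and records the completion order. The class name ConsumableGroupingsDemo and the method ProcessInParallel are assumptions made for this example. The operator is repeated in condensed form (without the comparer parameter) only so that the sketch compiles on its own. Wrapping the groupings in a partitioner with EnumerablePartitionerOptions.NoBuffering is a choice of this sketch, not a requirement: by default Parallel.ForEach buffers items from an enumerable in chunks, which would let one worker reserve several groupings in advance.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumableGroupingsDemo
{
    // Condensed copy of the operator shown above, repeated for self-containment.
    public static IEnumerable<IGrouping<TKey, TSource>> ToConsumableGroupings<TKey, TSource>(
        this IEnumerable<TSource> source, Func<TSource, TKey> keySelector)
    {
        var perKey = new Dictionary<TKey, Queue<TSource>>();
        foreach (var item in source)
        {
            var key = keySelector(item);
            lock (perKey)
            {
                // A live grouping exists for this key: append to it and move on.
                if (perKey.TryGetValue(key, out var queue)) { queue.Enqueue(item); continue; }
                queue = perKey[key] = new Queue<TSource>();
                queue.Enqueue(item);
            }
            yield return new Grouping<TKey, TSource>(key, GetGroup(key));
        }

        IEnumerable<TSource> GetGroup(TKey key)
        {
            while (true)
            {
                TSource item;
                lock (perKey)
                {
                    var queue = perKey[key];
                    if (queue.Count == 0) { perKey.Remove(key); break; }
                    item = queue.Dequeue();
                }
                yield return item;
            }
        }
    }

    private sealed class Grouping<TKey, TSource> : IGrouping<TKey, TSource>
    {
        private readonly IEnumerable<TSource> _sequence;
        public Grouping(TKey key, IEnumerable<TSource> sequence) { Key = key; _sequence = sequence; }
        public TKey Key { get; }
        public IEnumerator<TSource> GetEnumerator() => _sequence.GetEnumerator();
        IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    }

    // Hypothetical driver: processes the documents in parallel and returns
    // them in the order in which their processing completed.
    public static string[] ProcessInParallel(IEnumerable<string> documents)
    {
        var completed = new ConcurrentQueue<string>();
        var grouped = documents.ToConsumableGroupings(item => item[0]);

        // Hand out one grouping at a time instead of chunks of groupings.
        var partitioner = Partitioner.Create(grouped, EnumerablePartitionerOptions.NoBuffering);
        var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

        Parallel.ForEach(partitioner, options, grouping =>
        {
            foreach (var document in grouping) // Each grouping is consumed exactly once
            {
                Thread.Sleep(10); // Simulate a CPU-bound or blocking operation
                completed.Enqueue(document);
            }
        });
        return completed.ToArray();
    }
}
```

The overall completion order varies from run to run, but within each key the documents should complete in source order, because a grouping is consumed by a single worker and a new grouping for the same key is only emitted after the previous one has been drained.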
