Thread-safe fixed-size circular buffer with sequence ids


I need a queue with these capabilities:

  • fixed-size (i.e. circular buffer)
  • queue items have ids (like a primary key), which are sequential
  • thread-safe (used from multiple ASP.NET Core requests)

To avoid locking, I tried a ConcurrentQueue but found race conditions. So I'm trying a custom approach.

public interface IQueueItem
{
    long Id { get; set; }
}

public class CircularBuffer<T> : LinkedList<T> where T : class, IQueueItem
{
    public CircularBuffer(int capacity) => _capacity = capacity;
    private readonly int _capacity;

    private long _counter = 0;
    private readonly object _lock = new();

    public void Enqueue(T item)
    {
        lock (_lock) {         // works but feels "heavy"
            _counter++;
            item.Id = _counter;
            if (Count == _capacity) RemoveFirst();
            AddLast(item);
        }
    }
}

And to test:

public class Item : IQueueItem
{
    public long Id { get; set; }
    //...
}

public class Program
{
    public static void Main()
    {
        var q = new CircularBuffer<Item>(10);
        Parallel.For(0, 15, i => q.Enqueue(new Item()));
        Console.WriteLine(string.Join(", ", q.Select(x => x.Id)));
    }
}

Which gives correct output (ordered even though items were enqueued by competing threads, and fixed-size with the oldest items dequeued):

6, 7, 8, 9, 10, 11, 12, 13, 14, 15

In reality, I have web requests that read (i.e. enumerate) that queue.

The problem: if one thread is enumerating the queue while another thread is adding to it, I will have errors. (I could use a ToList() before the read, but for a large queue that will suck up all the server's memory as this could be done many times a second by multiple requests). How can I deal with that scenario? I used a linked list, but I'm flexible to use any structure.

(Also, that seems to be a really heavy lock section; is there a more performant way?)

UPDATE
As asked in the comments below: I expect the queue to hold from a few hundred to a few tens of thousands of items, but the items themselves are small (just a few primitive data types). I expect an enqueue roughly every second. Reads from web requests are less frequent, say a few times per minute (but they can occur concurrently with the server writing to the queue).

CodePudding user response:

Based on the metrics that you provided in the question, you have plenty of options. The anticipated usage of the CircularBuffer<T> is not really that heavy. Wrapping a lock-protected Queue<T> should work pretty well. The cost of copying the contents of the queue into an array on each enumeration (copying 10,000 elements a few times per second) is unlikely to be noticeable. Modern machines can do such things in the blink of an eye. You'd have to enumerate the collection thousands of times per second for this to start (slightly) becoming an issue.
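As a sketch of that first option (the class and member names here are mine, not from the question), a lock-protected Queue<T> that hands out array snapshots for reading might look like:

```csharp
using System;
using System.Collections.Generic;

public interface IQueueItem
{
    long Id { get; set; }
}

// Trivial item type, just for demonstration.
public class Item : IQueueItem { public long Id { get; set; } }

// A lock-protected Queue<T> with a fixed capacity. Readers get an
// independent array snapshot, so they never enumerate shared state.
public class LockedCircularBuffer<T> where T : IQueueItem
{
    private readonly object _lock = new object();
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly int _capacity;
    private long _lastId = 0;

    public LockedCircularBuffer(int capacity) => _capacity = capacity;

    public void Enqueue(T item)
    {
        lock (_lock)
        {
            item.Id = ++_lastId;          // sequential id, assigned under the lock
            if (_queue.Count == _capacity)
                _queue.Dequeue();          // drop the oldest item
            _queue.Enqueue(item);
        }
    }

    // Copy out the contents; enumerating the returned array needs no lock.
    public T[] Snapshot()
    {
        lock (_lock) return _queue.ToArray();
    }
}
```

The snapshot copy is the cost you pay per read, which, as noted above, is negligible at the stated scale.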

For the sake of variety I'll propose a different structure as internal storage: the ImmutableQueue<T> class. Its big plus is that it can be enumerated freely by multiple threads concurrently. You don't have to worry about concurrent mutations, because this collection is immutable. Nobody can change it after it has been created, ever.

The way that you update this collection is by creating a new collection and discarding the previous one. Its Enqueue and Dequeue methods don't mutate the existing collection; instead they return a new collection with the desired mutation applied. This sounds extremely inefficient, but it actually isn't: the new collection reuses most of the internals of the existing one. It is of course more expensive than mutating a Queue<T>, probably around 10 times more, but the hope is that you'll get even more back in return from how cheap and contention-free it is to enumerate.
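To see that persistence in action, here is a tiny standalone snippet using the real ImmutableQueue<T> from System.Collections.Immutable:

```csharp
using System;
using System.Collections.Immutable;

var q1 = ImmutableQueue<int>.Empty;
var q2 = q1.Enqueue(1).Enqueue(2);   // each call returns a new queue
var q3 = q2.Dequeue();               // drops the head, again as a new queue

// The originals are untouched, so a reader still holding q2 can enumerate
// it safely while a writer has already moved on to q3.
Console.WriteLine(q1.IsEmpty);             // True
Console.WriteLine(string.Join(", ", q2));  // 1, 2
Console.WriteLine(string.Join(", ", q3));  // 2
```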

public class ConcurrentCircularBuffer<T> : IEnumerable<T> where T : IQueueItem
{
    private readonly object _locker = new();
    private readonly int _capacity;
    private ImmutableQueue<T> _queue = ImmutableQueue<T>.Empty;
    private int _count = 0;
    private long _lastId = 0;

    public ConcurrentCircularBuffer(int capacity) => _capacity = capacity;

    public void Enqueue(T item)
    {
        lock (_locker)
        {
            item.Id = ++_lastId;
            _queue = _queue.Enqueue(item);
            if (_count < _capacity)
                _count++;
            else
                _queue = _queue.Dequeue();
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        var enumerator = Volatile.Read(ref _queue).GetEnumerator();
        while (enumerator.MoveNext())
            yield return enumerator.Current;
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

The class that implements the IQueueItem interface should be implemented like this:

public class QueueItem : IQueueItem
{
    private long _id;

    public long Id
    {
        get => Volatile.Read(ref _id);
        set => Volatile.Write(ref _id, value);
    }
}

Otherwise it might be possible for a thread to see an IQueueItem instance with an uninitialized Id. For an explanation you can read this article by Igor Ostrovsky. I am not 100% sure that it's possible, but neither can I guarantee that it's impossible. Even with the Volatile in place, it still looks fragile to me to delegate the responsibility of initializing the Id to an external component.
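Putting the two classes together, a usage mirroring the question's test (condensed copies of the classes above, so the snippet is self-contained) might look like:

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public interface IQueueItem { long Id { get; set; } }

public class QueueItem : IQueueItem
{
    private long _id;
    public long Id
    {
        get => Volatile.Read(ref _id);
        set => Volatile.Write(ref _id, value);
    }
}

public class ConcurrentCircularBuffer<T> : IEnumerable<T> where T : IQueueItem
{
    private readonly object _locker = new();
    private readonly int _capacity;
    private ImmutableQueue<T> _queue = ImmutableQueue<T>.Empty;
    private int _count = 0;
    private long _lastId = 0;

    public ConcurrentCircularBuffer(int capacity) => _capacity = capacity;

    public void Enqueue(T item)
    {
        lock (_locker)
        {
            item.Id = ++_lastId;
            _queue = _queue.Enqueue(item);
            if (_count < _capacity) _count++;
            else _queue = _queue.Dequeue();
        }
    }

    // Each reader enumerates whatever immutable snapshot the _queue
    // field pointed at when the enumeration started.
    public IEnumerator<T> GetEnumerator()
        => ((IEnumerable<T>)Volatile.Read(ref _queue)).GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

public class Program
{
    public static void Main()
    {
        var q = new ConcurrentCircularBuffer<QueueItem>(10);
        Parallel.For(0, 15, i => q.Enqueue(new QueueItem()));
        Console.WriteLine(string.Join(", ", q.Select(x => x.Id)));
        // 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
    }
}
```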

CodePudding user response:

Since ConcurrentQueue is out in this question, you can try a fixed array.

IQueueItem[] items = new IQueueItem[SIZE];
long id = 0;

Enqueue is simple.

void Enqueue(IQueueItem item)
{
    long id2 = Interlocked.Increment(ref id) - 1; // zero-based sequence number
    item.Id = id2;
    items[id2 % SIZE] = item;
}

To read the data, you just need to copy the array to a new one, then sort it (of course, this can be optimized):

var arr = new IQueueItem[SIZE];
Array.Copy(items, arr, SIZE);
return arr.Where(a => a != null).OrderBy(a => a.Id);

There may be gaps in the array because of concurrent insertions that haven't finished storing their item yet; if you need a strictly contiguous sequence, take items only until the first gap is found:

var e = arr.Where(a => a != null).OrderBy(a => a.Id);
var firstId = e.First().Id;
return e.TakeWhile((a, index) => a.Id - index == firstId);
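A small self-contained illustration of that read path, with a gap simulated by hand (the Item class is a trivial stand-in, not from the answer):

```csharp
using System;
using System.Linq;

// Simulate the fixed array after some concurrent enqueues: ids 0, 1, 2
// and 4 have landed, but the writer of id 3 hasn't stored its item yet.
var items = new IQueueItem[8];
foreach (var id in new long[] { 0, 1, 2, 4 })
    items[id % 8] = new Item { Id = id };

// Snapshot the array, drop empty slots, sort by id.
var arr = new IQueueItem[8];
Array.Copy(items, arr, 8);
var e = arr.Where(a => a != null).OrderBy(a => a.Id);

// Keep only the contiguous run of ids, stopping at the first gap.
var firstId = e.First().Id;
var contiguous = e.TakeWhile((a, index) => a.Id - index == firstId).ToArray();

Console.WriteLine(string.Join(", ", contiguous.Select(a => a.Id)));  // 0, 1, 2

public interface IQueueItem { long Id { get; set; } }
public class Item : IQueueItem { public long Id { get; set; } }
```

The `a.Id - index == firstId` check works because in a gap-free run, each item's id exceeds its sorted position by exactly the first id.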