How to lock threads only if they have to work on non overlapping rows of the dataset

Time:03-12

I have a method that is executed by multiple threads. The method has to read some rows that have two DateTime fields, start and stop. It then has to apply some modifications to the dataset itself, and do other operations on other datasets. Currently I'm using a lock to prevent the method from working on the dataset from different threads at the same time. The solution works but it's not optimal: ideally I would like to lock only when I'm trying to modify a shared set of rows. I would like to allow concurrent execution when the threads work on the same dataset, but on time ranges that do not overlap with the ones currently being processed.

If the problem were to lock "per row", I would use a concurrent dictionary and lock on an object keyed by the row id, something like this:

var key = myConcurrentDictionary.GetOrAdd(rowNr, new object());
lock (key)
{
    DoWork(rowNr);
}

But in my case I'm working on a range of rows, and I don't know how I can achieve my objective.

CodePudding user response:

Here is a custom RangeLock implementation that allows acquiring exclusive locks on ranges between generic boundaries:

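using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

/// <summary>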
/// A mechanism that enforces exclusive access to resources associated with ranges.
/// </summary>
public class RangeLock<TBoundary>
{
    // Represents a lock that is currently acquired or pending.
    private class Entry
    {
        public TBoundary Start { get; init; }
        public TBoundary End { get; init; }
        public TaskCompletionSource<object> TCS; // It's null when acquired.
        public int Countdown; // Number of older entries that are overlapping this.
        public List<Entry> Overlapped; // Newer entries that are overlapped by this.
    }

    private readonly IComparer<TBoundary> _comparer;
    private readonly List<Entry> _entries = new();

    public RangeLock(IComparer<TBoundary> comparer = default)
    {
        _comparer = comparer ?? Comparer<TBoundary>.Default;
    }

    /// <summary>Acquires an exclusive lock on the specified range.</summary>
    /// <param name="start">The inclusive lower bound of the range.</param>
    /// <param name="end">The exclusive upper bound of the range.</param>
    /// <returns>A token that identifies the acquired exclusive lock.</returns>
    public Task<object> WaitAsync(TBoundary start, TBoundary end)
    {
        if (_comparer.Compare(end, start) < 0)
            throw new ArgumentOutOfRangeException(nameof(end));
        var entry = new Entry() { Start = start, End = end };
        lock (_entries)
        {
            foreach (var older in _entries)
            {
                if (Overlaps(entry, older))
                {
                    (older.Overlapped ??= new()).Add(entry);
                    entry.Countdown++;
                }
            }
            _entries.Add(entry);
            if (entry.Countdown == 0) return Task.FromResult((object)entry);
            entry.TCS = new TaskCompletionSource<object>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            return entry.TCS.Task;
        }
    }

    public object Wait(TBoundary start, TBoundary end)
        => WaitAsync(start, end).GetAwaiter().GetResult();

    /// <summary>Releases a previously acquired exclusive lock.</summary>
    /// <param name="token">A token that identifies the exclusive lock.</param>
    public void Release(object token)
    {
        if (token == null) throw new ArgumentNullException(nameof(token));
        if (!(token is Entry entry)) throw new ArgumentException("Invalid token.");
        lock (_entries)
        {
            if (!_entries.Remove(entry))
                throw new ArgumentException("Unknown token.");
            if (entry.Overlapped == null) return;
            foreach (var overlapped in entry.Overlapped)
            {
                overlapped.Countdown--;
                if (overlapped.Countdown == 0)
                {
                    Debug.Assert(overlapped.TCS != null);
                    overlapped.TCS.SetResult(overlapped);
                    overlapped.TCS = null;
                }
            }
        }
    }

    private bool Overlaps(Entry entry1, Entry entry2)
    {
        return (
            _comparer.Compare(entry1.Start, entry2.Start) <= 0 &&
            _comparer.Compare(entry1.End, entry2.Start) > 0
        ) || (
            _comparer.Compare(entry2.Start, entry1.Start) <= 0 &&
            _comparer.Compare(entry2.End, entry1.Start) > 0
        );
    }
}
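
To make the half-open convention (inclusive start, exclusive end) concrete, here is a minimal sketch of the same overlap test specialized to DateTime. The names RangeOverlapDemo and the four-parameter Overlaps are hypothetical and not part of the RangeLock class above; the sketch only mirrors the comparison logic of its private Overlaps method.

```csharp
using System;

public static class RangeOverlapDemo
{
    // Hypothetical standalone helper (an assumption for illustration).
    // It mirrors the comparisons in RangeLock.Overlaps for two half-open
    // ranges [start1, end1) and [start2, end2).
    public static bool Overlaps(DateTime start1, DateTime end1,
                                DateTime start2, DateTime end2)
    {
        return (start1 <= start2 && end1 > start2)
            || (start2 <= start1 && end2 > start1);
    }
}
```

With this check, March 2022 as [2022-03-01, 2022-04-01) and April 2022 as [2022-04-01, 2022-05-01) do not overlap, because the end bound is exclusive, so two flows working on adjacent months would not block each other. A range such as [2022-03-15, 2022-05-01) does overlap March.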

Usage example:

var locker = new RangeLock<DateTime>();

DateTime start = new DateTime(2022, 3, 1); // Inclusive
DateTime end = new DateTime(2022, 4, 1); // Exclusive
// This range represents the whole month of March 2022.

var token = await locker.WaitAsync(start, end);
try
{
    // Here do something with the range between start and end.
    // Other execution flows that want to acquire a lock on an overlapping
    // range, will have to wait until this execution flow releases the lock.
}
finally { locker.Release(token); }

This implementation offers both a synchronous and an asynchronous API. It is suitable for synchronizing a medium number of threads/asynchronous workflows, under conditions of low contention, that do chunky work while holding the lock. It's not suitable for synchronizing a huge number of flows that are contending for a crowded range-space and are doing granular (lightweight) work. In such conditions a simple SemaphoreSlim(1, 1) or lock might perform better than the RangeLock. The complexity of both the WaitAsync and Release methods is O(n), where n is the number of entries that are currently acquired or pending. There is no support for CancellationToken or timeout, and adding this functionality is not trivial.
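
For comparison, the single-lock baseline mentioned above might look like the following minimal sketch. GlobalLockWorker, RunAsync and the work delegate are hypothetical names introduced for illustration. The range is passed through to the work but ignored for locking, so all callers are serialized whether or not their ranges overlap.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical baseline (an assumption, not part of the answer's code):
// one global SemaphoreSlim guards the whole dataset.
public class GlobalLockWorker
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

    // `work` is a placeholder for whatever reads and modifies the rows
    // between start (inclusive) and end (exclusive).
    public async Task RunAsync(DateTime start, DateTime end,
                               Func<DateTime, DateTime, Task> work)
    {
        await _semaphore.WaitAsync();
        try
        {
            await work(start, end);
        }
        finally
        {
            // Always release, even if the work throws.
            _semaphore.Release();
        }
    }
}
```

This trades away concurrency between non-overlapping ranges in exchange for constant-time bookkeeping, which is why it can win when the work done under the lock is very lightweight.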

A demo application that verifies the correctness of the above implementation can be found here.

An alternative implementation based on the Wait/Pulse mechanism (offering only synchronous API) can be found in the first revision of this answer.

A non generic implementation with int boundaries can be found in the 4th revision of this answer. It has O(n²) complexity. It might be more efficient when the number of execution flows is small.
