Thread-safe buffer that propagates the latest data


I have a data source which creates (produces) a PointF every 15 to 20 milliseconds. I need to store (consume) such points every 10 ms. My approach is to use a three-point buffer plus read/write pointers to achieve lock-free access:

protected class PosBuffer
{
    PointF[] m_Buffer = new PointF[3];
    volatile int m_ReadPointer = 0;
    volatile int m_WritePointer = 1;
    internal PosBuffer()
    {
        m_Buffer[0] = new PointF(0, 0);
        m_Buffer[1] = new PointF(0, 0);
        m_Buffer[2] = new PointF(0, 0);
    }

    internal void Write(PointF point)
    {
        m_Buffer[m_WritePointer] = point;
        m_ReadPointer++;
        if (m_ReadPointer == 3) m_ReadPointer = 0;
        m_WritePointer++;
        if (m_WritePointer == 3) m_WritePointer = 0;
    }

    internal PointF Read()
    {
        return m_Buffer[m_ReadPointer];
    }
}

My idea is that

  • as soon as new data arrives it is stored 'above' the old data. The read pointer is then set to this position, and then the write pointer is incremented.
  • in case no new data has been produced, the consumer thread simply reads the old data again.
  • this construction allows different or varying read and write rates (see the trace after this list).
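
A short trace of the intended behaviour (illustration only), starting from the constructor state (read = 0, write = 1):

Write(p1): m_Buffer[1] = p1; read pointer -> 1, write pointer -> 2
Write(p2): m_Buffer[2] = p2; read pointer -> 2, write pointer -> 0
Read():    returns m_Buffer[2], i.e. p2 (the latest point)
Read():    returns m_Buffer[2] again if no new point arrived in between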

My questions are:

  • Would this approach work?
  • Do I need locks/monitors/critical sections?
  • Would I need to disable optimization?
  • Are there known better solutions?

Thanks

CodePudding user response:

Try running this code:

async Task Main()
{
    var cts = new CancellationTokenSource();
    var ct = cts.Token;
    var pb = new PosBuffer();
    // Writer task: hammers Write() with no delay to expose the race.
    var tw = Task.Run(() =>
    {
        while (true)
        {
            if (ct.IsCancellationRequested)
                break;
            pb.Write(new PointF());
        }
    });

    // Reader task: hammers Read() concurrently.
    var tr = Task.Run(() =>
    {
        while (true)
        {
            if (ct.IsCancellationRequested)
                break;
            pb.Read();
        }
    });
    
    await Task.Delay(TimeSpan.FromSeconds(5.0));
    
    cts.Cancel();
}

Fairly quickly it throws an IndexOutOfRangeException. You're letting the "pointers" (a misleading name, by the way) momentarily hold the value 3 before wrapping back to zero, and in the window between the increment and the wrap-around check a concurrent read indexes past the end of the array and throws.

The problem goes away if you increment like this:

m_ReadPointer = (m_ReadPointer == m_Buffer.Length - 1) ? 0 : m_ReadPointer + 1;
m_WritePointer = (m_WritePointer == m_Buffer.Length - 1) ? 0 : m_WritePointer + 1;

Now, if you have multiple writers then you're definitely going to need locking.
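
A minimal sketch of a serialized Write, assuming a dedicated lock object (m_WriteLock is not part of the original code):

private readonly object m_WriteLock = new object();

internal void Write(PointF point)
{
    lock (m_WriteLock)
    {
        // Only one writer at a time may advance the buffer.
        m_Buffer[m_WritePointer] = point;
        m_ReadPointer = (m_ReadPointer == m_Buffer.Length - 1) ? 0 : m_ReadPointer + 1;
        m_WritePointer = (m_WritePointer == m_Buffer.Length - 1) ? 0 : m_WritePointer + 1;
    }
}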

CodePudding user response:

You could consider using a BroadcastBlock<T> from the TPL Dataflow library:

Provides a buffer for storing at most one element at a time, overwriting each message with the next as it arrives.

using System.Threading.Tasks.Dataflow;

// Initialize
BroadcastBlock<PointF> block = new(x => x);

// Write the latest value
block.Post(new PointF(0, 0));

// Fetch the latest value
PointF point = await block.ReceiveAsync();
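
A minimal end-to-end sketch using the question's rates (the delay values and point coordinates are assumptions, not part of the original post):

using System.Drawing;
using System.Threading.Tasks.Dataflow;

async Task Main()
{
    var block = new BroadcastBlock<PointF>(x => x);
    var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));

    var producer = Task.Run(async () =>
    {
        float i = 0;
        while (!cts.IsCancellationRequested)
        {
            block.Post(new PointF(i, i)); // overwrites the stored value
            i++;
            await Task.Delay(15);         // new data every 15-20 ms
        }
    });

    var consumer = Task.Run(async () =>
    {
        while (!cts.IsCancellationRequested)
        {
            // Receives the latest stored value (waits only for the first post).
            PointF latest = await block.ReceiveAsync();
            await Task.Delay(10);         // consume every 10 ms
        }
    });

    await Task.WhenAll(producer, consumer);
}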

Another idea is to use a BehaviorSubject<T> from the Rx library.

Represents a value that changes over time.

using System.Reactive.Subjects;

// Initialize
BehaviorSubject<PointF> subject = new(new PointF(0, 0));

// Write the latest value
subject.OnNext(new PointF(0, 0));

// Get the latest value
PointF point = subject.Value;
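
A small usage sketch (the console subscriber is an assumption): a BehaviorSubject<T> replays its current value to each new subscriber, so consumers can either poll subject.Value or react to every update:

// The subscriber immediately receives the current value,
// then each subsequent OnNext value.
IDisposable subscription = subject.Subscribe(p => Console.WriteLine(p));

subject.OnNext(new PointF(1, 2)); // pushed to the subscriber
subscription.Dispose();           // stop listening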

Both classes (BroadcastBlock<T> and BehaviorSubject<T>) are thread-safe.

CodePudding user response:

For your case of a single value, I would suggest ReaderWriterLockSlim, assuming you need thread-safe reads and writes from multiple threads.

protected class PosBuffer
{
    private PointF m_Buffer;
    private ReaderWriterLockSlim m_Lock = new();

    internal void Write(PointF point)
    {
        // Writers take the exclusive lock; readers are blocked meanwhile.
        m_Lock.EnterWriteLock();
        try
        {
            m_Buffer = point;
        }
        finally
        {
            m_Lock.ExitWriteLock();
        }
    }

    internal PointF Read()
    {
        // Any number of readers may hold the read lock concurrently.
        m_Lock.EnterReadLock();
        try
        {
            return m_Buffer;
        }
        finally
        {
            m_Lock.ExitReadLock();
        }
    }
}
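
A usage sketch with the question's timings (the Task.Delay values are assumptions, and PosBuffer is assumed accessible from the calling code):

var buffer = new PosBuffer();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

// Producer: writes a new point every 15-20 ms.
var producer = Task.Run(async () =>
{
    while (!cts.IsCancellationRequested)
    {
        buffer.Write(new PointF(1, 2)); // takes the exclusive write lock
        await Task.Delay(15);
    }
});

// Consumer: samples the latest point every 10 ms.
var consumer = Task.Run(async () =>
{
    while (!cts.IsCancellationRequested)
    {
        PointF latest = buffer.Read(); // takes a shared read lock
        await Task.Delay(10);
    }
});

await Task.WhenAll(producer, consumer);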