I have a data source which creates (produces) a PointF
every 15 to 20 milliseconds.
I need to store (consume) such points every 10ms. My approach is to use a 3 points wide buffer and pointers to achieve a lock-free access:
protected class PosBuffer
{
PointF[] m_Buffer = new PointF[3];
volatile int m_ReadPointer = 0;
volatile int m_WritePointer = 1;
internal PosBuffer()
{
m_Buffer[0] = new PointF(0, 0);
m_Buffer[1] = new PointF(0, 0);
m_Buffer[2] = new PointF(0, 0);
}
internal void Write(PointF point)
{
m_Buffer[m_WritePointer] = point;
m_ReadPointer ;
if (m_ReadPointer == 3) m_ReadPointer = 0;
m_WritePointer ;
if (m_WritePointer == 3) m_WritePointer = 0;
}
internal PointF Read()
{
return m_Buffer[m_ReadPointer];
}
}
My idea is that
- as soon as new data arrives it will be stored 'above' the old data. Then the read pointer is set to this position and then the write pointer is incremented.
- in case now new data has been produced the consumer thread reads the old data again.
- This construction allows different or inconstant read and write rates.
My questions are:
- would this approach work?
- Do I need locks/monitors/critical sections...
- Would I need to disable optimization?
- Are there known better solutions?
Thanks
CodePudding user response:
Try running this code:
async Task Main()
{
var cts = new CancellationTokenSource();
var ct = cts.Token;
var pb = new PosBuffer();
var tw = Task.Run(() =>
{
while (true)
{
if (ct.IsCancellationRequested)
break;
pb.Write(new PointF());
}
});
var tr = Task.Run(() =>
{
while (true)
{
if (ct.IsCancellationRequested)
break;
pb.Read();
}
});
await Task.Delay(TimeSpan.FromSeconds(5.0));
cts.Cancel();
}
Fairly quickly it throws IndexOutOfRangeException
. You're letting the value of the "pointers" (bad name by the way) be 3
before dropping back to zero
and in the time it takes to change it down the read operation throws.
The problem goes away if you increment like this:
m_ReadPointer = (m_ReadPointer == m_Buffer.Length - 1) ? 0 : m_ReadPointer 1;
m_WritePointer = (m_WritePointer == m_Buffer.Length - 1) ? 0 : m_WritePointer 1;
Now, if you have multiple writers then you're definitely going to need locking.
CodePudding user response:
You could consider using a BroadcastBlock<T>
from the TPL Dataflow library:
Provides a buffer for storing at most one element at time, overwriting each message with the next as it arrives.
using System.Threading.Tasks.Dataflow;
// Initialize
BroadcastBlock<PointF> block = new(x => x);
// Write the latest value
block.Post(new PointF(0, 0));
// Fetch the latest value
PointF point = await block.ReceiveAsync();
Another idea is to use a BehaviorSubject<T>
from the Rx library.
Represents a value that changes over time.
using System.Reactive.Subjects;
// Initialize
BehaviorSubject<PointF> subject = new(new PointF(0, 0));
// Write the latest value
subject.OnNext(new PointF(0, 0));
// Get the latest value
PointF point = subject.Value;
Both classes (BroadcastBlock<T>
and BehaviorSubject<T>
) are thread-safe.
CodePudding user response:
For your case of single value, I would suggest ReaderWriterLockSlim assuming you need thread safe reads and writes with multiple threads.
protected class PosBuffer
{
private PointF m_Buffer;
private ReaderWriterLockSlim m_Lock = new();
internal void Write(PointF point)
{
m_Lock.EnterWriteLock();
try
{
m_Buffer = point;
}
finally
{
m_Lock.ExitWriteLock();
}
}
internal PointF Read()
{
m_Lock.EnterReadLock();
try
{
return m_Buffer;
}
finally
{
m_Lock.ExitReadLock();
}
}
}