I have a generic List<T> in which live streaming data coming from a web socket is stored. I want to write the data from the list to the database and then clear the list, so that fresh streaming data can be stored without filling up my machine's memory.
If I enumerate over the list to send the data to the database, I get an exception, because data keeps being added to the list while I enumerate or clear it. If I apply a lock on the list, the streaming data will pause, and that's not allowed.
Please suggest how I can solve this problem.
CodePudding user response:
Seems like a job for BatchBlock.
It is totally thread safe and is perfectly suited for data flows.
There are a lot of classes in the .NET TPL Dataflow library, but the one that suits your situation is BatchBlock.
BatchBlock collects data until the size threshold is met; when it is, the whole batch is emitted as the result. You can get the result in different ways, like Receive or ReceiveAll or their async counterparts. Another way is to link the batch output to another block like ActionBlock, which will asynchronously call the supplied Action every time input arrives from the source block (the BatchBlock in this case). So basically, every time the batch gets full, it is sent to the ActionBlock. ActionBlock can take a parameter like MaxDegreeOfParallelism (to avoid database contention or something, if you need that), but it will not block the BatchBlock in any way, so there is no waiting on the client side: the batches are simply placed in a thread-safe queue for the ActionBlock to execute.
And do not worry: when a batch gets full, the block does not stop receiving new items either, so again no blocking. A beautiful solution.
One thing to worry about: if the batch has not reached full size when you stop the application, the buffered items will be lost. To avoid that, you can call TriggerBatch manually, which sends however many items are currently in the batch to the ActionBlock. You could call TriggerBatch in Dispose or something, up to you.
Also, there are two ways of feeding items into the BatchBlock: Post and SendAsync. Post is blocking, I believe (although I am not sure), while SendAsync postpones the message if the BatchBlock is busy.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class ConcurrentCache<T> : IAsyncDisposable {
    private readonly BatchBlock<T> _batchBlock;
    private readonly ActionBlock<T[]> _actionBlock;
    private readonly IDisposable _linkedBlock;

    public ConcurrentCache(int cacheSize) {
        _batchBlock = new BatchBlock<T>(cacheSize);
        // Action to run when the batch max capacity is met.
        // The action can also be an async task.
        _actionBlock = new ActionBlock<T[]>(ReadBatchBlock);
        _linkedBlock = _batchBlock.LinkTo(_actionBlock);
    }

    public async Task SendAsync(T item) {
        await _batchBlock.SendAsync(item);
    }

    private void ReadBatchBlock(T[] items) {
        foreach (var item in items) {
            Console.WriteLine(item);
        }
    }

    public async ValueTask DisposeAsync() {
        // Flush the remaining partial batch *before* completing;
        // calling TriggerBatch after the block has completed is a no-op.
        _batchBlock.TriggerBatch();
        _batchBlock.Complete();
        await _batchBlock.Completion;
        _actionBlock.Complete();
        await _actionBlock.Completion;
        _linkedBlock.Dispose();
    }
}
Usage example:
await using var cache = new ConcurrentCache<int>(5);
for (int i = 0; i < 12; i++) {
await cache.SendAsync(i);
await Task.Delay(200);
}
When the object is disposed, the remaining partial batch will be triggered and printed.
UPDATE
As @TheodorZoulias pointed out, if the batch is not filled up and no new messages arrive for a long time, the buffered messages will be stuck in the BatchBlock. The solution is to create a timer that calls .TriggerBatch() periodically.
CodePudding user response:
If I apply a lock on the list, the streaming data will pause, and that's not allowed
You should only hold locks for as short a time as possible. In this case that means just long enough to add or remove an item from the list. You should not hold the lock while writing the data to the database, or during any other slow operation. Taking an uncontested lock takes on the order of 25 ns, so this should only be a problem in very tight loops.
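For example, a common pattern is to swap the list out under the lock and do the slow database write outside it. A minimal sketch, assuming this lives in a generic class and that WriteToDatabase is your (hypothetical) slow bulk-insert call:

private readonly object _lock = new object();
private List<T> _items = new List<T>();

public void Add(T item) {
    lock (_lock) { _items.Add(item); }  // lock held only for the Add itself
}

public void Flush() {
    List<T> toWrite;
    lock (_lock) {
        toWrite = _items;           // take the filled list...
        _items = new List<T>();     // ...and start a fresh one
    }
    WriteToDatabase(toWrite);  // slow work happens outside the lock
}

This way the producer is only ever blocked for the duration of a single Add or the swap, never for the database round trip.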
But a better option would be to use the built-in thread-safe collections, like BlockingCollection. The latter is very convenient since it has methods like GetConsumingEnumerable and CompleteAdding. This lets your consumer use a regular foreach loop to consume items, and the producer can just call CompleteAdding to let the loop exit after all items have been processed.
You might also want to take a look at TPL Dataflow. I have not used it myself, but it seems suitable for setting up concurrent processing pipelines.
However, before trying to do any kind of concurrent processing you need to be fairly familiar with thread safety and the dangers involved. Thread safety is difficult, and you need to know what is safe and unsafe to do. You will not always be so lucky to get an exception when you mess up, you might just get missing or incorrect data.
CodePudding user response:
I think you should try Parallel.ForEach along with a ConcurrentDictionary:

var streamingDataList = new ConcurrentDictionary<int, StreamingDataModel>();
Parallel.ForEach(streamingDataBatch, streamingData =>
{
    streamingDataList.TryAdd(streamingData.Id, streamingData.Data);
});