Generic list concurrent access - clear part of list while data is getting stored


I have a generic List&lt;T&gt; in which live streaming data coming from a web socket is stored. I want to save the data from the list to the database and then clear the list, so that fresh streaming data can be stored without filling up my machine's memory.

If I enumerate over the list to send the data to the database, I get an exception, since items are being added to the list while I enumerate or clear it. If I apply a lock on the list, the streaming data will pause, and that's not allowed.

Please suggest how I can solve this problem.

CodePudding user response:

Seems like a job for BatchBlock

It is fully thread safe and well suited to data flows. The TPL Dataflow library (System.Threading.Tasks.Dataflow) contains many block types, but the one that fits your situation is BatchBlock.

BatchBlock&lt;T&gt; collects items until the size threshold is met; when it is, the whole batch is emitted as a single T[] result. You can consume results directly with Receive or TryReceiveAll (or ReceiveAsync, the async counterpart).

Another way is to link the batch output to another block, such as ActionBlock&lt;T[]&gt;, which asynchronously invokes the supplied action every time input arrives from the source block (the BatchBlock in this case), so essentially every time a batch fills up it is handed to the ActionBlock. ActionBlock accepts options such as MaxDegreeOfParallelism if you need to limit concurrent database writes, but it never blocks the BatchBlock, so there is no waiting on the producer side: completed batches are simply placed in a thread-safe queue for the ActionBlock to process.
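For illustration, here is a minimal sketch of the direct-receive style (not part of the original answer); Receive blocks the caller until a full batch is available:

using System;
using System.Threading.Tasks.Dataflow;

var batchBlock = new BatchBlock<int>(3);

// producer side (e.g. the web socket callback)
for (int i = 0; i < 6; i++) {
    batchBlock.Post(i);
}

// consumer side: Receive waits until a full batch of 3 items exists
int[] batch = batchBlock.Receive();
Console.WriteLine(string.Join(", ", batch)); // 0, 1, 2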

And do not worry: when a batch fills up, the block does not stop receiving new items, so there is no blocking there either. A beautiful solution.

One thing to watch out for: if the batch has not reached full size when you stop the application, those items will be lost, so you can call TriggerBatch manually to send as many items to the ActionBlock as the batch currently holds. Calling TriggerBatch in Dispose is one option; up to you.

Also, there are two ways of feeding items into the BatchBlock: Post and SendAsync. Post returns immediately with a bool (false if the block cannot accept the item right now, e.g. when a BoundedCapacity is set and full), while SendAsync lets the message be postponed and asynchronously waits for the BatchBlock to accept or decline it.
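A small illustration of the difference (my sketch, assuming a BoundedCapacity is set; with an unbounded block both calls succeed immediately):

var block = new BatchBlock<int>(10, new GroupingDataflowBlockOptions {
    BoundedCapacity = 10
});

bool posted = block.Post(1);            // returns at once; false if the block cannot accept now
bool sent   = await block.SendAsync(2); // waits asynchronously until accepted or declined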

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class ConcurrentCache<T> : IAsyncDisposable {
    private readonly BatchBlock<T>    _batchBlock;
    private readonly ActionBlock<T[]> _actionBlock;
    private readonly IDisposable      _linkedBlock;

    public ConcurrentCache(int cacheSize) {
        _batchBlock = new BatchBlock<T>(cacheSize);
        // action to run every time a full batch arrives;
        // the delegate could also be an async Task
        _actionBlock = new ActionBlock<T[]>(ReadBatchBlock);
        _linkedBlock = _batchBlock.LinkTo(_actionBlock);
    }

    public async Task SendAsync(T item) {
        await _batchBlock.SendAsync(item);
    }

    private void ReadBatchBlock(T[] items) {
        foreach (var item in items) {
            Console.WriteLine(item);
        }
    }

    public async ValueTask DisposeAsync() {
        _batchBlock.TriggerBatch();    // flush a partially filled batch first
        _batchBlock.Complete();
        await _batchBlock.Completion;  // all batches have been handed to the link
        _actionBlock.Complete();
        await _actionBlock.Completion; // all batches have been processed
        _linkedBlock.Dispose();
    }
}

Usage example:

await using var cache = new ConcurrentCache<int>(5);

for (int i = 0; i < 12; i++) {
    await cache.SendAsync(i);
    await Task.Delay(200);
}

When the object is disposed, the remaining partial batch is triggered and printed.


UPDATE

As @TheodorZoulias pointed out, if the batch is not filled up and no messages arrive for a long time, the buffered messages will be stuck in the BatchBlock. The solution is to create a timer that calls .TriggerBatch() periodically.
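One possible shape for that (a sketch, not from the original answer; the five-second interval is arbitrary, and the timer should also be disposed in DisposeAsync):

// extra field and constructor lines inside ConcurrentCache<T>
// (requires using System.Threading for Timer)
private readonly Timer _flushTimer;

public ConcurrentCache(int cacheSize) {
    _batchBlock  = new BatchBlock<T>(cacheSize);
    _actionBlock = new ActionBlock<T[]>(ReadBatchBlock);
    _linkedBlock = _batchBlock.LinkTo(_actionBlock);
    // TriggerBatch emits whatever is buffered, even if the batch is not full
    _flushTimer = new Timer(_ => _batchBlock.TriggerBatch(),
        null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
}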

CodePudding user response:

If I apply a lock on the list, the streaming data will pause, and that's not allowed

You should only hold a lock for as short a time as possible; in this case, just long enough to add or remove an item from the list. You should not hold the lock while writing data to the database or doing any other slow operation. Taking an uncontested lock is on the order of 25 ns, so locking should only be a problem in very tight loops.
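A minimal sketch of that idea (the SwapBuffer name and shape are mine, not from the answer): the producer's lock covers only the Add, and the consumer swaps the full list for an empty one under the lock, then writes to the database outside it.

using System.Collections.Generic;

class SwapBuffer<T> {
    private readonly object _sync = new object();
    private List<T> _items = new List<T>();

    // producer (web socket callback): the lock is held only for the Add
    public void Add(T item) {
        lock (_sync) {
            _items.Add(item);
        }
    }

    // consumer: take the whole buffer and leave a fresh one behind;
    // the slow database write then happens outside the lock
    public List<T> TakeAll() {
        lock (_sync) {
            var taken = _items;
            _items = new List<T>();
            return taken;
        }
    }
}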

But a better option would be to use the built-in thread-safe collections, such as BlockingCollection&lt;T&gt;. It is very convenient since it has methods like GetConsumingEnumerable and CompleteAdding. These let your consumer use a regular foreach loop to consume items, and the producer can call CompleteAdding to let the loop exit once all items have been processed, as in the sketch below.
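For example (a sketch, using Console.WriteLine as a stand-in for the database write):

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var queue = new BlockingCollection<int>();

// consumer: the foreach blocks while the queue is empty and exits
// once CompleteAdding has been called and the queue is drained
var consumer = Task.Run(() => {
    foreach (var item in queue.GetConsumingEnumerable()) {
        Console.WriteLine(item); // stand-in for the database write
    }
});

// producer: Add never waits unless a bounded capacity was given
for (int i = 0; i < 12; i++) {
    queue.Add(i);
    await Task.Delay(200);
}

queue.CompleteAdding(); // lets the consumer's foreach loop finish
await consumer;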

You might also want to take a look at TPL Dataflow. I have not used it myself, but it seems suitable for setting up concurrent processing pipelines.

However, before trying to do any kind of concurrent processing, you need to be fairly familiar with thread safety and the dangers involved. Thread safety is difficult, and you need to know what is and is not safe to do. You will not always be lucky enough to get an exception when you make a mistake; you might just get missing or incorrect data.

CodePudding user response:

I think you should try Parallel.ForEach along with ConcurrentDictionary

var streamingDataList = new ConcurrentDictionary<int, StreamingDataModel>();
Parallel.ForEach(streamingDataBatch, streamingData =>
{
    // TryAdd is thread safe, so concurrent writers cannot corrupt the dictionary
    streamingDataList.TryAdd(streamingData.Id, streamingData.Data);
});
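To actually empty the dictionary while the producer keeps writing, one option (my sketch, not from the answer; SaveToDatabase is a hypothetical persistence method) is to remove entries one at a time, since ConcurrentDictionary tolerates concurrent writers:

foreach (var key in streamingDataList.Keys) {
    if (streamingDataList.TryRemove(key, out var model)) {
        SaveToDatabase(model); // hypothetical persistence method
    }
}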