I'm working on project with following workflow :
- Background service consumme messages from Rabbitmq's queue
- Background service use background task queue like this and here to process task paralleling
- Each task execute queries to retrieve some datas and cache them in collection
If collection size is over 1000 objects, I would like to read collection and then clear it. Like each tasks are processing as parallel, I don't want that another thread add datas in collection until it was cleared.
There are blockingcollection or concurrentdictionary (thread-safe collection), but I don't know which mechanic to use ?
What's the best way to achieve this?
CodePudding user response:
The collection that seems more suitable for your case is the Channel<T>
. This is an asynchronous version of the BlockingCollection<T>
, and internally it's based on the same storage (the ConcurrentQueue<T>
collection). The similarities are:
- They both can be configured to be bounded or unbounded.
- A consumer can take a message, even if none is currently available. In this case the
Take
/TakeAsync
call will block either synchronously or asynchronously until a message can be consumed, or the collection completes, whatever comes first. - A producer can push a message, even if the collection is currently full. In this case the
Add
/WriteAsync
call will block either synchronously or asynchronously until there is space available for the message, or the collection completes, whatever comes first. - A consumer can enumerate the collection in a consuming fashion, with a
foreach
/await foreach
loop. Each message received in the loop is consumed by this loop, and will never be available to other consuming loops that might be active by other consumers in parallel.
Some features of the Channel<T>
that the BlockingCollection<T>
lacks:
- A
Channel<T>
exposes two facades, aWriter
and aReader
, that allow a better separation between the roles of the producer and the consumer. In practice this can be more of an annoyance than a useful feature IMHO, but nonetheless it's part of the experience of working with a channel. - A
ChannelWriter<T>
can be optionally completed with an error. This error is propagated to the consumers of the channel. - A
ChannelReader<T>
has aCompletion
property of typeTask
. - A bounded
Channel<T>
can be configured to be lossy, so that it drops old buffered messages automatically in order to make space for new incoming messages.
Some features of the BlockingCollection<T>
that the Channel<T>
lacks:
- There is no direct support for timeout when writing/reading messages. This can be achieved indirectly (but precariously, see below) with timer-based
CancellationTokenSource
s. - The contents of a channel cannot be enumerated in a non-consuming fashion.
- Getting the number of the currently stored messages is only possible for bounded channels.
- Some auxiliary features like the
BlockingCollection<T>.TakeFromAny
method are not available. - A channel cannot be backed by other internal collections, other than the
ConcurrentQueue<T>
. So it can't have, for example, the behavior of a stack instead of a queue.
Caveat:
There is a nasty memory leak issue that is triggered when a channel is idle (empty with an idle producer, or full with an idle consumer), and the consumer or the producer attempts continuously to read/write messages with timer-based CancellationTokenSource
s. Each such canceled operation leaks about 800 bytes. The leak is resolved automatically when the first read/write operation completes successfully. This issue is known for more than two years, and Microsoft has not decided yet what to do with it.
CodePudding user response:
Check out concurrentQueue. It appears to be suitable for the tasks you have mentioned in your questions. Documentation here - https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentqueue-1?view=net-6.0
There are other concurrent collection types as well - https://docs.microsoft.com/en-us/dotnet/standard/collections/thread-safe/