In its bare-bones form, my producer/consumer pattern reads as follows.
public class Consumer<T>
{
    Task consumer;
    BlockingCollection<T> buffer;

    public Consumer()
    {
        buffer = new();
        consumer = Task.Factory.StartNew(
            x => ConsumerAction(),
            TaskCreationOptions.LongRunning);
    }

    public void ConsumerAction()
    {
        while(true)
        {
            // log 1
            var obj = buffer.Take();
            // log 2
            WriteToDisk(obj);
            // log 3
        }
    }

    public void Enqueue(T obj)
    {
        buffer.Add(obj);
    }
}
The Consumer type works as expected for a while, then at a seemingly random point it stops calling Take, and the buffer continues to grow. On the same input collection, sometimes it works throughout the input with no problems, sometimes it breaks towards the beginning of the input, and sometimes towards the end.
- I tried catching any possible exceptions in any method involved in the execution path, but no exception is raised;
- I have checked the logs in my application; according to them, all the business logic on the last obj was executed successfully, hence the call returned to var obj = buffer.Take(); and was waiting for a new item to be added to buffer;
- I have tried enclosing while(true) in a try-catch block, and no exception is caught;
- The commented logs in the code sample appear in the following order in the logs: 1, 2, 3, ... 3, 1.
My specific questions are:
- Is there a possibility that the garbage collector is collecting the thread?
- Can while(true) be the source of errors?!
- Any thoughts on how best I can debug this?
- Processing the entire input collection takes ~12h on a successful run, and over 1 billion items are added to the buffer (but if Take works, buffer contains only a handful of items at a given time). Does this scale seem to be a corner case for this pattern / not its intended use?
As requested in the comments, I'll provide a minimal reproducible example. However, that is a bit challenging since I am not sure which parts of the program are relevant, so it may take some time to narrow it down. Meanwhile, I'd appreciate any suggestions on the questions listed above.
User response:
Is there a possibility that the garbage collector is collecting the thread?
No, the GC collects objects in memory, not threads.
Can while(true) be the source of errors?!
No. But I would recommend replacing it with a foreach loop using GetConsumingEnumerable. This will allow you to cleanly and easily exit the loop by calling CompleteAdding on the buffer.
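A minimal sketch of that suggestion, assuming the rest of the class stays as in the question. The writeToDisk delegate and the Complete method are hypothetical names introduced here for illustration; they stand in for the question's WriteToDisk and for whatever shutdown hook the real program uses.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Consumer<T>
{
    private readonly BlockingCollection<T> buffer = new();
    private readonly Task consumer;
    private readonly Action<T> writeToDisk; // hypothetical stand-in for WriteToDisk

    public Consumer(Action<T> writeToDisk)
    {
        this.writeToDisk = writeToDisk;
        // Parameterless lambda, so the (Action, TaskCreationOptions) overload is selected.
        consumer = Task.Factory.StartNew(
            () => ConsumerAction(),
            TaskCreationOptions.LongRunning);
    }

    private void ConsumerAction()
    {
        // Blocks while the buffer is empty; the loop ends once CompleteAdding
        // has been called and the remaining items have been drained.
        foreach (var obj in buffer.GetConsumingEnumerable())
        {
            writeToDisk(obj);
        }
    }

    public void Enqueue(T obj) => buffer.Add(obj);

    // Signals that no more items will arrive and returns the consumer task,
    // so the caller can wait for it and observe any exception it threw.
    public Task Complete()
    {
        buffer.CompleteAdding();
        return consumer;
    }
}
```

A caller would enqueue its items, then call Complete().Wait() (or await the returned task) once the input is exhausted.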
Any thoughts on how best I can debug this?
I would definitely add a try/catch to make sure WriteToDisk does not fail in some way. You should also check the task when you are done to make sure no other failure has occurred. Another thing you could consider is adding a limit to the size of the buffer. This should limit memory usage, help prevent thread starvation, and cause the producer to block (so the program visibly hangs) if items are not removed from the buffer. The last point can help in debugging, since you can simply break the process at that point and check what each thread is doing.
A guess is that there might be some kind of deadlock or other issue going on that causes the ConsumerAction thread to block inside WriteToDisk.
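The debugging suggestions above could be combined roughly as follows. This is only a sketch: the class name BoundedConsumer, the writeToDisk delegate, the Completion property, and the default capacity of 1000 are all assumptions made for illustration and are not part of the question's code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class BoundedConsumer<T>
{
    private readonly BlockingCollection<T> buffer;
    private readonly Action<T> writeToDisk; // hypothetical stand-in for WriteToDisk

    // Exposed so the caller can inspect Status / Exception, or wait on it.
    public Task Completion { get; }

    public BoundedConsumer(Action<T> writeToDisk, int capacity = 1000)
    {
        this.writeToDisk = writeToDisk;
        // With a bounded capacity, Add blocks once the buffer is full,
        // so a stalled consumer shows up as a stalled producer.
        buffer = new BlockingCollection<T>(capacity);
        Completion = Task.Factory.StartNew(
            () => ConsumerAction(),
            TaskCreationOptions.LongRunning);
    }

    private void ConsumerAction()
    {
        foreach (var obj in buffer.GetConsumingEnumerable())
        {
            try
            {
                writeToDisk(obj);
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"WriteToDisk failed: {ex}");
                throw; // rethrow so the task ends up in the Faulted state
            }
        }
    }

    public void Enqueue(T obj) => buffer.Add(obj);

    public void CompleteAdding() => buffer.CompleteAdding();
}
```

If the producer stops making progress, breaking into the debugger and looking at the consumer thread's call stack, or at Completion.Status, should show whether the consumer is blocked or has faulted.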
Processing the entire input collection takes ~12h on a successful run, and over 1 billion items are added to the buffer (but if Take works, buffer contains only a handful of items at a given time). Does this scale seem to be a corner case for this pattern / not its intended use?
That seems like a perfectly fine use of a blocking collection to me.