Hi, I have a ConcurrentQueue that is loaded with files from a database. These files are to be processed by parallel Tasks that dequeue them. However, after some time I start getting Tasks that dequeue the same file at the same time (which leads to "used by another process" errors on the file). I also get more Tasks than are supposed to be allocated: I have even seen 8 Tasks running at once, although the active Tasks limit is 5.
Rough code:
private void ParseQueuedTDXFiles()
{
    while (_signalParseQueuedFilesEvent.WaitOne())
    {
        Task.Run(() => SetParsersTask());
    }
}
The _signalParseQueuedFilesEvent is set on a timer in a Windows Service. The above function then calls SetParsersTask. This is why I use a concurrent dictionary to track how many active tasks there are and to make sure they stay below _ActiveTasksLimit:
private void SetParsersTask()
{
    if (_ConcurrentqueuedTdxFilesToParse.Count > 0)
    {
        if (_activeParserTasksDict.Count < _ActiveTasksLimit) //ConcurrentTask Dictionary Used to control how many Tasks should run
        {
            int parserCountToStart = _ActiveTasksLimit - _activeParserTasksDict.Count;
            Parallel.For(0, parserCountToStart, parserToStart =>
            {
                lock(_concurrentQueueLock)
                    Task.Run(() => PrepTdxParser());
            });
        }
    }
}
Which then calls this function which dequeues the Concurrent Queue:
private void PrepTdxParser()
{
    TdxFileToProcessData fileToProcess;
    lock (_concurrentQueueLock)
        _ConcurrentqueuedTdxFilesToParse.TryDequeue(out fileToProcess);
    if (!string.IsNullOrEmpty(fileToProcess.TdxFileName))
    {
        LaunchTDXParser(fileToProcess);
    }
}
I even put a lock on _ConcurrentqueuedTdxFilesToParse even though I know it doesn't need one, all to make sure that I never run into a situation where two Tasks are dequeuing the same file.
This function is where I add and remove Tasks as well as launch the file parser for the dequeued file:
private void LaunchTDXParser(TdxFileToProcessData fileToProcess)
{
    string fileName = fileToProcess.TdxFileName;
    Task startParserTask = new Task(() => ConfigureAndStartProcess(fileName));
    _activeParserTasksDict.TryAdd(fileName, startParserTask);
    startParserTask.Start();
    Task.WaitAll(startParserTask);
    _activeParserTasksDict.TryRemove(fileName, out Task taskToBeRemoved);
}
Can you guys help me understand why I am getting the same file dequeued in two different Tasks? And why am I getting more Tasks than the _ActiveTasksLimit?
CodePudding user response:
There are a number of red flags in this¹ code:
- Using a WaitHandle. This tool is too primitive. I've never seen a problem solved with WaitHandles that can't be solved in a simpler way without them.
- Launching Task.Run tasks in a fire-and-forget fashion.
- Launching a Parallel.For loop without configuring the MaxDegreeOfParallelism. This practically guarantees that the ThreadPool will get saturated.
- Protecting a queue (_queuedTdxFilesToParse) with a lock (_concurrentQueueLock) only partially. If the queue is a Queue<T>, you must protect it on each and every operation, otherwise the behavior of the program is undefined. If the queue is a ConcurrentQueue<T>, there is no need to protect it because it is thread-safe by itself.
- Calling Task.Factory.StartNew and Task.Start without configuring the scheduler argument.
So I am not surprised that your code is not working as expected. I can't point to a specific error that needs to be fixed. For me the whole approach is dubious, and needs to be reworked/scrapped. Some concepts and tools that you might want to research before attempting to rewrite this code:
- The producer-consumer pattern.
- The BlockingCollection<T> class.
- The TPL Dataflow library.
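As a rough illustration of the producer-consumer pattern with BlockingCollection<T>, here is a minimal sketch rather than a drop-in replacement. The names ProducerConsumerSketch, ProcessAll and processFile are hypothetical. The processFile delegate stands in for whatever launches the parser, and the bounded capacity of 100 is an arbitrary assumption. The number of consumers is fixed up front, so there is no task counter to keep in sync.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ProducerConsumerSketch
{
    // Hypothetical helper: processes every file name with exactly workerCount consumers.
    public static void ProcessAll(IEnumerable<string> fileNames, int workerCount, Action<string> processFile)
    {
        // Capacity of 100 is an arbitrary assumption; Add blocks when the queue is full.
        using var queue = new BlockingCollection<string>(boundedCapacity: 100);

        Task[] consumers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                // GetConsumingEnumerable hands each item to exactly one consumer and
                // ends once CompleteAdding has been called and the queue is empty.
                foreach (string fileName in queue.GetConsumingEnumerable())
                    processFile(fileName);
            }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default))
            .ToArray();

        // Producer side: in the service this would be the code that reads from the database.
        foreach (string fileName in fileNames)
            queue.Add(fileName);
        queue.CompleteAdding();

        Task.WaitAll(consumers);
    }
}
```

With this shape the concurrency limit is simply the number of consumer tasks, and no dictionary of active tasks is needed to enforce it.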
Optionally you could consider familiarizing yourself with asynchronous programming. It can help reduce the number of threads that your program uses while running, resulting in a more efficient and scalable program. Two powerful asynchronous tools are the Channel<T> class and the Parallel.ForEachAsync API (available from .NET 6 and later).
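The two asynchronous tools can be combined, as in the following minimal sketch, which assumes .NET 6 or later. The names AsyncPipelineSketch, ProcessAllAsync and processFileAsync are hypothetical, and the unbounded channel is a simplifying assumption. A Channel<T> carries the file names, and Parallel.ForEachAsync consumes them with a capped degree of parallelism.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class AsyncPipelineSketch
{
    // Hypothetical helper: processes all file names with at most maxParallel running at once.
    public static async Task ProcessAllAsync(
        IEnumerable<string> fileNames,
        int maxParallel,
        Func<string, CancellationToken, ValueTask> processFileAsync)
    {
        // Unbounded for simplicity; a bounded channel would add back-pressure.
        Channel<string> channel = Channel.CreateUnbounded<string>();

        // Producer side: in the service this would be fed from the database.
        foreach (string fileName in fileNames)
            await channel.Writer.WriteAsync(fileName);
        channel.Writer.Complete();

        // Consumer side: each item read from the channel is passed to the body once,
        // and no more than maxParallel bodies run concurrently.
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };
        await Parallel.ForEachAsync(channel.Reader.ReadAllAsync(), options, processFileAsync);
    }
}
```

In a long-running service the producer would normally keep writing while the consumer loop runs, instead of filling the channel first as this sketch does.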
¹ This answer was intended for a related question that is now deleted.
CodePudding user response:
So I thought I had fixed my problem. The solution was to not add more parallelism than needed. I was trying to create a situation where private void SetParsersTask() would not be held up by tasks that still needed to finish processing a file. So I foolishly threw in Parallel.For in addition to Task.Start, which is already parallel. I fixed this by generating fire-and-forget Tasks in a normal for loop as opposed to Parallel.For:
private void SetParsersTask()
{
    if (_queuedTdxFilesToParse.Count < _tdxParsersInstanceCount)
    {
        int numFilesToGet = _tdxParsersInstanceCount - _activeParserTasksDict.Count;
        var filesToAdd = ServiceDBHelper.GetTdxFilesToEnqueueForProcessingFromDB(numFilesToGet);
        foreach (var fileToProc in filesToAdd)
        {
            ServiceDBHelper.UpdateTdxFileToProcessStatusAndUpdateDateTime(fileToProc.TdxFileName, 1, DateTime.Now);
            _queuedTdxFilesToParse.Enqueue(fileToProc);
        }
    }
    if (_queuedTdxFilesToParse.Count > 0)
    {
        if (_activeParserTasksDict.Count < _tdxParsersInstanceCount)
        {
            int parserCountToStart = _tdxParsersInstanceCount - _activeParserTasksDict.Count;
            for (int i = 0; i < parserCountToStart; i++)
            {
                Task.Run(() => PrepTdxParser());
            }
        }
    }
}
However I still get one file every now and then that gets queued in two different Tasks. How is this still possible? And how can I fix it?