How to ensure parallel tasks dequeue unique entries from ConcurrentQueue<T>?


Hi, I have a ConcurrentQueue that is loaded with files from a database. These files are to be processed by parallel Tasks that dequeue them. However, after some time I start getting Tasks that dequeue the same file at the same time (which leads to "file is being used by another process" errors), and I also get more Tasks than are supposed to be allocated. I have even seen 8 Tasks running at once, which should not be happening: the active tasks limit is 5.

Rough code:

private void ParseQueuedTDXFiles()
{
    while (_signalParseQueuedFilesEvent.WaitOne())
    {
        Task.Run(() => SetParsersTask());
    }
}

The _signalParseQueuedFilesEvent is set on a timer in a Windows Service. The above function then calls SetParsersTask. This is why I use a ConcurrentDictionary to track how many active Tasks there are and to make sure the count stays below _ActiveTasksLimit:

private void SetParsersTask()
{
    
    if (_ConcurrentqueuedTdxFilesToParse.Count > 0)
    {
        if (_activeParserTasksDict.Count < _ActiveTasksLimit) //ConcurrentTask Dictionary Used to control how many Tasks should run
        {
            int parserCountToStart = _ActiveTasksLimit - _activeParserTasksDict.Count;
            Parallel.For(0, parserCountToStart, parserToStart =>
            {
                lock(_concurrentQueueLock)
                    Task.Run(() => PrepTdxParser());
            });
        }
    }

}

Which then calls this function which dequeues the Concurrent Queue:

private void PrepTdxParser()
{
    TdxFileToProcessData fileToProcess;
    lock (_concurrentQueueLock)
        _ConcurrentqueuedTdxFilesToParse.TryDequeue(out fileToProcess);
    // Guard against an empty dequeue: TryDequeue leaves fileToProcess null
    // when the queue is empty, so check before dereferencing.
    if (fileToProcess != null && !string.IsNullOrEmpty(fileToProcess.TdxFileName))
    {
        LaunchTDXParser(fileToProcess);
    }
}

I even put a lock on _ConcurrentqueuedTdxFilesToParse, even though I know it doesn't need one, all to make sure that I never run into a situation where two Tasks dequeue the same file.

This function is where I add and remove Tasks as well as launch the file parser for the dequeued file:

private void LaunchTDXParser(TdxFileToProcessData fileToProcess)
{
    string fileName = fileToProcess.TdxFileName;
    Task startParserTask = new Task(() => ConfigureAndStartProcess(fileName));
    _activeParserTasksDict.TryAdd(fileName, startParserTask);
    startParserTask.Start();
    Task.WaitAll(startParserTask);
    _activeParserTasksDict.TryRemove(fileName, out Task taskToBeRemoved);
}

Can you guys help me understand why I am getting the same file dequeued in two different Tasks? And why I am getting more Tasks than _ActiveTasksLimit allows?

CodePudding user response:

There are a number of red flags in this¹ code:

  1. Using a WaitHandle. This tool is too primitive. I've never seen a problem solved with WaitHandles that couldn't be solved in a simpler way without them.
  2. Launching Task.Run tasks in a fire-and-forget fashion.
  3. Launching a Parallel.For loop without configuring the MaxDegreeOfParallelism. This practically guarantees that the ThreadPool will get saturated (see the sketch after this list).
  4. Protecting a queue (_queuedTdxFilesToParse) with a lock (_concurrentQueueLock) only partially. If the queue is a Queue<T>, you must protect each and every operation on it, otherwise the behavior of the program is undefined. If the queue is a ConcurrentQueue<T>, there is no need to protect it, because it is thread-safe by itself.
  5. Calling Task.Factory.StartNew and Task.Start without configuring the scheduler argument.
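
To illustrate point 3: Parallel.For accepts a ParallelOptions argument that bounds its concurrency. A minimal sketch, where the limit of 5 merely mirrors the question's _ActiveTasksLimit and is purely illustrative:

    // Illustrative only: bounding a Parallel.For loop with ParallelOptions.
    // Without MaxDegreeOfParallelism the partitioner is free to keep asking
    // the ThreadPool for more threads.
    var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
    Parallel.For(0, parserCountToStart, options, i =>
    {
        // ... per-iteration work, at most 5 iterations in flight ...
    });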

So I am not surprised that your code is not working as expected. I can't point to a specific error that needs to be fixed; for me the whole approach is dubious and needs to be reworked or scrapped. Some concepts and tools that you might want to research before attempting to rewrite this code:

  1. The producer-consumer pattern.
  2. The BlockingCollection<T> class (a sketch combining these first two follows this list).
  3. The TPL Dataflow library.
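
As a rough sketch of how points 1 and 2 fit together, here is a minimal producer-consumer setup built on BlockingCollection<T> (namespace System.Collections.Concurrent). TdxFileToProcessData and LaunchTDXParser are borrowed from the question; LoadFilesFromDatabase is a hypothetical placeholder for however the files are discovered:

    // Sketch only, not a drop-in replacement for the question's service.
    using var files = new BlockingCollection<TdxFileToProcessData>();

    // Exactly 5 long-lived consumers. GetConsumingEnumerable hands each
    // item to exactly one consumer, so duplicate dequeues cannot occur,
    // and the number of workers is fixed by construction.
    Task[] consumers = Enumerable.Range(0, 5)
        .Select(_ => Task.Run(() =>
        {
            foreach (var file in files.GetConsumingEnumerable())
                LaunchTDXParser(file);
        }))
        .ToArray();

    // Producer: enqueue work, then signal that no more items are coming.
    foreach (var file in LoadFilesFromDatabase())
        files.Add(file);
    files.CompleteAdding();

    Task.WaitAll(consumers);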

Optionally you could consider familiarizing yourself with asynchronous programming. It can help reduce the number of threads that your program uses while running, resulting in a more efficient and scalable program. Two powerful asynchronous tools are the Channel<T> class and the Parallel.ForEachAsync API (available in .NET 6 and later); a sketch of the two combined follows.
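
A minimal sketch of those two tools together, assuming the code runs inside an async method; ProcessFileAsync and LoadFilesFromDatabase are hypothetical placeholders:

    // Sketch only: a Channel<T> (namespace System.Threading.Channels)
    // feeding Parallel.ForEachAsync (.NET 6+).
    var channel = Channel.CreateUnbounded<TdxFileToProcessData>();

    // Consumer: the API itself caps the concurrency at 5, so no
    // hand-rolled task counting is needed, and each file is read
    // from the channel exactly once.
    Task consumer = Parallel.ForEachAsync(
        channel.Reader.ReadAllAsync(),
        new ParallelOptions { MaxDegreeOfParallelism = 5 },
        async (file, ct) => await ProcessFileAsync(file, ct));

    // Producer: publish the files, then signal completion.
    foreach (var file in LoadFilesFromDatabase())
        await channel.Writer.WriteAsync(file);
    channel.Writer.Complete();

    await consumer;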

¹ This answer was intended for a related question that is now deleted.

CodePudding user response:

So I thought I had fixed my problem. The solution was to not add more parallelism than need be. I was trying to create a situation where private void SetParsersTask() would not be held up by Tasks that still needed to finish processing a file, so I foolishly threw in Parallel.For on top of Task.Run, which already runs in parallel. I fixed this by generating fire-and-forget Tasks in a normal for loop instead of Parallel.For:

private void SetParsersTask()
{
    if (_queuedTdxFilesToParse.Count < _tdxParsersInstanceCount)
    {
        int numFilesToGet = _tdxParsersInstanceCount - _activeParserTasksDict.Count;
        var filesToAdd = ServiceDBHelper.GetTdxFilesToEnqueueForProcessingFromDB(numFilesToGet);
        foreach (var fileToProc in filesToAdd)
        {
            ServiceDBHelper.UpdateTdxFileToProcessStatusAndUpdateDateTime(fileToProc.TdxFileName, 1, DateTime.Now);
            _queuedTdxFilesToParse.Enqueue(fileToProc);
        }

    }

    if (_queuedTdxFilesToParse.Count > 0)
    {
        if (_activeParserTasksDict.Count < _tdxParsersInstanceCount)
        {
            int parserCountToStart = _tdxParsersInstanceCount - _activeParserTasksDict.Count;
            for (int i = 0; i < parserCountToStart; i++)
            {
                Task.Run(() => PrepTdxParser());
            }
        }
    }

}

However, I still get one file every now and then that gets dequeued by two different Tasks. How is this still possible? And how can I fix it?
