Home > Back-end >  Simple Queue with 2 threads only processes half the items
Simple Queue with 2 threads only processes half the items

Time:03-01

I have implemented a simple queue to store a list of filenames, and a method that Reads the queue, takes the next available filename, and moves the file from one folder to another folder.

This class is used to keep track of files in a folder.

internal class FileItem
{
    public string FullFileName { get; set; }
    public bool isLocked { get; set; }
}

This is my Simple Queue implementation

internal class MyQueue
{
    List<FileItem> FileList;
    
    public MyQueue(string FilePath)
    {
        FileList = new List<FileItem>();

        string[] files = Directory.GetFiles(FilePath);
        foreach (string file in files)
        {
            FileItem fileitem = new FileItem
            {
                FullFileName = file,
                isLocked = false
            };

            FileList.Add(fileitem);
        }
    }

  
    public FileItem GetNextAvailableItem()
    {
        FileItem item = FileList.Where(i => i.isLocked == false).FirstOrDefault();
        if (item != null) item.isLocked = true;
        return item;
    }

    public void RemoveProcessedItem(FileItem item)
    {
        FileList.Remove(item);
    }

}

When I run this from Single thread, it works fine.

But I am using two threads like this.

static void ProcessFilesInMultiThread()
{
   Task task1 = Task.Factory.StartNew(() => ReadFromQueueAndMoveFile("Thread 1"));
   Task task2 = Task.Factory.StartNew(() => ReadFromQueueAndMoveFile("Thread 2"));            
   Task.WaitAll(task1, task2);
}

This is the ReadFromQueueAndMoveFile method.

static void ReadFromQueueAndMoveFile(string ThreadName)
    {
        while (_queue.GetNextAvailableItem() != null)
        {
            //get next available item from queue.

            FileItem item = _queue.GetNextAvailableItem();

            if(item != null)
            {
                string FileName = Path.GetFileName(item.FullFileName);
                string SourceFilePath = Path.Combine(sourcePath, FileName);
                string DestinationFilePath = Path.Combine(destinationPath, FileName);

                File.Move(SourceFilePath, DestinationFilePath);
                Thread.Sleep(2000);
                Console.WriteLine("Successfully moved: "   FileName   " Via "   ThreadName);

                //remove item from queue.
                _queue.RemoveProcessedItem(item);
            }

            
        }
    }

The problem is when I run it from 2 threads, always only half of the files are being moved and I am not sure why. If the folder has 6 files then only 3 files are getting moved randomly.

Why this is happening?

CodePudding user response:

I think that your main problem is here:

while (_queue.GetNextAvailableItem() != null)
{
    //get next available item from queue.
    FileItem item = _queue.GetNextAvailableItem();

You are calling the GetNextAvailableItem twice, and the value returned from the first call is discarded.

One way to solve this problem is this:

while (true)
{
    //get next available item from queue.
    FileItem item = _queue.GetNextAvailableItem();
    if (item == null) break;

Of course you should also ensure that the MyQueue class is thread-safe, as Gabriel suggests in their answer.

CodePudding user response:

To prevent the threads from influencing each other, a lock must be set. For this the lock statement is helpful. See: https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/statements/lock

For this purpose a lock object is defined:

private readonly object fileListLock = new object();

And then it is used in the GetNextAvailableItem() method:

    public FileItem GetNextAvailableItem()
    {
        lock (fileListLock)
        {
            FileItem item = FileList.Where(i => i.isLocked == false).FirstOrDefault();
            if (item != null) item.isLocked = true;
            return item;
        }
    }

and also in the RemoveProcessedItem() method:

    public void RemoveProcessedItem(FileItem item)
    {
        lock(fileListLock)
        {
            FileList.Remove(item);
        }
    }

CodePudding user response:

I was able to rewrite my queue using ConcurrentQueue<T> This solved my issue. code here:

internal class MyQueue
{
    ConcurrentQueue<FileItem> FileList = new ConcurrentQueue<FileItem>();
    
    public MyQueue(string FilePath)
    {
        string[] files = Directory.GetFiles(FilePath);
        foreach (string file in files)
        {
            FileItem fileitem = new FileItem
            {
                FullFileName = file,
                isLocked = false
            };

            FileList.Enqueue(fileitem);
        }
    }


    public FileItem GetNextAvailableItem()
    {
        //Deque and return object.
        FileItem DequeItem = new FileItem();
        bool isDequeSuccess = FileList.TryDequeue(out DequeItem);

        if (isDequeSuccess) return DequeItem;
        else return null;
    }

    public bool PeekIfAnyFilesLeftInQueue()
    {
        FileItem PeekItem = new FileItem();
        bool isFileExists = FileList.TryPeek(out PeekItem);
        return isFileExists;
    }


}

Also I had to change the invoking of these methods from multi threaded method.

    static void ReadFromQueueAndMoveFile(string ThreadName)
    {
        do
        {
            //get next available item from queue.

            FileItem item = _queue.GetNextAvailableItem();

            if (item != null)
            {
                string FileName = Path.GetFileName(item.FullFileName);                    

                string SourceFilePath = Path.Combine(sourcePath, FileName);
                string DestinationFilePath = Path.Combine(destinationPath, FileName);

                // Let's assume this is a long running process.
                File.Move(SourceFilePath, DestinationFilePath);
                Thread.Sleep(2000);
                Console.WriteLine("Successfully moved: "   FileName   " Via "   ThreadName);
            }
        }
        while (_queue.PeekIfAnyFilesLeftInQueue());
        
    }
  • Related