Home > Enterprise >  C multithreading locking a mutex before assigning to an atomic
C multithreading locking a mutex before assigning to an atomic

Time:03-26

In C do you need to lock a mutex before assigning to an atomic? I tried implementing the thread pool as shown here https://stackoverflow.com/a/32593825/2793618. In doing so, I created a thread safe queue and used atomics. In particular, in the shutdown method (or in my code the waitForCompletion) requires the thread pool loop function while loop variable to be set to true so that the thread can finish its work and join. But since atomics are thread safe, I didn't lock the mutex before assigning true to it in the shutdown method as shown below. This ended up causing a deadlock. Why is that the case?

ThreadPool.hpp:

#pragma once 

#include <atomic> 
#include <vector> 
#include <iostream> 
#include <thread>
#include <future>
#include <mutex>
#include <queue>
#include <functional>
#include <ThreadSafeQueue.hpp>

class ThreadPool{
    public: 
        ThreadPool(std::atomic_bool& result); 
        void waitForCompletion();
        void addJob(std::function<bool()> newJob);
        void setComplete();
    private: 
        void workLoop(std::atomic_bool& result); 
        int m_numThreads; 
        std::vector<std::thread> m_threads; 
        std::atomic_bool m_workComplete; 
        std::mutex m_mutex; 
        std::condition_variable m_jobWaitCondition; 
        ThreadSafeQueue<std::function<bool()>> m_JobQueue;
};

ThreadPool.cpp:

#include <ThreadPool.hpp> 

ThreadPool::ThreadPool(std::atomic_bool& result){ 
    m_numThreads = std::thread::hardware_concurrency();
    m_workComplete = false;
    for (int i = 0; i < m_numThreads; i  )
    {
        m_threads.push_back(std::thread(&ThreadPool::workLoop, this, std::ref(result)));
    }
}

// each thread executes this loop 
void ThreadPool::workLoop(std::atomic_bool& result){ 
    while(!m_workComplete){
        std::function<bool()> currentJob;
        bool popped;
        {
            std::unique_lock<std::mutex> lock(m_mutex); 
            m_jobWaitCondition.wait(lock, [this](){
                return !m_JobQueue.empty() || m_workComplete.load();
            });
            
            popped = m_JobQueue.pop(currentJob);
        }
        if(popped){
            result = currentJob() && result;
        }
    }
}

void ThreadPool::addJob(std::function<bool()> newJob){ 
    m_JobQueue.push(newJob);
    m_jobWaitCondition.notify_one();
}

void ThreadPool::setComplete(){
    m_workComplete = true; 
}

void ThreadPool::waitForCompletion(){
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_workComplete.store(true);
    }
    
    m_jobWaitCondition.notify_all();

    for(auto& thread : m_threads){ 
        thread.join();
    }
    
    m_threads.clear();
}

ThreadSafeQueue.hpp:

#pragma once

#include <mutex>
#include <queue>

template <class T>
class ThreadSafeQueue {
   public:
    ThreadSafeQueue(){};
    void push(T element) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_queue.push(element);
    }
    bool pop(T& retElement) {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (m_queue.empty()) {
            return false;
        }
        retElement = m_queue.front();
        m_queue.pop();
        return true;
    }
    bool empty(){ 
        std::unique_lock<std::mutex> lock(m_mutex); 
        return m_queue.empty();
    }

   private:
    std::queue<T> m_queue;
    std::mutex m_mutex;
};

CodePudding user response:

You probably want to check m_workComplete after the wait() returns in workLoop(), otherwise you might be calling pop() on an empty queue, which is bad.

CodePudding user response:

You have your dead lock while waiting for the condition. The condition although is only notified when there is a new job added. Your thread is waiting that condition to be notified. You may have non deterministic (from your point of view) checks of a condition "condition" but you may not rely them to exist.

You need to notify your condition when the task is completed.One possible place to do that is when you call for wait to complete or in any point where a completion state can be achieved.

I changed your code to this to illustrate:

// each thread executes this loop 
void ThreadPool::workLoop(std::atomic_bool& result){ 
    while(!m_workComplete)
    {
        std::function<bool()> currentJob;
        bool popped;
        {
        std::cout<<"Before the lock"<<std::endl;
            std::unique_lock<std::mutex> lock(m_mutex); 
        std::cout<<"After lock"<<std::endl;
            m_jobWaitCondition.wait(lock, [this]()
        {
            bool res = (!m_JobQueue.empty() || m_workComplete.load() );
        std::cout<<"res:"<<res<<std::endl;
                return res;
            });
        std::cout<<"After wait"<<std::endl;
            
            popped = m_JobQueue.pop(currentJob);
        }
        if(popped)
    { 
        std::cout<<"Popped"<<std::endl;
            result = currentJob() && result;
        std::cout<<"Popped 2"<<std::endl;
        }
    }
    std::cout<<"LEave"<<std::endl;
}

void ThreadPool::addJob(std::function<bool()> newJob){ 
    m_JobQueue.push(newJob);
    std::cout<<"before call notify"<<std::endl;
    m_jobWaitCondition.notify_one();
    std::cout<<"After call notify"<<std::endl;
}

I add a single job and the printed content is:

Before the lock After lock res:0 Before the lock After lock Before the lock Before the lock Before the lock res:0 Before the lock After lock res:0 After lock res:0 Before the lock After lock res:0 After lock before call notifyres:1

Before the lockAfter wait

Popped After lock res:0 After call notifyres:0

Popped 2 Before the lock res:0 res:0 res:0 res:0 After lock

res:0

After lock

res:0

Notice that last notify is called BEFORE the last "after lock" line (that precedes the condition wait)

  • Related