Issue with condition variables in C++

Time:09-17

We have implemented a TaskRunner whose functions are called by different threads to start, stop, and post tasks. TaskRunner internally creates a thread; if the queue is not empty, that thread pops a task from the queue and executes it. Start() checks whether the thread is running and, if not, creates a new thread. Stop() joins the thread. The code is below.

bool TaskRunnerImpl::PostTask(Task* task) {
  tasks_queue_.push_back(task);
  return true;
}

void TaskRunnerImpl::Start() {
  std::lock_guard<std::mutex> lock(is_running_mutex_);
  if(is_running_) {
    return;
  }
  is_running_ = true;

  runner_thread_ = std::thread(&TaskRunnerImpl::Run, this);
}

void TaskRunnerImpl::Run() {
  while(is_running_) {
    if(tasks_queue_.empty()) {
      continue;
    }
    Task* task_to_run = tasks_queue_.front();
    task_to_run->Run();
    tasks_queue_.pop_front();

    delete task_to_run;
  }
}

void TaskRunnerImpl::Stop() {
  std::lock_guard<std::mutex> lock(is_running_mutex_);
  is_running_ = false;
  if(runner_thread_.joinable()) {
    runner_thread_.join();
  }
}

This code works as expected: tasks are continuously pushed and the thread executes them. We now want to use a condition variable, because otherwise the thread keeps spinning, continuously checking whether the task queue is empty. We implemented it as follows.

  • The thread function (Run()) waits on a condition variable.
  • PostTask() signals it when someone posts a task.
  • Stop() signals it when someone calls Stop().

The implemented code is below.

bool TaskRunnerImpl::PostTask(Task* task, uint64_t delay_milliseconds) {
    std::lock_guard<std::mutex> taskGuard(m_task_mutex);
    tasks_queue_.push_back(task);
    m_task_cond_var.notify_one();
    INFO("{} : {} : {}", __FUNCTION__, delay_milliseconds, tasks_queue_.size());
    return true;
}

void TaskRunnerImpl::Start() {
    INFO("{}", __FUNCTION__);
    std::lock_guard<std::mutex> taskGuard(m_task_mutex);

    if(!is_running_) {
        is_running_ = true;
        runner_thread_ = std::thread(&TaskRunnerImpl::Run, this);
    }
}

void TaskRunnerImpl::Run() {
    while(true) {
        INFO("{} : {}", __FUNCTION__, 1);

        {
            std::unique_lock<std::mutex> mlock(m_task_mutex);
            INFO("{} : Locked Mutex", __FUNCTION__);
            m_task_cond_var.wait(mlock, [this]() {
                INFO("{} : Checking Condition", __FUNCTION__);
                return !(is_running_ && tasks_queue_.empty());
            });

            INFO("{} : Came out of wait", __FUNCTION__);
            if(!is_running_) {
                return;
            }

            INFO("{} : Escaped if cond", __FUNCTION__);
            if(!tasks_queue_.empty()) {
                INFO("{} : {} : {}", __FUNCTION__, 2, tasks_queue_.size());    // NO LOGS ARE PRINTED AFTER THIS ONE
                Task* task_to_run = tasks_queue_.front();
                task_to_run->Run();
                INFO("{} : Deleting Task", __FUNCTION__);
                tasks_queue_.pop_front();
                INFO("{} : After Deletion : {}", __FUNCTION__, tasks_queue_.size());
                delete task_to_run;
            }
            INFO("{} : Out of scope", __FUNCTION__);
        }

        INFO("{} : End of iteration", __FUNCTION__);
    }

    INFO("{} : returning", __FUNCTION__);
}

void TaskRunnerImpl::Stop() {
    {
        std::lock_guard<std::mutex> taskGuard(m_task_mutex);
        is_running_ = false;
        INFO("{} : Signalling STOP", __FUNCTION__);
        m_task_cond_var.notify_one();
    }

    INFO("{} : {}", __FUNCTION__, 1);

    if(runner_thread_.joinable()) {
        runner_thread_.join();
    }
}

I'm not sure what is wrong with the code. I get the following output:

TaskRunnerImpl.cpp:34:INFO: Start
TaskRunnerImpl.cpp:45:INFO: Run : 1
TaskRunnerImpl.cpp:49:INFO: Run : Locked Mutex
TaskRunnerImpl.cpp:51:INFO: operator() : Checking Condition
TaskRunnerImpl.cpp:29:INFO: PostTask : 0 : 1
TaskRunnerImpl.cpp:29:INFO: PostTask : 0 : 2
TaskRunnerImpl.cpp:51:INFO: operator() : Checking Condition
TaskRunnerImpl.cpp:56:INFO: Run : Came out of wait
TaskRunnerImpl.cpp:61:INFO: Run : Escaped if cond
TaskRunnerImpl.cpp:63:INFO: Run : 2 : 2

That means the log line just before the task is executed gets printed, and after that there are no logs at all. Normally PostTask() is called continuously to post tasks to the queue, but with the new code nothing is logged after the task runs. So I assume the thread function is holding the mutex and PostTask() cannot push tasks to the queue. But I don't understand why there are no logs after the task executes. If I revert to the original code, it works as expected. Can anyone tell me what is wrong with this code?

User response:

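You probably have a deadlock. In many places, your code keeps the mutex locked much longer than required. For example, if a task calls PostTask() from inside its Run() (or waits for another thread that is blocked in PostTask()), it tries to lock m_task_mutex while the worker thread already holds it. Locking a non-recursive std::mutex that the calling thread already owns is undefined behavior and in practice usually deadlocks, which would match a log that stops right before the task returns.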

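When you start the thread, you usually don't need a lock.

Nor do you need to hold it across the join when stopping. You could use an atomic boolean for the running flag. However, because the condition variable's predicate reads that flag, it should still be changed while holding the mutex (briefly, as your Stop() does), and the thread should be joined after releasing it. Otherwise the worker can check the flag just before it changes, then block and miss the notification.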
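A minimal sketch of that stop sequence, assuming a single worker thread waiting on one condition variable. The flag is a std::atomic<bool>, yet it is still set under the mutex; notifying and joining then happen without the lock. The function name stop_wakes_worker() is hypothetical.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

// Returns true once the worker has observed the stop request and exited.
bool stop_wakes_worker() {
    std::mutex m;
    std::condition_variable cv;
    std::atomic<bool> running{true};
    std::atomic<bool> worker_exited{false};

    std::thread worker([&] {
        std::unique_lock<std::mutex> lock(m);
        // The predicate reads the atomic flag while holding the mutex.
        cv.wait(lock, [&] { return !running.load(); });
        worker_exited = true;
    });

    {
        // Publish the change under the mutex so the worker cannot evaluate
        // the predicate, see "still running", and then miss the notify.
        std::lock_guard<std::mutex> guard(m);
        running = false;
    }
    cv.notify_one();  // notifying after releasing the mutex is fine
    worker.join();    // join without holding the mutex
    return worker_exited.load();
}
```

Call stop_wakes_worker() from main; it returns only after the worker has woken up and finished.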

Finally, when you run each task (i.e. task_to_run->Run();), you should definitely not hold the lock.

If the mutex is locked 99% of the time, why not do everything in the main thread? As written, you cannot post any task while a task is running, because the mutex is locked. So your thread that posts tasks has to wait until the current task completes.

Thus your posting thread will make almost no progress while the task queue is not empty.

Instead, you want to move the front task into a local variable and pop it from the queue, then release the lock, and only then run the task. Something like this:

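if(!tasks_queue_.empty())
{
    // Take ownership of the front task and remove it while still locked.
    std::unique_ptr<Task> task_to_run(tasks_queue_.front());
    tasks_queue_.pop_front();
    // Release the lock so PostTask() (even from inside a task) is not blocked.
    mlock.unlock();
    task_to_run->Run();
}   // task_to_run is deleted here, without holding the lock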

Any good C++ book should explain that.
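Applying that change to the question's class gives roughly the sketch below. It is not the asker's full code: it assumes a minimal Task interface (the question does not show one), drops the INFO logging, and adds a hypothetical FunctionTask wrapper and run_demo() driver. The demo assumes, purely for illustration, that a task posts a follow-up task from inside Run(); with the lock held around Run(), as in the question, that would self-deadlock, while here it completes.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Assumed minimal Task interface; the question does not show its definition.
struct Task {
    virtual ~Task() = default;
    virtual void Run() = 0;
};

// Hypothetical helper that wraps a callable in a Task.
struct FunctionTask : Task {
    explicit FunctionTask(std::function<void()> f) : fn_(std::move(f)) {}
    void Run() override { fn_(); }
    std::function<void()> fn_;
};

class TaskRunnerImpl {
public:
    ~TaskRunnerImpl() {
        Stop();
        for (Task* t : tasks_queue_) delete t;  // tasks that never ran
    }
    bool PostTask(Task* task) {
        std::lock_guard<std::mutex> guard(m_task_mutex);
        tasks_queue_.push_back(task);
        m_task_cond_var.notify_one();
        return true;
    }
    void Start() {
        std::lock_guard<std::mutex> guard(m_task_mutex);
        if (!is_running_) {
            is_running_ = true;
            runner_thread_ = std::thread(&TaskRunnerImpl::Run, this);
        }
    }
    void Stop() {
        {
            std::lock_guard<std::mutex> guard(m_task_mutex);
            is_running_ = false;
            m_task_cond_var.notify_one();
        }
        if (runner_thread_.joinable()) runner_thread_.join();  // lock released
    }

private:
    void Run() {
        while (true) {
            std::unique_lock<std::mutex> mlock(m_task_mutex);
            m_task_cond_var.wait(mlock, [this] {
                return !is_running_ || !tasks_queue_.empty();
            });
            if (!is_running_) return;
            // Pop the front task while locked, then unlock before running it.
            std::unique_ptr<Task> task_to_run(tasks_queue_.front());
            tasks_queue_.pop_front();
            mlock.unlock();
            task_to_run->Run();  // may call PostTask() without deadlocking
        }
    }

    std::mutex m_task_mutex;
    std::condition_variable m_task_cond_var;
    std::deque<Task*> tasks_queue_;
    bool is_running_ = false;
    std::thread runner_thread_;
};

// Demo: a task posts a follow-up task from inside Run(); returns how many ran.
int run_demo() {
    std::mutex done_m;
    std::condition_variable done_cv;
    int ran = 0;  // guarded by done_m
    auto bump = [&] {
        { std::lock_guard<std::mutex> g(done_m); ++ran; }
        done_cv.notify_one();
    };
    TaskRunnerImpl runner;
    runner.Start();
    runner.PostTask(new FunctionTask([&] {
        bump();
        runner.PostTask(new FunctionTask(bump));  // re-entrant post
    }));
    std::unique_lock<std::mutex> lock(done_m);
    done_cv.wait(lock, [&] { return ran == 2; });
    lock.unlock();
    runner.Stop();
    return ran;
}
```

Calling run_demo() from main returns once both tasks have run. Note that this Stop(), like the question's, returns without running tasks still in the queue; the destructor deletes them.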

If you are serious about multithreading, C++ Concurrency in Action is a good read.

By the way, if you can use C++20, you can simplify the code a bit by using std::jthread and its cancellation token (std::stop_token).

Another book you might want to read is Concurrency with Modern C++, or a book on C++20.

Also, your queue should probably store std::unique_ptr too. Otherwise, you need cleanup code to delete any tasks still queued after Stop() is called. Using std::unique_ptr makes the code exception-safe without requiring much extra code, such as an explicit try/catch or a cleanup loop.

Likewise, bool TaskRunnerImpl::PostTask(Task* task) should be replaced by bool TaskRunnerImpl::PostTask(std::unique_ptr<Task> task), and you should then std::move the task into the queue.
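A sketch of only the ownership-related part, assuming the same minimal Task interface as above; TaskQueue, PopTask(), CountingTask and destroyed_after_scope() are hypothetical names, and the worker thread is left out to keep the focus on std::unique_ptr.

```cpp
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

struct Task {  // assumed interface; the question does not show it
    virtual ~Task() = default;
    virtual void Run() = 0;
};

// Hypothetical task that counts how many instances have been destroyed.
struct CountingTask : Task {
    explicit CountingTask(int& destroyed) : destroyed_(destroyed) {}
    ~CountingTask() override { ++destroyed_; }
    void Run() override {}
    int& destroyed_;
};

class TaskQueue {
public:
    bool PostTask(std::unique_ptr<Task> task) {
        std::lock_guard<std::mutex> guard(mutex_);
        tasks_queue_.push_back(std::move(task));  // ownership moves into the queue
        return true;
    }
    // Hands the front task to the caller (the worker), or nullptr if empty.
    std::unique_ptr<Task> PopTask() {
        std::lock_guard<std::mutex> guard(mutex_);
        if (tasks_queue_.empty()) return nullptr;
        std::unique_ptr<Task> task = std::move(tasks_queue_.front());
        tasks_queue_.pop_front();
        return task;
    }

private:
    std::mutex mutex_;
    std::deque<std::unique_ptr<Task>> tasks_queue_;  // no manual delete anywhere
};

// Posts three tasks, runs one, and returns how many were destroyed in total.
int destroyed_after_scope() {
    int destroyed = 0;
    {
        TaskQueue queue;
        for (int i = 0; i < 3; ++i)
            queue.PostTask(std::make_unique<CountingTask>(destroyed));
        if (auto task = queue.PopTask()) task->Run();  // run, then freed here
        // The two tasks still queued are freed when `queue` is destroyed.
    }
    return destroyed;
}
```

A temporary such as std::make_unique<CountingTask>(...) converts directly; a named pointer has to be passed as PostTask(std::move(task)).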
