Stopping multiple threads at once

Time:03-30

What am I missing in the program below, where threads wait on a condition_variable_any to determine when to stop? The threads stop in an unpredictable way: some stop before the call to notify_all and some don't stop at all.

The condition variable used is defined as below:

static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;

The threads check if it is time to stop as below:

std::unique_lock<std::mutex> lock(interrupt_mutex);
const auto cv_status = interrupt_cv.wait_for(lock, std::chrono::milliseconds(1000));
const auto timeout_expired = cv_status == std::cv_status::timeout;
if (!timeout_expired)
{
    quit = true;
}

The main thread signals the threads to stop as below:

std::unique_lock<std::mutex> lock(interrupt_mutex);
interrupt_cv.notify_all();

A possible output looks like:

Thread  1> Received interrupt signal at iteration 2
Thread  1> Terminate
Thread  2> Received interrupt signal at iteration 2
Thread  2> Terminate
Thread  4> Received interrupt signal at iteration 2
Thread  4> Terminate
**** Requesting all threads to stop ****
Waiting for all threads to complete...

Below is the complete code that reproduces the problem:

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;

int main()
{
    std::vector<std::thread> thread_handles;
    for (int thread_idx = 0; thread_idx < 4; ++thread_idx)
    {
        thread_handles.emplace_back(std::thread([thread_idx](const int thread_id)
        {
            int num_iterations = 0;
            auto quit = false;
            while (!quit)
            {
                // Fake processing time outside the lock for testing purpose
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                ++num_iterations;

                // Check if need to stop with a timeout of 1000ms
                {
                    std::unique_lock<std::mutex> lock(interrupt_mutex);
                    const auto cv_status = interrupt_cv.wait_for(lock, std::chrono::milliseconds(1000));
                    if (const auto timeout_expired = cv_status == std::cv_status::timeout; !timeout_expired)
                    {
                        printf("Thread %2d> Received interrupt signal at iteration %d\n", thread_id, num_iterations);
                        quit = true;
                    }
                }
            }

            printf("Thread %2d> Terminate\n", thread_id);
        }, thread_idx + 1));
    }

    std::this_thread::sleep_for(std::chrono::seconds(5));

    // Signals all threads to stop
    {
        printf("**** Requesting all threads to stop ****\n");
        std::unique_lock<std::mutex> lock(interrupt_mutex);
        interrupt_cv.notify_all();
    }

    // Wait until all threads stop
    printf("Waiting for all threads to complete...\n");
    std::ranges::for_each(thread_handles, [](std::thread& thread_handle)
    {
        thread_handle.join();
    });

    printf("Program ends\n");
    return 0;
}

CodePudding user response:

A condition_variable is meant to signal threads when a condition changes (for example, when a shared variable changes value). But your code has no condition. You are trying to use the condition_variable itself as a quit signal, and that is not what it is meant for. notify_all() only wakes up threads that are waiting on the condition_variable at that exact moment. Threads that are busy doing something else at that moment never receive the signal, yet they still need to detect the request to terminate once they get around to waiting. So the condition has to be something persistent, and that is why your code is not working correctly.
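
To make the point about non-persistent notifications concrete, here is a minimal sketch. The helper name wait_after_early_notify is hypothetical and not part of the original program; it calls notify_all() while nobody is waiting and only then starts waiting, so the wait is expected to run into its timeout.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper (not from the original post): notifies first, waits second.
// No thread is waiting when notify_all() runs, so the notification is lost.
std::cv_status wait_after_early_notify(std::chrono::milliseconds timeout)
{
    std::mutex m;
    std::condition_variable_any cv;

    cv.notify_all(); // nobody is waiting yet, so this has no effect

    std::unique_lock<std::mutex> lock(m);
    // Expected to time out; a spurious wake-up could still return no_timeout.
    return cv.wait_for(lock, timeout);
}
```

This is the situation of a worker that is still in its 200 ms processing sleep when the main thread notifies: by the time it waits, there is nothing left to receive.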

In this case, you can simply move your quit variable to global scope, next to the condition_variable and mutex. Setting that quit variable will act as your condition that you can signal waiting threads about. You can then use the overloaded version of wait_for() that will let you check the current state of quit (to ignore spurious awakenings).
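
As a rough sketch of what that overload does: the predicate form of wait_for() behaves like a loop around the timed wait that re-checks the predicate after every wake-up. The free function wait_for_pred below is a hypothetical stand-in written for illustration, not the library's implementation.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for cv.wait_for(lock, timeout, pred).
// Returns the value of pred() when the wait ends: true if the condition
// holds, false if the timeout expired while it was still false.
template <class Predicate>
bool wait_for_pred(std::condition_variable_any& cv,
                   std::unique_lock<std::mutex>& lock,
                   std::chrono::milliseconds timeout,
                   Predicate pred)
{
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!pred()) // checked before waiting, so an earlier change is not missed
    {
        if (cv.wait_until(lock, deadline) == std::cv_status::timeout)
        {
            return pred(); // final check once the deadline has passed
        }
        // Woken up (notified or spuriously): loop and re-check pred()
    }
    return true;
}
```

Because the predicate is evaluated before the first wait, a thread that arrives after quit was set returns immediately, and a spurious wake-up simply leads to another wait.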

Try something more like this:

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;
static bool quit = false;

int main()
{
    std::vector<std::thread> thread_handles;
    for (int thread_idx = 0; thread_idx < 4; ++thread_idx)
    {
        thread_handles.emplace_back(std::thread([thread_idx](const int thread_id)
        {
            int num_iterations = 0;
            while (true)
            {
                // Fake processing time outside the lock for testing purpose
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                ++num_iterations;

                // Check if need to stop with a timeout of 1s 
                {
                    std::unique_lock<std::mutex> lock(interrupt_mutex);
                    const bool signaled = interrupt_cv.wait_for(lock, std::chrono::seconds(1), [](){ return quit; });
                    if (signaled) break;
                }
            }

            printf("Thread %2d> Received interrupt signal at iteration %d\n", thread_id, num_iterations);
            printf("Thread %2d> Terminate\n", thread_id);
        }, thread_idx + 1));
    }

    std::this_thread::sleep_for(std::chrono::seconds(5));

    // Signals all threads to stop
    {
        printf("**** Requesting all threads to stop ****\n");
        std::unique_lock<std::mutex> lock(interrupt_mutex);
        quit = true;
        interrupt_cv.notify_all();
    }

    // Wait until all threads stop
    printf("Waiting for all threads to complete...\n");
    std::ranges::for_each(thread_handles, [](std::thread& thread_handle)
    {
        thread_handle.join();
    });

    printf("Program ends\n");
    return 0;
}

CodePudding user response:

There are 2 issues with your code and both have the same solution.

  1. Spurious wake-up. If wait_for returns because of a spurious wake-up, it reports no_timeout, so your code treats it as a stop request although no one actually asked the thread to wake up or end.
  2. Missed notification. What if a thread is not waiting on the condition variable (for example, it is still in its processing sleep) when the main thread calls notify_all? Notifications aren't stored anywhere, so if a thread misses one, it won't get it later. Those threads will never terminate.

To fix both issues you need a separate flag that tells your threads the job is done and they have to stop.

static bool stop = false;
//...
if (stop) // Instead of if (const auto timeout_expired = cv_status == std::cv_status::timeout; !timeout_expired)
//...
printf("**** Requesting all threads to stop ****\n");
std::unique_lock<std::mutex> lock(interrupt_mutex);
stop = true;
interrupt_cv.notify_all();
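
One way to keep the flag, the mutex, and the condition variable together is to wrap them in a small class. The sketch below is only an illustration under that assumption; the class name StopSignal and its member functions are hypothetical and do not appear in the original code.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper class (not from the original post) bundling the
// mutex, the condition variable, and the persistent stop flag.
class StopSignal
{
public:
    // Called by the controlling thread: set the flag, then wake all waiters.
    void request_stop()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
    }

    // Called by workers: returns true if a stop was requested, either before
    // the call or during the timeout; returns false if the timeout expired first.
    bool wait_for_stop(std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        return cv_.wait_for(lock, timeout, [this] { return stop_; });
    }

private:
    std::mutex mutex_;
    std::condition_variable_any cv_;
    bool stop_ = false;
};
```

A worker would then loop on something like while (!signal.wait_for_stop(std::chrono::seconds(1))) and do its processing in the loop body. Since the flag persists, a worker that calls wait_for_stop() after request_stop() has already run still sees the request.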