Thread pool with individual std::function jobs per worker crashes with segmentation fault

Time:10-28

I have successfully implemented the thread pool from an answer on Stack Overflow, which helped speed up my program. It uses a single std::queue to distribute jobs (std::function<void()>) among multiple workers (std::thread objects).

I wanted to improve on this. As I only need to run a limited set of functions, I planned to ditch the queue and give each worker its own job slot instead. In other words, the n-th worker would do the n-th job from a std::vector<std::function<void()>>. Unfortunately, my test app crashes with Segmentation fault (core dumped), and I have not been able to find my mistake so far.

Here is my (roughly) minimal reproducible example, whose job is to count the odd elements in a vector. (Idea taken from Scott Meyers' talk "CPU Caches and Why You Care".)

#include <algorithm>
#include <condition_variable>
#include <cstdio> // std::puts
#include <functional>
#include <iostream>
#include <mutex>
#include <stdexcept> // std::invalid_argument
#include <thread>
#include <vector>

// Thread pool with a std::function for each worker.
class Pool {
  public:
    enum class Status {
        idle,
        working,
        terminate
    };

    const int worker_count;

    std::vector<Status> statuses;
    std::vector<std::mutex> mutexes;
    std::vector<std::condition_variable> conditions;
    std::vector<std::thread> threads;
    std::vector<std::function<void()>> jobs;

    void thread_loop(int thread_id)
    {
        std::puts("Thread started");
        auto &my_status = statuses[thread_id];
        auto &my_mutex = mutexes[thread_id];
        auto &my_condition = conditions[thread_id];
        auto &my_job = jobs[thread_id];

        while (true) {
            std::unique_lock<std::mutex> lock(my_mutex);
            my_condition.wait(lock, [this, &my_status] { return my_status != Status::idle; });

            if (my_status == Status::terminate)
                return;

            my_job();
            my_status = Status::idle;
            lock.unlock();
            my_condition.notify_one(); // Tell the main thread we are done
        }
    }

  public:
    Pool(int size) : worker_count(size), statuses(size, Status::idle), mutexes(size), conditions(size), threads(), jobs(size)
    {
        if (size < 0)
            throw std::invalid_argument("Worker count needs to be a positive integer");
    };
    ~Pool()
    {
        for (int i = 0; i < worker_count; ++i) {
            std::unique_lock lock(mutexes[i]);
            statuses[i] = Status::terminate;
            lock.unlock(); // Unlock before notifying
            conditions[i].notify_one();
        }

        for (auto &thread : threads)
            thread.join();
        threads.clear();
    };

    void start_threads()
    {
        threads.resize(worker_count);
        jobs.resize(worker_count);

        for (int i = 0; i < worker_count; ++i) {
            statuses[i] = Status::idle;
            jobs[i] = []() { std::puts("I am running"); };
            threads[i] = std::thread(&Pool::thread_loop, this, i);
        }
    }

    void set_and_start_job(const std::function<void(int)> &job)
    {
        for (int i = 0; i < worker_count; ++i) {
            std::unique_lock lock(mutexes[i]);
            jobs[i] = [&job, i]() { job(i); };
            statuses[i] = Status::working;
            lock.unlock();
            conditions[i].notify_one();
        }
    }

    void wait()
    {
        for (int i = 0; i < worker_count; ++i) {
            auto &my_status = statuses[i];
            std::unique_lock lock(mutexes[i]);
            conditions[i].wait(lock, [this, &my_status] { return my_status != Status::working; });
        }
    }
};

int main()
{
    constexpr int worker_count = 1;
    constexpr int vector_size = 1 << 10;

    std::vector<int> test_vector;
    test_vector.reserve(vector_size);
    for (int i = 0; i < vector_size; ++i)
        test_vector.push_back(i);

    std::vector<int> worker_odd_counts(worker_count, 0);

    const auto worker_task = [&](int thread_id) {
        int chunk_size = vector_size / (worker_count) + 1;
        int my_start = thread_id * chunk_size;
        int my_end = std::min(my_start + chunk_size, vector_size);

        int local_odd_count = 0;
        for (int ii = my_start; ii < my_end; ++ii)
            if (test_vector[ii] % 2 != 0)
                ++local_odd_count;

        worker_odd_counts[thread_id] = local_odd_count;
    };

    Pool pool = Pool(worker_count);
    pool.start_threads();
    pool.set_and_start_job(worker_task);
    pool.wait();

    int odd_count = 0;
    for (auto elem : worker_odd_counts)
        odd_count += elem;
    std::cout << odd_count << '\n';
}


CodePudding user response:

TL;DR version:

The simplest fix is to change

jobs[i] = [&job, i]() { job(i); };

to

jobs[i] = [job, i]() { job(i); }; 

This captures job by value, which makes a copy. The copy is owned by the lambda stored in jobs[i], so it lives as long as that lambda does, and the jobs vector outlives the worker threads.
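As a rough sketch of why the copy is safe, here is the same capture pulled out into a free function. make_worker_job is a made-up helper for illustration only, not part of the Pool class in the question:

```cpp
#include <cassert>
#include <functional>

// Hypothetical helper (not in the original Pool): builds a per-worker job
// the same way the fixed set_and_start_job does.
std::function<void()> make_worker_job(const std::function<void(int)> &job, int i)
{
    assert(static_cast<bool>(job)); // the caller must pass something callable

    // `job` is captured by value, so the returned closure owns its own copy
    // and stays valid after the caller's `job` argument has been destroyed.
    return [job, i]() { job(i); };
}
```

Calling make_worker_job with a plain lambda and invoking the returned std::function later is fine, because nothing inside it refers back to the argument that was passed in.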

The Long version:

The problem is at

jobs[i] = [&job, i]() { job(i); };

in set_and_start_job. The object that job refers to is destroyed before the workers get to run it, but how can this be if

pool.set_and_start_job(worker_task);

and worker_task won't go out of scope until after the threads are joined?

Turns out that's because set_and_start_job takes a const std::function<void(int)> & and worker_task isn't a std::function; it is a lambda that is merely implicitly convertible to one. The conversion creates a temporary std::function, and the job parameter binds to that temporary. The temporary only lives until the call to set_and_start_job has finished, so once the function returns, the reference captured in each jobs[i] dangles.
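One way to observe the temporary without invoking undefined behavior is to count owners of a std::shared_ptr that the lambda captures. This is only a sketch; owners_during_call and temporary_lives_only_during_call are names I made up for the demonstration:

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical probe: reports how many owners `token` has while the
// parameter `job` is alive inside the call.
long owners_during_call(const std::function<void(int)> &job,
                        const std::shared_ptr<int> &token)
{
    assert(static_cast<bool>(job)); // `job` is a live std::function here
    return token.use_count();
}

// Passes a lambda (not a std::function) and checks that an extra copy of it
// exists during the call and is gone again right afterwards.
bool temporary_lives_only_during_call()
{
    auto token = std::make_shared<int>(0);
    auto lambda = [token](int) {};        // owners: token and lambda
    const long before = token.use_count();
    // The lambda is converted to a temporary std::function, which copies it.
    const long during = owners_during_call(lambda, token);
    const long after = token.use_count(); // the temporary is already destroyed
    return during > before && after == before;
}
```

The extra owner seen during the call is the copy held by the temporary std::function. It disappears as soon as the call expression ends, which is exactly when a reference to that temporary starts to dangle.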

The simple fix is above, but we can also make the conversion right at the source, so that a std::function is passed all the way through the system and only goes out of scope after the threads are joined.

const std::function<void(int)> worker_task = [&](int thread_id) { ... };

There may be some small resource saving in using std::function end to end and capturing a reference, but my experiences with references and threads haven't been the best, so I'd prefer the copy to reduce the possibility that I've missed some subtlety or that someone in the future will make a change that adds one.
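To check which situation you are in, comparing addresses is enough. The sketch below assumes a made-up probe named binds_directly; it reports whether a const std::function<void(int)> & parameter refers to the caller's own object or to a converted temporary:

```cpp
#include <cassert>
#include <functional>

// Hypothetical probe: true when the parameter `job` is the very object the
// caller passed, false when it refers to a temporary made by conversion.
bool binds_directly(const std::function<void(int)> &job, const void *caller_object)
{
    assert(caller_object != nullptr);
    return static_cast<const void *>(&job) == caller_object;
}
```

Passing a variable declared as const std::function<void(int)> together with its own address should give true, while passing a variable that holds a lambda gives false, because the parameter then refers to a separate temporary object. Only in the first case does a captured reference stay valid for as long as the caller's variable lives.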

CodePudding user response:

In the function Pool::set_and_start_job, when setting the job, removing the & from the job capture seems to have resolved the issue:

jobs[i] = [job, i]() { job(i); };

However, this was only a suspicion on my part; I do not know the underlying cause.
