I want to split jobs among multiple std::thread
workers and continue once they are all done.
To do so, I implemented a thread pool class mainly based on this SO answer.
I noticed, however, that my benchmarks can get stuck, running forever, without any errors thrown.
I wrote a minimal reproducing code, enclosed at the end. Based on terminal output, the issue seems to occur when the jobs are being queued. I checked videos (1, 2), documentation (3) and blog posts (4). I tried replacing the type of the locks, using atomics. I could not find the underlying cause.
Here is the snippet to replicate the issue. The program repeatedly counts the odd elements in the test vector.
#include <atomic>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
class Pool {
public:
const int worker_count;
bool to_terminate = false;
std::atomic<int> unfinished_tasks = 0;
std::mutex mutex;
std::condition_variable condition;
std::vector<std::thread> threads;
std::queue<std::function<void()>> jobs;
void thread_loop()
{
while (true) {
std::function<void()> job;
{
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock, [&] { return (!jobs.empty()) || to_terminate; });
if (to_terminate)
return;
job = jobs.front();
jobs.pop();
}
job();
unfinished_tasks -= 1;
}
}
public:
Pool(int size) : worker_count(size)
{
if (size < 0)
throw std::invalid_argument("Worker count needs to be a positive integer");
for (int i = 0; i < worker_count; i)
threads.push_back(std::thread(&Pool::thread_loop, this));
};
~Pool()
{
{
std::unique_lock lock(mutex);
to_terminate = true;
}
condition.notify_all();
for (auto &thread : threads)
thread.join();
threads.clear();
};
void queue_job(const std::function<void()> &job)
{
{
std::unique_lock<std::mutex> lock(mutex);
jobs.push(job);
unfinished_tasks = 1;
// std::cout << unfinished_tasks;
}
condition.notify_one();
}
void wait()
{
while (unfinished_tasks) {
; // spinlock
};
}
};
int main()
{
constexpr int worker_count = 8;
constexpr int vector_size = 1 << 10;
Pool pool = Pool(worker_count);
std::vector<int> test_vector;
test_vector.reserve(vector_size);
for (int i = 0; i < vector_size; i)
test_vector.push_back(i);
std::vector<int> worker_odd_counts(worker_count, 0);
std::function<void(int)> worker_task = [&](int thread_id) {
int chunk_size = vector_size / (worker_count) 1;
int my_start = thread_id * chunk_size;
int my_end = std::min(my_start chunk_size, vector_size);
int local_odd_count = 0;
for (int ii = my_start; ii < my_end; ii)
if (test_vector[ii] % 2 != 0)
local_odd_count;
worker_odd_counts[thread_id] = local_odd_count;
};
for (int iteration = 0;; iteration) {
std::cout << "Jobs.." << std::flush;
for (int i = 0; i < worker_count; i)
pool.queue_job([&worker_task, i] { worker_task(i); });
std::cout << "..queued. " << std::flush;
pool.wait();
int odd_count = 0;
for (auto elem : worker_odd_counts)
odd_count = elem;
std::cout << "Iter:" << iteration << ". Odd:" << odd_count << '\n';
}
}
Here is the terminal output of one specific run:
[...]
Jobs....queued. Iter:2994. Odd:512
Jobs....queued. Iter:2995. Odd:512
Jobs..
Edit: The error occurres using GCC 12.2.0 x86_64-w64-mingw32 on Windows 10 with AMD Ryzen 4750U CPU. I do not get past 15k iterations . Using Visual Studio Community 2022, I got past 1.5M iterations (and stopped it myself). Thanks @IgorTandetnik for pointing out the latter.
CodePudding user response:
Mingw doesn’t natively support multithreading on Windows. They supporting threads in their C standard library over the POSIX API, and winpthreads
compatibility layer which implements that API on top of the Windows OS threads.
I think your error is not in the C code, but in the computer setup. Do the following.
Use the compiler from
x86_64-12.2.0-release-posix-seh-ucrt-rt_v10-rev2.7z
archive, there.Don’t forget the binary built that way depends on a bunch of DLL files provided by the compiler:
libgcc_s_seh-1.dll
,libwinpthread-1.dll
andlibstdc -6.dll
. You must use exactly the same version of these DLL which were shipped with mingw. If you have some other versions of these DLLs anywhere in your%PATH%
, expect all kinds of fails.
Couple general notes.
Linux-first C compilers like gcc have issues on Windows. A path of least resistance is using Visual C instead. If you want your software to build on other platforms as well, consider cmake to abstract away the compiler.
Windows already includes a thread pool implementation, since Vista. The API is easy to use, you only need 4 functions: CreateThreadpoolWork, SubmitThreadpoolWork, WaitForThreadpoolWorkCallbacks, and CloseThreadpoolWork. Example.
CodePudding user response:
The first thing you should do is split the queue from the thread pool. They are both tricky enough, writing both of them comingled in one class is asking for trouble.
This also allows you to unit test the queue without the pool.
template<class Payload>
class MutexQueue {
public:
std::optional<Payload> wait_and_pop();
void push(Payload);
void terminate_queue();
bool queue_is_terminated() const;
private:
mutable std::mutex m;
std::condition_variable cv;
std::deque<Payload> q;
bool terminated = false;
std::unique_lock<std::mutex> lock() const {
return std::unique_lock<std::mutex>(m);
}
};
this is a bit easier to write than the thread pool.
void push(Payload p) {
{
auto l = lock();
if (terminate) return;
q.push_back(std::move(p));
}
cv.notify_one();
}
void terminate_queue() {
{
auto l = lock(); // YOU CANNOT SKIP THIS LOCK, even if terminate is atomic
terminate = true;
q.clear();
}
cv.notify_all();
}
bool queue_is_terminated() const {
auto l = lock(); // if you make terminate atomic, you CAN skip this lock
return terminate;
}
std::optional<Payload> wait_and_pop() {
auto l = lock();
cv.wait(l, [&]{ return terminate || !q.empty(); }
if (terminate) return std::nullopt;
auto r = std::move(q.front());
q.pop_front();
return std::move(r);
}
there we go.
Now our thread pool is simpler.
struct ThreadPool {
explicit ThreadPool(std::size_t n) {
create_threads(n);
}
std::future<void> push_task(std::function<void()> f) {
std::packaged_task<void()> p = std::move(f);
auto r = p.get_future();
q.push( std::move(p) );
return r;
}
void terminate_pool() {
q.terminate_queue();
terminate_threads();
}
~ThreadPool() {
terminate_pool();
}
private:
MutexQueue<std::packaged_task<void()>> q;
std::vector<std::thread> threads;
void terminate_threads() {
for(auto& thread:threads)
thread.join();
threads.clear();
}
static void thread_task( MutexQueue<std::packaged_task<void()>>* pq ) {
if (!pq) return;
while (auto task = pq->wait_and_pop()) {
(*task)();
}
}
void create_threads(std::size_t n) {
for (std::size_t i = 0; i < n; i) {
threads.push_back( std::thread( thread_task, &q ) );
}
}
I cannot spot an error in your code. But with the above, you can test a split of the queue from the pool.
The queue will work with pthreads or other primitives.