Home > Blockchain >  Thread pools not working with large number of tasks
Thread pools not working with large number of tasks

Time:12-15

I am trying to create a thread pool with native C and I am using the code listings from the book "C Concurrency in Action". The problem I have is that when I submit more work items than the number of threads, not all the work items get done. In the simple example below, I am trying to submit the runMe() function 200 times but the function is run only 8 times. It seems like this shouldn't happen because in the code, the work_queue is separate from the work threads. Here is the code:

#include "iostream"
#include "ThreadPool.h"
void runMe()
{
    cout << "testing" << endl;
}

int main(void)
{
    thread_pool pool;
    for (int i = 0; i < 200; i  )
    {
        std::function<void()> myFunction = [&] {runMe(); };
        pool.submit(myFunction);
    }


    return 0;
}

ThreadPool.h class

#include <queue>
#include <future>
#include <list>
#include <functional>
#include <memory>
template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue() {}
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(std::move(new_value));
        data_cond.notify_one();
    }
    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] {return !data_queue.empty(); });
        value = std::move(data_queue.front());
        data_queue.pop();
    }
    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = std::move(data_queue.front());
        data_queue.pop();
        return true;
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
    int size()
    {
        return data_queue.size();
    }
};

class join_threads
{
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_) : threads(threads_) {}
    ~join_threads()
    {
        for (unsigned long i = 0; i < threads.size(); i  )
        {
            if (threads[i].joinable())
            {
                threads[i].join();
            }
        }
    }
};

class thread_pool
{
    std::atomic_bool done;
    threadsafe_queue<std::function<void()> > work_queue;
    std::vector<std::thread> threads;
    join_threads joiner;
    void worker_thread()
    {
        while (!done)
        {
            std::function<void()> task;
            if (work_queue.try_pop(task))
            {
                task();
                numActiveThreads--;
            }
            else
            {
                std::this_thread::yield();
            }
        }
    }
public:
    int numActiveThreads;
    thread_pool() : done(false), joiner(threads), numActiveThreads(0)
    {
        unsigned const thread_count = std::thread::hardware_concurrency();
        try
        {
            for (unsigned i = 0; i < thread_count; i  )
            {
                threads.push_back(std::thread(&thread_pool::worker_thread, this));
            }
        }
        catch (...)
        {
            done = true;
            throw;
        }
    }
    ~thread_pool()
    {
        done = true;
    }
    template<typename FunctionType>
    void submit(FunctionType f)
    {
        work_queue.push(std::function<void()>(f));
        numActiveThreads  ;
    }
    int size()
    {
        return work_queue.size();
    }
    bool isQueueEmpty()
    {
        return work_queue.empty();
    }
};

Any idea on how to use the work_queue properly?

CodePudding user response:

When pool is destroyed at the end of main, your destructor sets done, making your worker threads exit.

You should make the destructor (or possibly main, if you want to make this optional) wait for the queue to drain before setting the flag.

  • Related