I'm new to C++ and trying to get my head around multithreading. I've got the basics covered. Now imagine this situation:
I have, say, N tasks that I want to have completed ASAP. That's easy: just start N threads and lean back. But I'm not sure whether this will work for N=200 or more.
So I’d like to say: I have N tasks, and I want to start a limited number of M worker threads. How do I schedule a task to be issued to a new thread once one of the previous threads has finished?
Or is all this taken care of by the OS or runtime, and I need not worry at all, even if N gets really big?
CodePudding user response:
std::thread::hardware_concurrency may be useful for deciding how many threads you want. If it returns anything but 0, it is the number of threads that can run simultaneously. That is often the number of CPU cores multiplied by the number of hyperthreads each core can run: 12 cores with 2 hyperthreads per core makes 24. For CPU-bound work, exceeding this number will likely just slow everything down.
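A minimal sketch of how that value might be turned into a worker count. The helper name choose_worker_count is hypothetical (it is not part of the standard library); it assumes you want one thread as a fallback when the hint is 0, and never more workers than tasks:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <thread>

// Hypothetical helper: decide how many worker threads to start for `task_count` tasks.
// `hardware_threads` defaults to the hint reported by the standard library.
std::size_t choose_worker_count(std::size_t task_count,
                                unsigned hardware_threads = std::thread::hardware_concurrency())
{
    // hardware_concurrency() is only a hint and may be 0 when the value is unknown
    const std::size_t hw = hardware_threads != 0 ? hardware_threads : 1;
    // never more workers than tasks, but always at least one
    const std::size_t count = std::max<std::size_t>(1, std::min(hw, task_count));
    assert(count >= 1 && count <= hw);
    return count;
}
```

With the numbers from the example above, choose_worker_count(200, 24) yields 24, while a job of only 3 tasks would get 3 workers.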
You can create a pool of threads standing by to grab work on your command since creating threads is somewhat expensive. If you have 1000000 tasks to deal with, you want the 24 threads (in this example) to be up all the time.
This is a very common scenario, though, and since C++17 many of the standard algorithms, like std::for_each, accept an execution policy. If you ask for parallel execution, the implementation will (most likely) use a built-in thread pool to finish the task.
Example:
#include <algorithm>
#include <execution>
#include <vector>

using some_type = int; // placeholder so the example compiles: substitute your real data type

struct Task {
    some_type data_to_work_on;
    some_type result;
};

int main() {
    std::vector<Task> tasks; // fill this with your N tasks
    std::for_each(std::execution::par, tasks.begin(), tasks.end(), [](Task& t) {
        // work on task `t` here
    });
    // all tasks done, check the result in each.
}
CodePudding user response:
No, you don’t want to create 200 threads. While it would likely work just fine, creating a thread involves significant processing overhead. Rather, you want a “task queue” system, where a pool of worker threads (generally equal in size to the number of CPU cores) draw from a shared queue of things that need to be done. Intel TBB contains a commonly used task queue implementation, but there are others as well.
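As a rough sketch of the idea, and not how TBB implements it: when all N tasks are known up front, the shared "queue" can be simplified to a vector plus an atomic index that each of the M workers advances. The function name run_on_workers is hypothetical, and the sketch assumes the tasks do not throw (an exception escaping a thread would terminate the program):

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical helper: run every task in `tasks` using `worker_count` threads.
// Returns once all tasks have finished.
void run_on_workers(const std::vector<std::function<void()>>& tasks, std::size_t worker_count)
{
    assert(worker_count > 0);
    std::atomic<std::size_t> next{ 0 }; // index of the next unclaimed task
    std::vector<std::thread> workers;
    workers.reserve(worker_count);
    for (std::size_t w = 0; w < worker_count; ++w)
    {
        workers.emplace_back([&tasks, &next]
        {
            // each worker keeps claiming the next free index until none are left,
            // so a worker that finishes early simply picks up more work
            for (std::size_t i = next.fetch_add(1); i < tasks.size(); i = next.fetch_add(1))
            {
                tasks[i]();
            }
        });
    }
    for (auto& worker : workers)
    {
        worker.join();
    }
}
```

Only M threads are ever created, no matter how large N gets. A real task queue additionally lets you add work while the workers are already running, which needs a mutex and a condition variable.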
CodePudding user response:
This is my take on a threadpool (not extensively debugged yet). In main, it starts a threadpool with the maximum number of threads the hardware allows (the thing Ted Lyngmo was referring to).
There are quite a few things involved, since this threadpool also allows callers to get back the results of asynchronously started calls:
- std::shared_future (to return a result to caller if needed)
- std::packaged_task (to hold a call)
- std::condition_variable (to communicate that stuff has entered the queue, or to signal all threads should stop)
- std::mutex/std::unique_lock (to protect the queue of calls)
- std::thread (of course)
- lambdas
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <exception>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
//=====================================================================================================================================
namespace details
{
    // task_itf is something the threadpool can call to start a scheduled function call
    // independent of argument and/or return value types
    class task_itf
    {
    public:
        virtual ~task_itf() = default;
        virtual void execute() = 0;
    };

    //-------------------------------------------------------------------------------------------------------------------------------------
    // A task is a container for a function call, its arguments and a future,
    // but is already specialized for the return value type of the function call,
    // which the future also needs.
    //
    template<typename retval_t>
    class task final :
        public task_itf
    {
    public:
        template<typename lambda_t>
        explicit task(lambda_t&& lambda) :
            m_task(lambda)
        {
        }

        std::future<retval_t> get_future()
        {
            return m_task.get_future();
        }

        std::shared_future<retval_t> get_shared_future()
        {
            return std::shared_future<retval_t>(m_task.get_future());
        }

        void execute() override
        {
            m_task();
        }

    private:
        std::packaged_task<retval_t()> m_task;
    };

    // thrown inside a worker thread to leave its loop when the pool stops
    class stop_exception :
        public std::exception
    {
    };
}
//-------------------------------------------------------------------------------------------------------------------------------------
// actual thread_pool class
class thread_pool_t
{
public:
    // construct a thread_pool with specified number of threads.
    explicit thread_pool_t(const std::size_t size) :
        m_stop{ false }
    {
        std::mutex start_mutex;
        std::condition_variable signal_started;
        std::atomic<std::size_t> number_of_threads_started{ 0u };

        for (std::size_t n = 0; n < size; ++n)
        {
            // construct the thread in place in the vector, no need to copy
            m_threads.emplace_back([&]()
            {
                {
                    // count and notify while holding the lock, so the constructor
                    // cannot miss the wakeup or return while a notify is in flight
                    std::unique_lock<std::mutex> lock{ start_mutex };
                    ++number_of_threads_started;
                    signal_started.notify_all();
                }
                thread_loop();
            });
        }

        // wait for all threads to have started.
        std::unique_lock<std::mutex> lock{ start_mutex };
        signal_started.wait(lock, [&] { return number_of_threads_started == size; });
    }

    // destructor signals all threads to stop as soon as they are done
    // with their current task, then waits for them to stop.
    ~thread_pool_t()
    {
        {
            std::unique_lock<std::mutex> lock(m_queue_mutex);
            m_stop = true;
        }
        m_wakeup.notify_all();
        for (auto& thread : m_threads)
        {
            thread.join();
        }
    }

    // pass a function asynchronously to the threadpool
    // this function returns a future so the calling thread
    // may synchronize with a result if it so wishes.
    template<typename lambda_t>
    auto async(lambda_t&& lambda)
    {
        using retval_t = decltype(lambda());
        auto task = std::make_shared<details::task<retval_t>>(lambda);
        queue_task(task);
        return task->get_shared_future();
    }

    // let the threadpool run the function but wait for
    // the threadpool thread to finish
    template<typename lambda_t>
    auto sync(lambda_t&& lambda)
    {
        auto ft = async(lambda);
        return ft.get();
    }

    // queue an empty task and wait for it: once it has run, every task queued
    // earlier has been picked up by a worker (some may still be running on other threads)
    void synchronize()
    {
        sync([] {});
    }

private:
    void queue_task(const std::shared_ptr<details::task_itf>& task_ptr)
    {
        {
            std::unique_lock<std::mutex> lock(m_queue_mutex);
            m_queue.push(task_ptr);
        }
        // signal only one thread, first waiting thread to wakeup will run the next task.
        m_wakeup.notify_one();
    }

    std::shared_ptr<details::task_itf> get_next_task()
    {
        std::unique_lock<std::mutex> lock(m_queue_mutex);
        // wait() rechecks the predicate itself, so spurious wakeups are handled.
        // the lambda is a plain local: a static one would keep the `this` of the first pool that called it
        m_wakeup.wait(lock, [this] { return m_stop || !m_queue.empty(); });

        if (m_stop)
        {
            // use exception to break out of the mainloop
            throw details::stop_exception();
        }

        auto task = m_queue.front();
        m_queue.pop();
        return task;
    }

    void thread_loop()
    {
        try
        {
            while (auto task = get_next_task())
            {
                task->execute();
            }
        }
        catch (const details::stop_exception&)
        {
            // normal shutdown, nothing to do
        }
    }

    std::vector<std::thread> m_threads;
    std::mutex m_queue_mutex;
    std::queue<std::shared_ptr<details::task_itf>> m_queue;
    std::condition_variable m_wakeup;
    bool m_stop;
};
//-----------------------------------------------------------------------------
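int main()
{
    // hardware_concurrency() may return 0 when the value is unknown; use one thread in that case
    const auto hardware_threads = std::thread::hardware_concurrency();
    thread_pool_t thread_pool{ hardware_threads != 0 ? hardware_threads : 1u };

    for (int i = 0; i < 200; i++)
    {
        // just schedule asynchronous calls, returned futures are not used in this example
        thread_pool.async([i]
        {
            std::cout << i << " ";
        });
    }

    // this threadpool will not by default wait until all queued work is finished:
    // on destruction it lets running tasks finish and drops whatever is still queued.
    // a call to synchronize blocks until every task queued up till this moment has been
    // picked up by a worker; tasks still running on other workers finish before the destructor returns.
    thread_pool.synchronize();
    std::cout << "\nDone...\n";
    return 0;
}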
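The packaged_task/future pairing that the pool above relies on can be seen in isolation. This is a minimal sketch with a hypothetical function name (add_on_other_thread) that uses one plain std::thread instead of a pool:

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <utility>

// Hypothetical helper: compute a + b on another thread and hand the result back through a future.
int add_on_other_thread(int a, int b)
{
    // the packaged_task holds the call; its future will receive the return value
    std::packaged_task<int()> task([a, b] { return a + b; });
    std::future<int> result = task.get_future(); // take the future before moving the task away
    assert(result.valid());

    std::thread worker(std::move(task)); // the thread invokes the task, which fulfils the future
    worker.join();
    return result.get();
}
```

In the pool, the same thing happens, except that the task waits in the queue until a worker calls execute(), and the caller receives a shared_future instead of a plain future.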
CodePudding user response:
- I have N tasks, and I want to start a limited number of M worker threads.
- How do I schedule a task to be issued to a new thread once one of the previous threads has finished?
- Set your thread pool size, M, taking into account the number of threads available in your system (hardware_concurrency).
- Use a counting_semaphore to make sure you don't launch a task if there is no thread pool slot available.
- Loop through your N tasks, acquiring a thread pool slot, running the task, and releasing the thread pool slot. Notice that, since tasks are launched asynchronously, you will be able to have M tasks running in parallel.
#include <algorithm> // clamp, max
#include <cstddef> // size_t, ptrdiff_t
#include <future> // async
#include <iostream> // cout
#include <semaphore> // counting_semaphore
#include <thread> // thread::hardware_concurrency
#include <vector>

// hardware_concurrency() may return 0 when the value is unknown, so fall back to 1
static const std::size_t THREAD_POOL_SIZE_DEFAULT{ std::max(1u, std::thread::hardware_concurrency()) };
static const std::size_t THREAD_POOL_SIZE_MAX{ THREAD_POOL_SIZE_DEFAULT * 2 };
static const std::size_t NUM_TASKS_DEFAULT{ 20 };

template <typename F>
void run_tasks(
    F&& f,
    std::size_t thread_pool_size = THREAD_POOL_SIZE_DEFAULT,
    std::size_t num_tasks = NUM_TASKS_DEFAULT)
{
    // Keep the size in [1, THREAD_POOL_SIZE_MAX]; a semaphore with 0 slots would block forever
    thread_pool_size = std::clamp<std::size_t>(thread_pool_size, 1, THREAD_POOL_SIZE_MAX);
    std::counting_semaphore<> task_slots(static_cast<std::ptrdiff_t>(thread_pool_size));
    auto futures{ std::vector<std::future<void>>(num_tasks) };
    auto task_results{ std::vector<int>(num_tasks) };

    // We can run thread_pool_size tasks in parallel
    // If all task slots are busy, we have to wait for a task to finish
    // (std::async typically runs each task on a new thread; the semaphore only caps how many are active at once)
    for (std::size_t i{ 0 }; i < num_tasks; ++i)
    {
        // Wait for a task slot to be free
        task_slots.acquire();
        futures[i] = std::async(
            std::launch::async,
            [i, &f, &task_result = task_results[i], &task_slots]() {
                // Execute task
                task_result = f(i);
                // Release the task slot
                task_slots.release();
            }
        );
    }

    // Wait for all the tasks to finish
    for (auto& future : futures) { future.get(); }
    for (auto& result : task_results) { std::cout << result << " "; }
}

int main()
{
    run_tasks([](int i) { return i * i; }, 4, 20);
}