I commonly use boost::thread
to run threads. I need threads to be interrupted easely, so I use boost::this_thread::interruption_point()
. The code looks like this:
void do_long_calculations()
{
for (...)
{
boost::this_thread::interruption_point();
do_some_work();
}
}
auto t = boost::thread(do_long_calculations);
...
t.interrupt();
t.join();
Now I need a thread pool and I try to use boost::asio::thread_pool
. It seems it does not have a standard way to interrupt such threads. So, how can I interrupt all running threads in the pool? I need to interrupt all running jobs before destroying thread pool.
Is it possoble to make interruption by myself? Something like this (or by any other way):
boost::asio::thread_pool p;
std::set<boost::thread::id> thread_ids;
post(p, []()
{
thread_ids.insert(boost::this_thread::get_id());
do_long_calculations();
});
...
for (auto id : thread_ids)
SOME_INTERRUPTION_FUNCTION(id); // fire an event for boost::this_thread::interruption_point();
p.stop();
This way does not work:
boost::asio::thread_pool p;
std::set<boost::detail::thread_data_ptr> threads;
post(p, []()
{
if (auto d = boost::detail::get_current_thread_data())
threads.insert(d); // NEVER GOT HERE
do_long_calculations();
});
...
for (auto &d : threads)
d->interrupt();
p.stop();
Or may be I need to use some other thread_pool-compatible interruption checking instead/nearby of boost::this_thread::interruption_point()
call?
CodePudding user response:
I think in reality you do not want to control threads in this way. I think a thread pool shines when it's viewed like a "team", a computational resource, that is always available, "at the ready".
However if you want to interrupt tasks (not threads) you can.
Do not go with implementation details (detail::thread_data_base*
, or for that matter the idea that it will work with asio's thread pool; you don't know the locking required or undocumented semantics of the data members; it may be you need get_or_make_current_thread_data
or make_external_thread_data
etc.)
Instead go with thread_pool
's documented interface: attach
See it Live On Coliru
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iomanip>
#include <iostream>
#include <list>
using boost::chrono::seconds;
using boost::this_thread::sleep_for; // interruptable
static auto const now = std::chrono::steady_clock::now;
static auto const start = now();
auto trace(auto const&... msg) {
using namespace std::chrono_literals;
static constexpr std::hash<std::thread::id> hash{};
static std::mutex mx;
std::lock_guard lk(mx);
std::cout << std::fixed << std::setprecision(3) << std::setfill(' ') << std::setw(8)
<< (now() - start) / 1.0s << "ms " //
<< std::hex << std::showbase << std::setw(2) << std::setfill('0')
<< hash(std::this_thread::get_id()) % 256 << std::dec << " ";
(std::cout << ... << msg) << std::endl;
}
void long_calculation() {
try {
trace("start long_calculation");
sleep_for(seconds(5));
trace("complete long_calculation");
} catch (boost::thread_interrupted) {
trace("interrupted long_calculation");
}
}
int main() {
trace("Start");
boost::asio::thread_pool tp(0);
std::list<boost::thread> threads;
for (int i = 0; i < 4; i)
threads.emplace_back([&] { tp.attach(); });
post(tp, long_calculation);
post(tp, long_calculation);
sleep_for(seconds(2));
trace("Interrupt");
for (auto& th : threads)
th.interrupt();
tp.join(); // in case any asio native threads
trace("Waiting...");
tp.stop();
for (auto& th : threads)
if (th.joinable())
th.join();
trace("Bye");
}
Printing
0.000ms 0xac Start
0.002ms 0x9b start long_calculation
0.002ms 0x8e start long_calculation
2.002ms 0xac Interrupt
2.002ms 0xac Waiting...
2.003ms 0x9b interrupted long_calculation
2.004ms 0x8e interrupted long_calculation
2.004ms 0xac Bye
Recommendation
I still don't believe in thread interruption - I've heard only bad things and they lead to considerable size bloat in all Boost Thread internals (see e.g.).
Besides, I see some non-trivial issues with controlled shutdown even with the attach
approach just shown.
I'd say using an atomic flag or some weak_ptr in your code probably allows you to interrupt your calculations. If you need to maintain support for interruption points, I'd really recommend going without Asio's thread_pool
which really doesn't add value at that point:
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iomanip>
#include <iostream>
using boost::chrono::seconds;
using boost::this_thread::sleep_for; // interruptable
static auto const now = std::chrono::steady_clock::now;
static auto const start = now();
auto trace(auto const&... msg) {
using namespace std::chrono_literals;
static constexpr std::hash<std::thread::id> hash{};
static std::mutex mx;
std::lock_guard lk(mx);
std::cout << std::fixed << std::setprecision(3) << std::setfill(' ') << std::setw(8)
<< (now() - start) / 1.0s << "ms " //
<< std::hex << std::showbase << std::setw(2) << std::setfill('0')
<< hash(std::this_thread::get_id()) % 256 << std::dec << " ";
(std::cout << ... << msg) << std::endl;
}
void long_calculation() {
try {
trace("start long_calculation");
sleep_for(seconds(5));
trace("complete long_calculation");
} catch (boost::thread_interrupted) {
trace("interrupted long_calculation");
}
}
struct my_pool : boost::asio::io_context {
my_pool(unsigned nthreads) {
while (nthreads--)
threads_.create_thread([this] { worker(); });
}
void join() {
work_.reset();
threads_.join_all();
}
void interrupt() {
threads_.interrupt_all();
}
~my_pool() {
join();
}
private:
void worker() {
// http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
for (;;) {
try {
this->run();
break; // exited normally
} catch (std::exception const& e) {
trace("pool_worker exception: ", e.what());
} catch (...) {
trace("pool_worker exception: unhandled");
}
}
}
boost::thread_group threads_;
boost::asio::executor_work_guard<executor_type> work_{get_executor()};
};
int main() {
trace("Start");
my_pool tp(4);
for (int i = 0; i < 5; i)
post(tp, long_calculation);
sleep_for(seconds(2));
trace("Interrupt");
tp.interrupt();
trace("Waiting...");
tp.join();
trace("Bye");
}
Prints e.g.
0.000ms 0x18 Start
0.000ms 0x88 start long_calculation
0.000ms 0x88 start long_calculation
0.000ms 0xbe start long_calculation
0.000ms 0x68 start long_calculation
2.000ms 0x18 Interrupt
2.000ms 0x18 Waiting...
2.001ms 0x68 interrupted long_calculation
2.001ms 0x68 start long_calculation
2.001ms 0x88 interrupted long_calculation
2.001ms 0xbe interrupted long_calculation
2.001ms 0x88 interrupted long_calculation
7.001ms 0x68 complete long_calculation
7.001ms 0x18 Bye
Note how controlled shutdown works correctly even when uninterrupted tasks are pending.
(You can do this trivially without thread_group
: http://coliru.stacked-crooked.com/a/261986d2975e1da4)