Interrupt all threads in the Boost Asio Thread Pool

Time:11-03

I commonly use boost::thread to run threads. I need the threads to be easily interruptible, so I use boost::this_thread::interruption_point(). The code looks like this:

void do_long_calculations()
{
  for (...)
  {
    boost::this_thread::interruption_point();
    do_some_work();
  }
}

auto t = boost::thread(do_long_calculations);
...
t.interrupt();
t.join();

Now I need a thread pool, so I am trying boost::asio::thread_pool. It does not seem to have a standard way to interrupt its threads. How can I interrupt all running threads in the pool? I need to interrupt all running jobs before destroying the thread pool.

Is it possible to implement the interruption myself? Something like this (or in any other way):

boost::asio::thread_pool p;
std::set<boost::thread::id> thread_ids;
post(p, []()
{
  thread_ids.insert(boost::this_thread::get_id());
  do_long_calculations();
});
...
for (auto id : thread_ids)
  SOME_INTERRUPTION_FUNCTION(id); // fire an event for boost::this_thread::interruption_point();
p.stop();

This way does not work:

boost::asio::thread_pool p;
std::set<boost::detail::thread_data_ptr> threads;
post(p, []()
{
  if (auto d = boost::detail::get_current_thread_data())
    threads.insert(d); // NEVER GOT HERE
  do_long_calculations();
});
...
for (auto &d : threads)
  d->interrupt();
p.stop();

Or maybe I need some other thread_pool-compatible interruption check instead of, or alongside, the boost::this_thread::interruption_point() call?

Answer:

I think in reality you do not want to control threads in this way. I think a thread pool shines when it's viewed like a "team", a computational resource, that is always available, "at the ready".

However, if you want to interrupt tasks (not threads), you can.

Do not rely on implementation details (detail::thread_data_base*, or for that matter the assumption that it will work with Asio's thread pool). You don't know the locking required or the undocumented semantics of the data members; you might need get_or_make_current_thread_data or make_external_thread_data, etc.

Instead, use thread_pool's documented interface, attach(), and call it from boost::thread instances that you own and can therefore interrupt:


#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
using boost::chrono::seconds;
using boost::this_thread::sleep_for; // interruptable

static auto const now   = std::chrono::steady_clock::now;
static auto const start = now();
auto trace(auto const&... msg) {
    using namespace std::chrono_literals;
    static constexpr std::hash<std::thread::id> hash{};
    static std::mutex                           mx;

    std::lock_guard lk(mx);
    std::cout << std::fixed << std::setprecision(3) << std::setfill(' ') << std::setw(8)
              << (now() - start) / 1.0s << "ms " //
              << std::hex << std::showbase << std::setw(2) << std::setfill('0')
              << hash(std::this_thread::get_id()) % 256 << std::dec << " ";
    (std::cout << ... << msg) << std::endl;
}

void long_calculation() {
    try {
        trace("start long_calculation");
        sleep_for(seconds(5));
        trace("complete long_calculation");
    } catch (boost::thread_interrupted const&) {
        trace("interrupted long_calculation");
    }
}

int main() {
    trace("Start");
    boost::asio::thread_pool tp(0);
    std::list<boost::thread> threads;

    for (int i = 0; i < 4; ++i)
        threads.emplace_back([&] { tp.attach(); });

    post(tp, long_calculation);
    post(tp, long_calculation);

    sleep_for(seconds(2));

    trace("Interrupt");
    for (auto& th : threads)
        th.interrupt();

    tp.join(); // in case any asio native threads
    trace("Waiting...");
    tp.stop();
    for (auto& th : threads)
        if (th.joinable())
            th.join();

    trace("Bye");
}

Printing

   0.000ms 0xac Start
   0.002ms 0x9b start long_calculation
   0.002ms 0x8e start long_calculation
   2.002ms 0xac Interrupt
   2.002ms 0xac Waiting...
   2.003ms 0x9b interrupted long_calculation
   2.004ms 0x8e interrupted long_calculation
   2.004ms 0xac Bye

Recommendation

I still don't believe in thread interruption: I've heard only bad things about it, and it leads to considerable size bloat in all Boost Thread internals (see e.g.).

Besides, I see some non-trivial issues with controlled shutdown even with the attach approach just shown.

I'd say using an atomic flag or some weak_ptr in your code probably allows you to interrupt your calculations. If you need to maintain support for interruption points, I'd really recommend going without Asio's thread_pool, which doesn't add value at that point:


#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <thread>
using boost::chrono::seconds;
using boost::this_thread::sleep_for; // interruptable

static auto const now   = std::chrono::steady_clock::now;
static auto const start = now();
auto trace(auto const&... msg) {
    using namespace std::chrono_literals;
    static constexpr std::hash<std::thread::id> hash{};
    static std::mutex                           mx;

    std::lock_guard lk(mx);
    std::cout << std::fixed << std::setprecision(3) << std::setfill(' ') << std::setw(8)
              << (now() - start) / 1.0s << "ms " //
              << std::hex << std::showbase << std::setw(2) << std::setfill('0')
              << hash(std::this_thread::get_id()) % 256 << std::dec << " ";
    (std::cout << ... << msg) << std::endl;
}

void long_calculation() {
    try {
        trace("start long_calculation");
        sleep_for(seconds(5));
        trace("complete long_calculation");
    } catch (boost::thread_interrupted const&) {
        trace("interrupted long_calculation");
    }
}

struct my_pool : boost::asio::io_context {
    my_pool(unsigned nthreads) {
        while (nthreads--)
            threads_.create_thread([this] { worker(); });
    }

    void join() {
        work_.reset();
        threads_.join_all();
    }

    void interrupt() {
        threads_.interrupt_all();
    }

    ~my_pool() {
        join();
    }

  private:
    void worker() {
        // http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
        for (;;) {
            try {
                this->run();
                break; // exited normally
            } catch (std::exception const& e) {
                trace("pool_worker exception: ", e.what());
            } catch (...) {
                trace("pool_worker exception: unhandled");
            }
        }
    }

    boost::thread_group                             threads_;
    boost::asio::executor_work_guard<executor_type> work_{get_executor()};
};

int main() {
    trace("Start");
    my_pool tp(4);

    for (int i = 0; i < 5; ++i)
        post(tp, long_calculation);

    sleep_for(seconds(2));

    trace("Interrupt");
    tp.interrupt();

    trace("Waiting...");
    tp.join();

    trace("Bye");
}

Prints e.g.

   0.000ms 0x18 Start
   0.000ms 0x88 start long_calculation
   0.000ms 0x88 start long_calculation
   0.000ms 0xbe start long_calculation
   0.000ms 0x68 start long_calculation
   2.000ms 0x18 Interrupt
   2.000ms 0x18 Waiting...
   2.001ms 0x68 interrupted long_calculation
   2.001ms 0x68 start long_calculation
   2.001ms 0x88 interrupted long_calculation
   2.001ms 0xbe interrupted long_calculation
   2.001ms 0x88 interrupted long_calculation
   7.001ms 0x68 complete long_calculation
   7.001ms 0x18 Bye

Note how controlled shutdown works correctly even when uninterrupted tasks are pending.

(You can do this trivially without thread_group: http://coliru.stacked-crooked.com/a/261986d2975e1da4)
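
For the atomic-flag alternative mentioned in the recommendation, here is a minimal sketch, not a definitive implementation. It uses only the standard library, with plain std::thread workers standing in for the pool so that it stays self-contained; the same lambda could presumably be handed to post(tp, ...) instead. The names cancel_flag, run_and_cancel and the two-argument long_calculation are hypothetical and exist only for this illustration.

```cpp
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>

// Hypothetical cancellation token: a flag shared by the owner and the tasks.
struct cancel_flag {
    std::atomic<bool> requested{false};

    void request() noexcept { requested.store(true, std::memory_order_relaxed); }
    bool is_set() const noexcept { return requested.load(std::memory_order_relaxed); }
};

// Stand-in for do_long_calculations(): polls the flag between units of work,
// where the original code called interruption_point().
// Returns the number of steps completed before finishing or being cancelled.
int long_calculation(cancel_flag const& cancel, int max_steps) {
    int done = 0;
    for (; done < max_steps; ++done) {
        if (cancel.is_set())
            break; // cooperative cancellation, no exception involved
        std::this_thread::sleep_for(std::chrono::milliseconds(1)); // do_some_work()
    }
    return done;
}

// Starts nthreads workers, requests cancellation after `delay`, joins them,
// and reports how many steps each worker completed.
std::vector<int> run_and_cancel(unsigned nthreads, int max_steps,
                                std::chrono::milliseconds delay) {
    cancel_flag              cancel;
    std::vector<int>         steps(nthreads, 0);
    std::vector<std::thread> workers;

    for (unsigned i = 0; i < nthreads; ++i)
        workers.emplace_back([&, i] { steps[i] = long_calculation(cancel, max_steps); });

    std::this_thread::sleep_for(delay);
    cancel.request(); // every task sees the flag at its next check

    for (auto& w : workers)
        w.join();
    return steps;
}
```

Calling run_and_cancel(4, 1000000, std::chrono::milliseconds(20)) should return shortly after the 20 ms delay, with every worker having stopped well short of max_steps. Unlike boost::thread interruption, the flag cancels tasks rather than threads, so it does not matter which pool or thread type runs them; the trade-off is that a task blocked in a long sleep or wait will not notice the request until it next polls the flag.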
