Switching between threads with C++20 coroutines

Time:05-11

There is an example of switching to a different thread with C++20 coroutines:

#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>

auto switch_to_new_thread(std::jthread& out) {
    struct awaitable {
        std::jthread* p_out;
        bool await_ready() { return false; }
        void await_suspend(std::coroutine_handle<> h) {
            std::jthread& out = *p_out;
            if (out.joinable())
                throw std::runtime_error("Output jthread parameter not empty");
            out = std::jthread([h] { h.resume(); });
            // Potential undefined behavior: accessing potentially destroyed *this
            // std::cout << "New thread ID: " << p_out->get_id() << '\n';
            std::cout << "New thread ID: " << out.get_id() << '\n'; // this is OK
        }
        void await_resume() {}
    };
    return awaitable{ &out };
}

struct task {
    struct promise_type {
        task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() {}
    };
};

task resuming_on_new_thread(std::jthread& out) {
    std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n';
    co_await switch_to_new_thread(out);
    // awaiter destroyed here
    std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
}

int main() {
    std::jthread out;
    resuming_on_new_thread(out);
}

The coroutine starts on the main thread and switches to a newly created thread.

What is the right way to make it switch back to the main thread?

So that the code below

task resuming_on_new_thread(std::jthread& out) {
    std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n';
    co_await switch_to_new_thread(out);
    // awaiter destroyed here
    std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
    co_await switch_to_main_thread();
    std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
}

would print

Coroutine started on thread: 139972277602112
New thread ID: 139972267284224
Coroutine resumed on thread: 139972267284224
Coroutine resumed on thread: 139972277602112

CodePudding user response:

switch_to_new_thread actually creates a new thread; it doesn't switch to an existing one. It then injects code into that thread that resumes the coroutine.

To run code on a specific thread, you have to actually run code on that thread. To resume a coroutine on a specific thread, that thread has to run code that resumes the coroutine.

Here you did it by creating a brand-new thread and injecting code that does a resume.


A traditional way to do stuff like this is with a message pump. The thread you want to participate has a message pump and a queue of events. It runs the events in order.

To make a specific thread run some code, you send a message to that queue of events with the instructions (maybe the actual code, maybe just a value) in it.

To this end, such an "event consuming thread" is more than a std::jthread or std::thread; it is a thread-safe queue plus some code in the thread popping tasks off it and executing them.

In such a system, you'd move between threads by sending messages.

So you'd have a queue:

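#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>

template<class T>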
struct threadsafe_queue {
  [[nodiscard]] std::optional<T> pop();
  [[nodiscard]] std::deque<T> pop_many(std::optional<std::size_t> count = {}); // defaults to all
  [[nodiscard]] bool push(T);
  template<class C, class D>
  [[nodiscard]] std::optional<T> wait_until_pop(std::chrono::time_point<C,D>);
  void abort();
  [[nodiscard]] bool is_aborted() const { return aborted; }
private:
  mutable std::mutex m;
  std::condition_variable cv;
  std::deque<T> queue;
  bool aborted = false;
  auto lock() const { return std::unique_lock(m); }
};

of tasks:

using task_queue = threadsafe_queue<std::function<void()>>;

A basic message pump is:

void message_pump( task_queue& q ) {
  while (auto f = q.pop()) {
    if (*f) (*f)();
  }
}
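
The queue above only declares its members. As a minimal sketch of how the core of it could look, the following fills in just push, pop and abort. It assumes that pop() blocks until an element arrives and returns an empty optional once the queue has been aborted and drained; the answer does not specify these semantics. The names simple_queue, simple_task_queue, pump and run_pump_demo are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>
#include <thread>

// Hypothetical reduced queue: only push, pop and abort.
template <class T>
struct simple_queue {
    // Blocks until an element is available or the queue is aborted.
    // Returns std::nullopt only when the queue is aborted and empty.
    std::optional<T> pop() {
        std::unique_lock l(m);
        cv.wait(l, [&] { return aborted || !queue.empty(); });
        if (queue.empty()) return std::nullopt;
        T t = std::move(queue.front());
        queue.pop_front();
        return t;
    }
    // Returns false (and drops the element) if the queue was already aborted.
    bool push(T t) {
        {
            std::unique_lock l(m);
            if (aborted) return false;
            queue.push_back(std::move(t));
        }
        cv.notify_one();
        return true;
    }
    void abort() {
        {
            std::unique_lock l(m);
            aborted = true;
        }
        cv.notify_all();
    }
private:
    std::mutex m;
    std::condition_variable cv;
    std::deque<T> queue;
    bool aborted = false;
};

using simple_task_queue = simple_queue<std::function<void()>>;

// Same loop as message_pump above, written against the reduced queue.
void pump(simple_task_queue& q) {
    while (auto f = q.pop()) {
        if (*f) (*f)();
    }
}

// Pushes three tasks, lets a worker thread drain them, returns the task count.
int run_pump_demo() {
    simple_task_queue q;
    int counter = 0;  // only touched by the worker until join()
    std::thread worker([&] { pump(q); });
    for (int i = 0; i < 3; ++i)
        q.push([&counter] { ++counter; });
    q.abort();        // already queued tasks still run, then pump() returns
    worker.join();
    return counter;
}
```

In this sketch, abort() lets already queued tasks finish before the pump exits; discarding them instead would be an equally valid design choice.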

You'd then make two task_queues, one for your main thread and one for your worker thread, each drained by a message pump running on that thread. To switch to the worker, instead of creating a new jthread, you'd do:

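// Capture h by value: it is a parameter of await_suspend and would dangle if captured by reference.
workerq.push( [h]{ h.resume(); } );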

and similarly, to switch to the main thread:

mainq.push( [h]{ h.resume(); } );

There are lots of details I have skipped over, but this is a sketch of how you'd do it.

CodePudding user response:

One way to make this happen is to have a thread-safe queue that the coroutine places itself in to tell the main thread "please resume me now". At that point, you're basically building a thread pool. The main function has to watch that queue (poll it at regular intervals or wait for something to be placed in it), then fetch and execute an element (work item) once one is available.
