I am trying to work with coroutines and multithreading together in C++.
In many coroutine examples, they create a new thread in the await_suspend of the co_await operator for the promise type. I want to submit to a thread pool in this function.
Here I define a co_await for future<int>:
void await_suspend(std::coroutine_handle<> handle) {
this->wait();
handle.resume();
}
I want to change this code to submit a lambda/function pointer to a thread pool. I could potentially use Alexander Krizhanovsky's ring buffer to communicate with the thread pool and build the pool myself, or use Boost's thread pool.
My problem is NOT the thread pool. My problem is that I don't know how to get a reference to the thread pool inside this co_await operator.
How do I pass data from the environment where the operator is defined into this await_suspend function? Here is an example of what I want to do:
void await_suspend(std::coroutine_handle<> handle) {
// how do I get "pool"? from within this function
auto res = pool.enqueue([this, handle]() {
this->wait();
handle.resume();
});
}
I am not an expert at C++, so I'm not sure how I would get access to pool in this operator.
Here's the full code, inspired by this GitHub gist: A simple C++ coroutine example.
#include <future>
#include <iostream>
#include <coroutine>
#include <type_traits>
#include <list>
#include <thread>
using namespace std;
template <>
struct std::coroutine_traits<std::future<int>> {
struct promise_type : std::promise<int> {
future<int> get_return_object() { return this->get_future(); }
std::suspend_never initial_suspend() noexcept { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_value(int value) { this->set_value(value); }
void unhandled_exception() {
this->set_exception(std::current_exception());
}
};
};
template <>
struct std::coroutine_traits<std::future<int>, int> {
struct promise_type : std::promise<int> {
future<int> get_return_object() { return this->get_future(); }
std::suspend_never initial_suspend() noexcept { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_value(int value) { this->set_value(value); }
void unhandled_exception() {
this->set_exception(std::current_exception());
}
};
};
auto operator co_await(std::future<int> future) {
struct awaiter : std::future<int> {
bool await_ready() { return false; } // suspend always
void await_suspend(std::coroutine_handle<> handle) {
this->wait();
handle.resume();
}
int await_resume() { return this->get(); }
};
return awaiter{std::move(future)};
}
future<int> async_add(int a, int b)
{
auto fut = std::async([=]() {
int c = a + b;
return c;
});
return fut;
}
future<int> async_fib(int n)
{
if (n <= 2)
co_return 1;
int a = 1;
int b = 1;
// iterate computing fib(n)
for (int i = 0; i < n - 2; i++)
{
int c = co_await async_add(a, b);
a = b;
b = c;
}
co_return b;
}
future<int> test_async_fib()
{
for (int i = 1; i < 10; i++)
{
int ret = co_await async_fib(i);
cout << "async_fib(" << i << ") returns " << ret << endl;
}
co_return 0;
}
int runfib(int arg) {
auto fut = test_async_fib();
fut.wait();
return 0;
}
int run_thread() {
printf("Running thread");
return 0;
}
int main()
{
std::list<shared_ptr<std::thread>> threads = { };
for (int i = 0; i < 10; i++) {
printf("Creating thread\n");
std::shared_ptr<std::thread> thread = std::make_shared<std::thread>(runfib, 5);
threads.push_back(thread);
}
std::list<shared_ptr<std::thread>>::iterator it;
for (it = threads.begin(); it != threads.end(); ++it) {
(*it).get()->join();
printf("Joining thread");
}
fflush(stdout);
return 0;
}
CodePudding user response:
You could have a thread pool, and let the coroutine promise schedule work on it.
I have this example around that is not exactly simple, but it may do the job:
- Make your coroutines return a task<T>.
task<int> async_add(int a, int b) { ... }
- Let the task share a state with its coroutine_promise. The state:
  - is implemented as an executable, resuming the coroutine when executed, and
  - holds the result of the operation (e.g. a std::promise<T>).
template <typename T>
class task<T>::state : public executable {
public:
void execute() noexcept override {
handle_.resume();
}
...
private:
handle_type handle_;
std::promise<T> result_;
};
- The coroutine_promise returns a task_scheduler awaiter at initial_suspend:
template <typename T>
class task<T>::coroutine_promise {
public:
auto initial_suspend() {
return task_scheduler<task<T>>{};
}
- The task_scheduler awaiter schedules the state:
template <is_task task_t>
struct task_scheduler : public std::suspend_always {
void await_suspend(task_t::handle_type handle) const noexcept {
thread_pool::get_instance().schedule(handle.promise().get_state());
}
};
- Wrapping it all up: calls to a coroutine will make a state be scheduled on a thread, and whenever a thread executes that state, the coroutine will be resumed. The caller can then wait for the task's result.
auto c{ async_add(a,b) };
b = c.get_result();
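For completeness, a thread_pool with the get_instance()/schedule() interface used above might be sketched like this. This is an illustration under the stated assumptions (four workers, a simple FIFO queue), not the exact implementation behind the example:

```cpp
#include <condition_variable>
#include <deque>
#include <future>
#include <mutex>
#include <thread>
#include <vector>

// The interface the task_scheduler relies on: something the pool can run.
struct executable {
    virtual void execute() noexcept = 0;
    virtual ~executable() = default;
};

class thread_pool {
public:
    static thread_pool& get_instance() {
        static thread_pool instance{4};  // worker count chosen arbitrarily
        return instance;
    }

    void schedule(executable* task) {
        {
            std::lock_guard lock{mutex_};
            queue_.push_back(task);
        }
        cv_.notify_one();
    }

    ~thread_pool() {
        {
            std::lock_guard lock{mutex_};
            stop_ = true;
        }
        cv_.notify_all();
        for (auto& worker : workers_) worker.join();
    }

private:
    explicit thread_pool(unsigned n) {
        for (unsigned i = 0; i < n; ++i)
            workers_.emplace_back([this] { run(); });
    }

    void run() {
        for (;;) {
            executable* task = nullptr;
            {
                std::unique_lock lock{mutex_};
                cv_.wait(lock, [this] { return stop_ || !queue_.empty(); });
                if (queue_.empty()) return;  // stop_ set and queue drained
                task = queue_.front();
                queue_.pop_front();
            }
            task->execute();  // e.g. state::execute() resuming the coroutine
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<executable*> queue_;
    bool stop_ = false;
    std::vector<std::thread> workers_;
};

// Tiny usage demo: schedule one task and wait for its result.
struct set_value_task : executable {
    std::promise<int> result;
    void execute() noexcept override { result.set_value(7); }
};

int demo() {
    set_value_task task;
    auto fut = task.result.get_future();
    thread_pool::get_instance().schedule(&task);
    return fut.get();  // blocks until a worker runs the task
}
```

The pool only needs to know about executable, so it stays independent of any particular task<T> instantiation.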
CodePudding user response:
That example is from 2018 and was built for the Coroutines TS, so it's missing a lot of stuff from the actual C++20 feature. It also assumes the presence of a lot of things that didn't make it into C++20, the most notable being the idea that std::future is an awaitable type, and that it has continuation support when coupled with std::async.
It's not, and it doesn't. So there's not much you can really learn from this example.
co_await is ultimately built on the ability to suspend execution of a function and schedule its resumption after some value has been successfully computed. The actual C++20 std::future has exactly none of the machinery needed to do that, nor does std::async give it the ability to do so.
As such, neither is an appropriate tool for this task.
You need to build your own future type (possibly using std::promise/std::future internally) which has a reference to your thread pool. When you co_await on this future, it is that new future which passes off the coroutine_handle to the thread pool, doing whatever is needed to ensure that this handle does not get executed until its current set of tasks is done.
Your pool or whatever needs to have a queue of tasks, such that it can insert new ones to be processed after all of the current one, and remove tasks once they've finished (as well as starting the next one). And those operations need to be properly synchronized. This queue needs to be accessible by both the future type and your coroutine's promise type.
When a coroutine ends, the promise needs to tell the queue that its current task is over and to move on to the next one, or suspend the thread if there is no next one. The promise's value also needs to be forwarded to the next task. And when a coroutine co_awaits on a future from your system, it needs to add that handle to the queue of tasks to be performed, possibly starting up the thread again.