I'm trying to understand a bit better how async asio works.
I have the following code, where I'm calling async_read on a socket to read the next 10 bytes of data.
struct SocketReader {
    void do_read_body()
    {
        asio::async_read(socket_,
            asio::buffer(msg_, 10),
            [this](asio::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    //messages_to_work_on.emplace_back(msg_); // <-- I'm trying to send this msg_ instance to another io_context
                    do_read_body(); // call again
                }
                else
                {
                    socket_.close();
                }
            });
    }
    // asio::buffer(msg_, 10) is clamped to msg_.size(), so the vector must already hold 10 bytes
    std::vector<uint8_t> msg_ = std::vector<uint8_t>(10);
    asio::ip::tcp::socket socket_;
};
These reads are done inside an io_context running in its own std::thread, where I'm collecting all messages read from the socket in a queue. So far so good.
I also have another "worker" class that just executes some work based on what is available in its queue:
struct Worker
{
    asio::io_context& io_context_;
    std::deque< std::vector<uint8_t> > queue;
    Worker(asio::io_context& io_context)
        : io_context_(io_context) {
        asio::post(io_context_, [this]() {doWork();});
    }
    void doWork() {
        if (!queue.empty())
        {
            // do some work with front()
            queue.pop_front();
        }
        asio::post(io_context_, [this]() {doWork();});
    }
};
That one is also executing in its own io_context, running in its own thread. So there is concurrency between the socket thread and the worker thread.
What is the correct way to post the data received from the socket to the worker class? I'm thinking I should be able to call, from the socket completion handler, something like:
asio::post(worker_io_context, [this]() { worker.queue.push_back(msg_); });
That way, I'm at least sure that the worker queue is not used concurrently. But I'm not sure if I'm allowed to post from one io_context to the other, and also if I won't create another race condition this way. I also don't really understand where the memory for my message should be located, especially "in between" the transfer from one io_context to the other. Is it required that I pass the message by value (since this->msg_ can be modified before the post handler is executed)?
Thanks!
CodePudding user response:
I should be able to call from the socket completion handler, something like:
asio::post(worker_io_context, [this]() { worker.queue.push_back(msg_); });
Sure.
That way, I'm at least sure that the worker queue is not used concurrently. But I'm not sure if I'm allowed to post from one io_context to the other,
io_context objects are not magic. They're basically cooperative task queues.
and also if I won't create another race condition this way.
I'm not going to sit here and pass a verdict without seeing your code (I might not want to read all of it anyway), but let me repeat: io_context objects are not magic. You can reason about them the way you already know how to, in terms of threads, tasks and resources.
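To make the "task queue" picture concrete, here is a minimal sketch using only the standard library. ToyContext and demo_cross_thread_post are hypothetical names invented for this illustration, not Asio types, and the stop() semantics are a simplification (a real io_context::run returns when it runs out of work). It only shows why posting from one thread to a queue drained by another thread is safe: the queue is locked, and each task carries its own copy of the data.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for an io_context: a mutex-protected queue of tasks.
class ToyContext {
public:
    // Safe to call from any thread, in the spirit of asio::post.
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push_back(std::move(task));
        }
        cv_.notify_one();
    }

    // Asks run() to return once the queue has been drained.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        cv_.notify_all();
    }

    // Runs tasks one at a time on the calling thread.
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stopped_ || !tasks_.empty(); });
                if (tasks_.empty()) return; // stopped and nothing left to do
                task = std::move(tasks_.front());
                tasks_.pop_front();
            }
            task(); // executed outside the lock
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> tasks_;
    bool stopped_ = false;
};

// Posts `count` 10-byte messages from the calling thread to a worker thread
// and returns how many bytes the worker processed.
std::size_t demo_cross_thread_post(std::size_t count) {
    ToyContext worker_ctx;
    std::size_t bytes_seen = 0; // touched only by the worker until join()
    std::thread worker([&worker_ctx] { worker_ctx.run(); });
    for (std::size_t i = 0; i < count; ++i) {
        std::vector<std::uint8_t> msg(10, static_cast<std::uint8_t>(i));
        // The task owns its message, so the producer can reuse its buffer freely.
        worker_ctx.post([&bytes_seen, msg = std::move(msg)] { bytes_seen += msg.size(); });
    }
    worker_ctx.stop();
    worker.join();
    return bytes_seen;
}
```

All tasks for the worker run on the single thread inside run(), so state touched only by those tasks needs no extra locking; that is the same reasoning that applies to an io_context run by one thread.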
I also don't really understand where the memory for my message should be located, especially "in between" the transfer from one io_context to the other. Is it required that I pass the message by value (since this->msg_ can be modified before the post handler is executed)?
Yes. Indeed. Something like
post(worker_io_context, [this, msg=std::move(msg_)]() {worker.queue.push_back(std::move(msg)); });
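Note that msg_ is left in a moved-from state afterwards, so give it its size back before the next async_read (asio::buffer(msg_, 10) never covers more than the vector's current size).
If moving isn't cheap, there's the option of having a refcounted smart pointer (like shared_ptr). Consider making it smartpointer<T const> if you actually share ownership between threads.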
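The two ownership options can be sketched with the standard library alone. The names Message, make_owning_task and make_sharing_tasks are made up for this illustration, and std::function merely stands in for whatever handler you would pass to post.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

using Message = std::vector<std::uint8_t>;

// Option 1: the task takes over the bytes. `buffer` is moved from, so the
// caller has to resize it before reading into it again.
std::function<std::size_t()> make_owning_task(Message& buffer) {
    return [msg = std::move(buffer)] { return msg.size(); };
}

// Option 2: several tasks share one immutable message. The const makes it
// hard to mutate the data by accident from more than one thread.
std::vector<std::function<std::size_t()>> make_sharing_tasks(Message buffer, int consumers) {
    auto shared = std::make_shared<const Message>(std::move(buffer));
    std::vector<std::function<std::size_t()>> tasks;
    for (int i = 0; i < consumers; ++i) {
        tasks.push_back([shared] { return shared->size(); });
    }
    return tasks;
}
```

With option 1 each message has exactly one owner at any time, which is usually the simplest thing to reason about; option 2 only pays off when several consumers need the same bytes.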
Shower thought: maybe you can do without the "worker" queues. Since you're moving to reactor-style asynchrony (using Asio), you might focus on queueing the tasks, instead of the data. Reasons to not do that would include when you want to have priority queuing, load balancing/back pressure etc. [In principle all these can be implemented using custom executors, but I would stick to what I know before doing that.]
CodePudding user response:
Note: you do not need an extra io_context.
If you have to do a long-running computation, you can write your own async_xyz function and use it like the other async functions. The idea is to post the work to a boost::asio::thread_pool, do the computation there, and then call the completion handler on the original io_context when the work is done. Here is an example that uses a thread pool for the time-consuming hashing of a password.
template <boost::asio::completion_token_for<void (std::string)> CompletionToken>
auto
async_hash (boost::asio::thread_pool &pool, boost::asio::io_context &io_context, std::string const &password, CompletionToken &&token)
{
  return boost::asio::async_initiate<CompletionToken, void (std::string)> (
      [&] (auto completion_handler, std::string const &passwordToHash) {
        auto io_eq = boost::asio::prefer (io_context.get_executor (), boost::asio::execution::outstanding_work.tracked);
        boost::asio::post (pool, [&, io_eq = std::move (io_eq), completion_handler = std::move (completion_handler), passwordToHash] () mutable {
          auto hashedPw = pw_to_hash (passwordToHash);
          boost::asio::post (io_eq, [hashedPw = std::move (hashedPw), completion_handler = std::move (completion_handler)] () mutable { completion_handler (hashedPw); });
        });
      },
      token, password);
}
Then call it:
auto hashedPw = co_await async_hash (pool, io_context, createAccountObject.password, boost::asio::use_awaitable);
It seems like you are not using coroutines, so I think you have to do something like this:
async_hash (pool, io_context, createAccountObject.password, /*your lambda here*/);
CodePudding user response:
You don't need more than one io_context; even a multi-threaded application can use a single one. Manage your SocketReader through a shared_ptr and capture a copy of that pointer in every pending read handler, so the object stays alive until the handler has run. I'm assuming the acceptor, socket creation and some_io_context.run() parts are done. I would do something like this:
class SocketReader
    : public std::enable_shared_from_this<SocketReader> // we need this!
{
public:
    // Constructor
    explicit SocketReader(asio::io_context& ctx)
        : ctx_(ctx), socket_(ctx), msg_(10)
    {
    }
    // read
    void do_read_body()
    {
        auto self(this->shared_from_this()); // we need this!
        asio::async_read(socket_,
            asio::buffer(msg_, 10),
            [this, self](asio::error_code ec, std::size_t /*length*/) // capturing self keeps the reader alive
            {
                if (!ec)
                {
                    // later edit - @sehe is spot on -> is better to move it
                    asio::post(ctx_, [self, msg = std::move(msg_)]() { /* do the long work */ });
                    msg_.assign(10, 0); // msg_ was moved from; restore its size before the next read
                    do_read_body();
                }
                else
                {
                    socket_.shutdown(asio::ip::tcp::socket::shutdown_send, ec);
                    socket_.close();
                }
            });
    }
private:
    asio::io_context& ctx_; // l.e. ctx_ must be a reference
    asio::ip::tcp::socket socket_;
    std::vector<uint8_t> msg_;
};
...
// somewhere in the code
auto s_reader = std::make_shared<SocketReader>(some_io_context);
s_reader->do_read_body();
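The reason for shared_from_this can be shown without any networking. The sketch below is an illustration only: Reader, start_read and handler_keeps_reader_alive are hypothetical names, and a std::function plays the role of the completion handler that Asio would hold until the read finishes.

```cpp
#include <cassert>
#include <functional>
#include <memory>

struct Reader : std::enable_shared_from_this<Reader> {
    int reads_completed = 0;

    // Returns a pending "completion handler" that co-owns the Reader.
    std::function<void()> start_read() {
        auto self = shared_from_this();
        return [self] { ++self->reads_completed; };
    }
};

// Returns true if the Reader stayed alive while a handler was pending and
// was destroyed once the last handler went away.
bool handler_keeps_reader_alive() {
    auto reader = std::make_shared<Reader>();
    std::weak_ptr<Reader> observer = reader;
    std::function<void()> pending = reader->start_read();
    reader.reset();                          // the caller lets go of its pointer
    bool alive_while_pending = !observer.expired();
    pending();                               // safe: the handler still owns the Reader
    pending = nullptr;                       // destroying the handler releases the last owner
    return alive_while_pending && observer.expired();
}
```

A handler that captured only this would leave the object's lifetime entirely up to the caller, which is easy to get wrong once the handler runs on another thread.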