I consider myself reasonably experienced with asio but can't figure out how to correctly perform `async_read` and `async_write` on a `boost::asio::ssl::stream<boost::asio::ip::tcp::socket>`. I have created the following minimal example, https://github.com/ladnir/asio-ssl-stackoverflow, which I explain next.
My goal is quite simple: perform full-duplex async read and write on an `ssl::stream`. The documentation is clear that you need to perform the `async_read` and `async_write` calls from within a strand, which I do.
My setup is an `io_context` with multiple threads. Data is continuously sent and received on the socket. Sending and receiving each have their own callback chain; in the completion handler of each, I simply schedule another send or receive operation. All of this is performed within a strand. Below is the main bit of code:
```cpp
std::function<void(bool, ssl::stream<tcp::socket>&, io_context::strand&, u64)> f =
    [&](bool send, ssl::stream<tcp::socket>& sock, io_context::strand& strand, u64 t) {
        strand.dispatch([&, send, t]() {
            std::vector<u8> buffer(10000);
            auto bb = mutable_buffer(buffer.data(), buffer.size());

            auto callback = [&, send, t, buffer = std::move(buffer),
                             moveOnly = std::unique_ptr<int>{}](
                                boost::system::error_code error, std::size_t n) mutable {
                if (error) {
                    std::cout << error.message() << std::endl;
                    std::terminate();
                }

                // perform another operation or complete.
                if (t)
                    f(send, sock, strand, t - 1);
                else
                    --spinLock;
            };

            if (send)
                async_write(sock, bb, std::move(callback));
            else
                async_read(sock, bb, std::move(callback));
        });
    };
```
A send and receive callback chain is then started for the server and client sockets.
```cpp
// launch our callback chains.
f(true, srvSocket, srvStrand, trials);
f(false, srvSocket, srvStrand, trials);
f(true, cliSocket, cliStrand, trials);
f(false, cliSocket, cliStrand, trials);
```
It seems that despite the use of the strand, something inside OpenSSL is not being performed in a thread-safe manner. When I run the code I sometimes get a decryption failure and sometimes it just crashes somewhere inside OpenSSL.
If I use a plain `tcp::socket`, this code works fine. If I make the `io_context` single-threaded, it also works fine. I have tested this on Ubuntu and Windows.
It seems from related questions, e.g. this one, that full duplex should work as long as you wrap it in a strand.
Does anyone see what I'm doing wrong? Maybe it is simply not safe to perform an `async_read`/`async_write` while the other is already scheduled?
Notes on Sehe's solution: After comparing my code to Sehe's solution, I determined that my code has one major bug. The execution context of the `ssl::stream<tcp::socket>`s is the multi-threaded `io_context`. As such, there is no guarantee that the `ssl::stream` performs its work on my strand. Moreover, how could it? The `ssl::stream` is unaware of my strand and only happens to be executed on it during the initial call. When the `ssl::stream` gets called back from the underlying socket, it will/might schedule more read/write operations, but they won't be on my chosen strand.
The only thing guaranteed is that the `ssl::stream` runs on the execution context it was constructed with. A simple fix is to construct the stream with a strand as its execution context.
I updated the GitHub repo above to contain my solution.
Thanks, Sehe, for the insight.
CodePudding user response:
It's been one of those days. I felt completely silly that I couldn't find the issue with your code. I kept running into variations of use-after-free and stack thrashing even with simplified code (way simplified).
The obvious culprit looked to me to be the use of the vector's member data after moving the vector. I have not been able to prove that (manually asserting that the values of `data()` and `size()` are stable across a move didn't fail), but see below.
So... I went to square one and just wrote the entire program the way I'd write it. That is to say:
- using executors instead of explicit handler binding with service references
- `asio::strand<>` instead of `io_context::strand` (which is deprecated)
- `asio::thread_pool` instead of `io_context` plus raw threads (and a work guard)
I don't think any of these really mattered, but it does simplify the code.
While doing the mental gymnastics to work out whether a `function<>` can hold a copy of itself, I decided it would need to have shared ownership.
```cpp
auto make_loop = [&](auto& stream, bool sending) {
    auto shared_loop = std::make_shared<std::function<void()>>();
    auto port = stream.lowest_layer().remote_endpoint().port();
    auto mode = [=](auto t) {
        return std::string(sending ? "write" : "read") + " t=" +
            std::to_string(t) + " port:" + std::to_string(port);
    };
    auto s = std::make_shared<sentinel>("shared_loop " + mode(trials));

    *shared_loop = [wl = /*std::weak_ptr*/ (shared_loop), mode, sending, &stream,
                    t = trials, s_ = s]() mutable {
        if (!t--) {
            return;
        }
        auto data = std::make_shared<std::vector<uint8_t>>(10'000);
        auto buf = asio::buffer(*data);

        auto handler = [sl = my_lock(wl), mode, data_ = std::move(data), t] //
            (error_code ec, size_t n) {
                trace("Handler ", mode(t), " n=", n, " ", ec.message());
                if (t && sl)
                    (*sl)();
            };

        trace("Initiating ", mode(t));
        if (sending)
            async_write(stream, buf, handler);
        else
            async_read(stream, buf, handler);
    };
    return *shared_loop;
};

post(srv.get_executor(), make_loop(srv, true));
post(srv.get_executor(), make_loop(srv, false));
post(cli.get_executor(), make_loop(cli, true));
post(cli.get_executor(), make_loop(cli, false));
```
The good news is that this program runs without problems (under ASAN/UBSAN):
```cpp
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <cstdint>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <vector>

namespace asio = boost::asio;
namespace ssl  = asio::ssl;
using asio::ip::tcp;
using ssl::context;
using stream = ssl::stream<tcp::socket>;
using boost::system::error_code;

static constexpr auto trials = 10;

static inline void trace(auto const&... args) {
    static std::mutex mx;
    std::lock_guard lk(mx);
    (std::cout << ... << args) << std::endl;
}

struct sentinel {
    std::string msg;
    ~sentinel() {
        trace("~sentinel: ", msg);
    } // trace when shared loop is freed
};

template <typename... Ts> static inline auto my_lock(std::shared_ptr<Ts...> sp) { return sp; }
template <typename... Ts> static inline auto my_lock(std::weak_ptr<Ts...> wp) { return wp.lock(); }

int main() try {
    asio::thread_pool ioc;

    ssl::context sctx{ssl::context::tlsv13_server};
    ssl::context cctx{ssl::context::tlsv13_client};
    sctx.set_default_verify_paths();
    sctx.set_password_callback([](auto&&...) { return "test"; });
    sctx.use_certificate_file("server.pem", context::pem);
    sctx.use_private_key_file("server.pem", context::pem);

    tcp::acceptor acc(make_strand(ioc), tcp::v4());
    trace("before set_option");
    acc.set_option(tcp::acceptor::reuse_address(true));
    acc.bind({{}, 7878});
    acc.listen();
    trace("listening");

    stream srv(acc.get_executor(), sctx);
    auto fut = std::async([&] {
        acc.accept(srv.lowest_layer());
        trace("Handshaking ", srv.lowest_layer().remote_endpoint());
        srv.handshake(stream::server);
    });

    stream cli(make_strand(ioc), cctx);
    cli.lowest_layer().connect({{}, 7878});
    cli.handshake(stream::client);
    fut.get();
    trace("connected");

    auto make_loop = [&](auto& stream, bool sending) {
        auto shared_loop = std::make_shared<std::function<void()>>();
        auto port = stream.lowest_layer().remote_endpoint().port();
        auto mode = [=](auto t) {
            return std::string(sending ? "write" : "read") + " t=" +
                std::to_string(t) + " port:" + std::to_string(port);
        };
        auto s = std::make_shared<sentinel>("shared_loop " + mode(trials));

        *shared_loop = [wl = /*std::weak_ptr*/ (shared_loop), mode, sending, &stream,
                        t = trials, s_ = s]() mutable {
            if (!t--) {
                return;
            }
            auto data = std::make_shared<std::vector<uint8_t>>(10'000);
            auto buf = asio::buffer(*data);

            auto handler = [sl = my_lock(wl), mode, data_ = std::move(data), t] //
                (error_code ec, size_t n) {
                    trace("Handler ", mode(t), " n=", n, " ", ec.message());
                    if (t && sl)
                        (*sl)();
                };

            trace("Initiating ", mode(t));
            if (sending)
                async_write(stream, buf, handler);
            else
                async_read(stream, buf, handler);
        };
        return *shared_loop;
    };

    post(srv.get_executor(), make_loop(srv, true));
    post(srv.get_executor(), make_loop(srv, false));
    post(cli.get_executor(), make_loop(cli, true));
    post(cli.get_executor(), make_loop(cli, false));

    trace("waiting");
    ioc.join();
    sentinel atdone{"done"};
} catch (boost::system::system_error const& se) {
    std::cout << se.what() << " from " << se.code().location() << "\n";
}
```
Running it locally for demonstration purposes: https://i.imgur.com/q5e7ENI.mp4
The Troubles
There are two notable problems:
- As you can see, the shared loop function is leaked, because it circularly holds on to itself. If we attempt to break the cycle by using a `weak_ptr` (e.g. as commented), the async chain cannot proceed beyond the first iteration. I don't think there is a solution here, except to move ownership out of the handler completely.
- If in my example I replace the shared buffer with the `std::vector` as you had it (no further changes required):

  ```cpp
  auto data = std::vector<uint8_t>(10'000);
  auto buf = asio::buffer(data);
  ```

  then ASAN will object about a heap-use-after-free, regardless of how stable the data address ought to be across the move... I would not bet on it.