Home > Back-end >  boost asio ssl async_read async_write within a strand is failing
boost asio ssl async_read async_write within a strand is failing

Time:10-25

I consider myself reasonably experienced with asio but can't figure out how to correctly perform async_read and async_write on a boost::asio::ssl::stream<boost::asio::ip::tcp::socket>. I have created the following minimal example https://github.com/ladnir/asio-ssl-stackoverflow which I explain next.

My goal is quite simple, perform full duplex async read and write on a ssl_stream. The documentation is clear that you need to perform the async_read and async_write calls from within a strand which I do.

My setup is to have an io_context with multiple threads. Data is continuously sent and received on the socket. Sending and receiving data each have their own callback chain. In the completion handler for each I simply schedule another send or receive operation. All of this is performed within a strand. Below is the main bit of code

 std::function<void(bool, ssl::stream<tcp::socket>&, io_context::strand&, u64)> f =
        [&](bool send, ssl::stream<tcp::socket>& sock, io_context::strand& strand, u64 t) {
        
        strand.dispatch([&, send, t]() {
            std::vector<u8> buffer(10000);
            auto bb = mutable_buffer(buffer.data(), buffer.size());
            auto callback = [&, send, t, buffer = std::move(buffer), moveOnly = std::unique_ptr<int>{}](boost::system::error_code error, std::size_t n) mutable {
                    if (error) {
                        std::cout << error.message() << std::endl;
                        std::terminate();
                    }

                    // perform another operation or complete.
                    if (t)
                        f(send, sock, strand, t - 1);
                    else
                        --spinLock;
                };

            if (send)
                async_write(sock, bb, std::move(callback));
            else
                async_read(sock, bb, std::move(callback));

        });
};

A send and receive callback chain is then started for the server and client sockets.

    // launch our callback chains.
    f(true, srvSocket, srvStrand, trials);
    f(false, srvSocket, srvStrand, trials);
    f(true, cliSocket, cliStrand, trials);
    f(false, cliSocket, cliStrand, trials);

It seems that despite the use of the strand, something inside OpenSSL is not being performed in a thread-safe manner. When I run the code I sometimes get a decryption failure and sometimes it just crashes somewhere in OpenSSL.

If I use a tcp::socket this code works fine. If I make the io_context single-threaded then it works fine. I have tested this on ubuntu and windows.

It seems from related questions, e.g. this, that full duplex should work as long as you wrap it in a strand.

Does anyone see what I'm doing wrong? Maybe it is simply not safe to perform an async_read/async_write when the other is already scheduled?


Notes on Sehe's solution: After comparing my code to Sehe's solution I determined that my code has one major bug. The execution context of the ssl::stream<tcp::socket>'s is the multi-thread io_context. As such, there is no guarantee that the ssl::stream perform work on my strand. Moreover, how could it? The ssl::stream is unaware of my strand and only happens to be executed on it during the initial call. When the ssl::stream gets called back from the underlying socket, it will/might schedule more read/write operations but it wont be on my chosen strand.

The only thing guaranteed is that the ssl::stream is on the execution context that it was constructed with. A simple fix is to construct the stream with a strand as it's execution context.

I updated the github repo above to contain my solution.

Thanks, Sehe for the insight.

CodePudding user response:

It's been one of those days. I felt completely silly that I couldn't find the issue with your code. I kept running into variations of use-after-free and stack thrashing even with simplified code (way simplified).

The obvious culprit looked to me the use of vector member data after moving the vector. I have not been able to prove that (manually asserting the values for data() and size() are stable across move didn't fail), but see below.

So... I went to square one and just wrote the entire program the way I'd write it. That is to say,

  • using executors instead of service reference explicit handler binding
  • asio::strand<> instead of io_context::strand (which is deprecated)
  • asio::thread_pool instead of io_context plus raw threads (and a work guard)

I don't think any of these really mattered, but it does simplify the code.

While doing the mental gymnastics to know whether function<> can hold a copy of itself, I decided it would need to have shared ownership.

auto make_loop = [&](auto& stream, bool sending) {
    auto shared_loop = std::make_shared<std::function<void()>>();
    auto port        = stream.lowest_layer().remote_endpoint().port();
    auto mode        = [=](auto t) {
        return std::string(sending ? "write" : "read")  
            " t="   std::to_string(t)   " port:"   std::to_string(port);
    };
    auto s = std::make_shared<sentinel>("shared_loop "   mode(trials));

    *shared_loop = [wl = /*std::weak_ptr*/(shared_loop), mode, sending, &stream,
                    t = trials, s_ = s]() mutable {
        if (!t--) {
            return;
        }

        auto data = std::make_shared<std::vector<uint8_t>>(10'000);
        auto buf  = asio::buffer(*data);

        auto handler = [sl = my_lock(wl), mode, data_ = std::move(data), t] //
            (error_code ec, size_t n) {
                trace("Handler ", mode(t), " n=", n, " ", ec.message());
                if (t && sl)
                    (*sl)();
            };

        trace("Initiating ", mode(t));

        if (sending)
            async_write(stream, buf, handler);
        else
            async_read(stream, buf, handler);
    };
    return *shared_loop;
};

post(srv.get_executor(), make_loop(srv, true));
post(srv.get_executor(), make_loop(srv, false));
post(cli.get_executor(), make_loop(cli, true));
post(cli.get_executor(), make_loop(cli, false));

The good news is, that this program runs without problems (under ASAN UBSAN):

(almost) Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <iostream>
namespace asio = boost::asio;
namespace ssl  = asio::ssl;
using asio::ip::tcp;
using ssl::context;
using stream = ssl::stream<tcp::socket>;
using boost::system::error_code;

static constexpr auto trials = 10;

static inline void trace(auto const&... args) {
    static std::mutex mx;
    std::lock_guard   lk(mx);
    (std::cout << ... << args) << std::endl;
}

struct sentinel {
    std::string msg;
    ~sentinel() {
        trace("~sentinel: ", msg);
    } // trace when shared loop is freed
};

template <typename... Ts> static inline auto my_lock(std::shared_ptr<Ts...> sp) { return sp; }
template <typename... Ts> static inline auto my_lock(std::weak_ptr<Ts...> wp) { return wp.lock(); }

int main() try {
    asio::thread_pool ioc;
    ssl::context      sctx{ssl::context::tlsv13_server};
    ssl::context      cctx{ssl::context::tlsv13_client};

    sctx.set_default_verify_paths();
    sctx.set_password_callback([](auto&&...) { return "test"; });
    sctx.use_certificate_file("server.pem", context::pem);
    sctx.use_private_key_file("server.pem", context::pem);

    tcp::acceptor acc(make_strand(ioc), tcp::v4());
    trace("before set_option");
    acc.set_option(tcp::acceptor::reuse_address(true));
    acc.bind({{}, 7878});
    acc.listen();
    trace("listening");

    stream srv(acc.get_executor(), sctx);
    auto   fut = std::async([&] {
        acc.accept(srv.lowest_layer());
        trace("Handshaking ", srv.lowest_layer().remote_endpoint());

        srv.handshake(stream::server);
    });

    stream cli(make_strand(ioc), cctx);
    cli.lowest_layer().connect({{}, 7878});
    cli.handshake(stream::client);
    fut.get();
    trace("connected");

    auto make_loop = [&](auto& stream, bool sending) {
        auto shared_loop = std::make_shared<std::function<void()>>();
        auto port        = stream.lowest_layer().remote_endpoint().port();
        auto mode        = [=](auto t) {
            return std::string(sending ? "write" : "read")  
                " t="   std::to_string(t)   " port:"   std::to_string(port);
        };
        auto s = std::make_shared<sentinel>("shared_loop "   mode(trials));

        *shared_loop = [wl = /*std::weak_ptr*/(shared_loop), mode, sending, &stream,
                        t = trials, s_ = s]() mutable {
            if (!t--) {
                return;
            }

            auto data = std::make_shared<std::vector<uint8_t>>(10'000);
            auto buf  = asio::buffer(*data);

            auto handler = [sl = my_lock(wl), mode, data_ = std::move(data), t] //
                (error_code ec, size_t n) {
                    trace("Handler ", mode(t), " n=", n, " ", ec.message());
                    if (t && sl)
                        (*sl)();
                };

            trace("Initiating ", mode(t));

            if (sending)
                async_write(stream, buf, handler);
            else
                async_read(stream, buf, handler);
        };
        return *shared_loop;
    };

    post(srv.get_executor(), make_loop(srv, true));
    post(srv.get_executor(), make_loop(srv, false));
    post(cli.get_executor(), make_loop(cli, true));
    post(cli.get_executor(), make_loop(cli, false));

    trace("waiting");
    ioc.join();
    sentinel atdone{"done"};
} catch(boost::system::system_error const& se) {
    std::cout << se.what() << " from " << se.code().location() << "\n";
}

Running it locally for demonstration purposes: https://i.imgur.com/q5e7ENI.mp4

The Troubles

There are two notable problems:

  • as you can see the shared loop function is leaked. This is because it circularly holds on to itself. If we attempt to break the cycle by using weak_ptr (e.g. as commented) the async chain cannot proceed beyond the first iteration. I don't think there is a solution here, except to move ownership out of the handler completely.

  • If in my example I replace the buffer with the std::vector as you had it:

         auto data = std::vector<uint8_t>(10'000);
         auto buf  = asio::buffer(data);
    

    (no further changes required), ASAN will object about heap-use-after-free regardless of how stable the data address ought to be... I would not bet on this

  • Related