In Asio, all completion handlers of async_* functions called in the same thread will run sequentially

Time:12-28

I am new to Asio, so I am a little confused about the control flow of asynchronous operations. Let's see this server:

class session
{
    ...

    sendMsg()
    {
        bool idle = msgQueue.empty();
        msgQueue.push(msg);
        if (idle)
            send();
    }

    send()
    {
        async_write(write_handler);
    }

    write_handler()
    {
        msgQueue.pop();
        if (!msgQueue.empty())
            send();
    }

    recvMsg()
    {
        async_read(read_handler);
    }

    read_handler()
    {
        ...
        recvMsg();
    }

    ...
};

class server
{
    ...

    start()
    {
        async_accept(accept_handler);
    }

    accept_handler()
    {
        auto client = make_shared<session>(move(socket));
        client->recvMsg();
        ...
        start();
    }

    ...
};

int main()
{
    io_context;
    server srv(io_context, 22222);
    srv.start();
    io_context.run();
    return 0;
}

In this case, all completion handlers accept_handler, read_handler, write_handler will be called in the thread calling io_context.run(), which is the main thread. If they run in the same thread, that means they run sequentially, not concurrently, right? And further, the msgQueue will be accessed sequentially, so there is no need for a mutex lock on this queue, right?

I think async_* functions tell the operating system to do some work, and this work will run simultaneously in some other threads with its own buffers. Even if several pieces of work finish at the same time (say, at one point a new connection request arrives, a new message from an existing client arrives, and sending a message to an existing client completes), the completion handlers (accept_handler, read_handler, write_handler) will still be called sequentially. They will not run concurrently, am I correct?

Thank you so much for your help.

CodePudding user response:

Yes. There's only one thread running the io_context, so all completion handlers will run on that one thread. Indeed, this implies a strand of execution (the implicit strand): all handlers will execute sequentially.

See: https://www.boost.org/doc/libs/1_81_0/doc/html/boost_asio/overview/core/threads.html

"and these work will run simultaneously in some other threads with their own buffers"

They will run asynchronously, but not usually on another thread. There could be internal threads, or kernel threads, but also just hardware. The "own buffers" part is true, but dangerously worded, because in Asio the operations never own the buffer: you have to make sure it stays valid until the operation completes.

Note:

  • if there can be multiple threads running (or polling) the io service, you need to make sure access to IO objects is synchronized; in Asio this can be achieved with strand executors
  • not all IO operations may be active in overlapping fashion; you seem to be aware of this, given the msgQueue in your pseudo code

Bonus

As a bonus, let me convert your code into non-pseudo code, showing an explicit strand per connection to be future-proof:

Live On Coliru

#include <boost/asio.hpp>
#include <deque>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
using namespace std::placeholders;

class session : public std::enable_shared_from_this<session> {
  public:
    session(tcp::socket s) : s(std::move(s)) {}

    void start() {
        post(s.get_executor(), [self = shared_from_this()] { self->recvMsg(); });
    }
    void sendMsg(std::string msg) {
        post(s.get_executor(), [=, self = shared_from_this()] { self->do_sendMsg(msg); });
    }

  private:
    //... all private members on strand
    void do_sendMsg(std::string msg) {
        bool was_idle = msgQueue.empty();
        msgQueue.push_back(std::move(msg));
        if (was_idle)
            do_writeloop();
    }

    void do_writeloop() {
        if (!msgQueue.empty())
            async_write(s, asio::buffer(msgQueue.front()),
                        std::bind(&session::write_handler, shared_from_this(), _1, _2));
    }

    void write_handler(error_code ec, size_t) {
        if (!ec) {
            msgQueue.pop_front();
            do_writeloop();
        }
    }

    void recvMsg() {
        //async_read(s, asio::dynamic_buffer(incoming),
                   //std::bind(&session::read_handler, shared_from_this(), _1, _2));

        async_read_until(s, asio::dynamic_buffer(incoming), "\n",
                         std::bind(&session::read_handler, shared_from_this(), _1, _2));
    }

    void read_handler(error_code ec, size_t n) { 
        if (!ec) {
            auto msg = incoming.substr(0, n);
            incoming.erase(0, n);
            recvMsg();

            sendMsg("starting job for " + msg);
            sendMsg("finishing job for " + msg);

            sendMsg(" -- some other message --\n");
        }
    }

    tcp::socket s;
    std::string incoming;
    std::deque<std::string> msgQueue;
};

class server {
  public:
    server(auto ex, uint16_t port) : acc(ex, tcp::v4()) {
        acc.set_option(tcp::acceptor::reuse_address(true));
        acc.bind({{}, port});
        acc.listen();
    }

    void accept_loop() {
        acc.async_accept(make_strand(acc.get_executor()),
                         std::bind(&server::accept_handler, this, _1, _2));
    }

    void accept_handler(error_code ec, tcp::socket s) {
        if (!ec) {
            std::make_shared<session>(std::move(s))->start();
            accept_loop();
        }
    }

  private:
    tcp::acceptor acc;
};

int main() {
    boost::asio::io_context ioc;
    server srv(ioc.get_executor(), 22222);
    srv.accept_loop();
    ioc.run();
}

With a sample client

for a in foo bar qux; do (sleep 1.$RANDOM; echo "command $a")|nc 127.0.0.1 22222 -w2; done

Prints

starting job for command foo
finishing job for command foo
 -- some other message --
starting job for command bar
finishing job for command bar
 -- some other message --
starting job for command qux
finishing job for command qux
 -- some other message --