Home > Blockchain >  Boost asio async_write from 2 different threads
Boost asio async_write from 2 different threads

Time:08-31

I would like to have a TCP server receiving data in one thread and being able to send data from another one.

My code does echo what the server receives, so it's possible to write from the same thread but fails to send messages from the main.

I believe it has something to do with the socket not accessible but I need a push to get back on track, thanks!

#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;

class session
  : public std::enable_shared_from_this<session>
{
public:
  session(tcp::socket socket)
    : socket_(std::move(socket))
  {
  }

  void start()
  {
    do_read();
  }

  void write(char *msg, std::size_t length)
  {
    do_write(msg, length);
  }

private:
  void do_read()
  {
    auto self(shared_from_this());
    socket_.async_read_some(boost::asio::buffer(data_, max_length),
        [this, self](boost::system::error_code ec, std::size_t length)
        {
          if (!ec)
          {
            do_write(data_, length);
          }
        });
  }

  void do_write(char *msg, std::size_t length)
  {
    auto self(shared_from_this());

    boost::asio::async_write(socket_, boost::asio::buffer(msg, length),
        [this, self](boost::system::error_code ec, std::size_t length)
        {
          if (!ec)
          {
            do_read();
          }
          else
          {
            std::cout << ec.message() << std::endl;
            std::cout << socket_.is_open() << std::endl;
          }
        });
  }

  tcp::socket socket_;
  enum { max_length = 1024 };
  char data_[max_length];
};

class server
{
public:
  server(boost::asio::io_service& io_service, short port)
    : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
      socket_(io_service)
  {
    do_accept();
  }

  void send(char *msg, std::size_t length)
  {
    std::make_shared<session>(std::move(socket_))->write(msg, length);
  }

private:
  void do_accept()
  {
    acceptor_.async_accept(socket_,
        [this](boost::system::error_code ec)
        {
          if (!ec)
          {
            std::make_shared<session>(std::move(socket_))->start();
          }

          do_accept();
        });
  }

  tcp::acceptor acceptor_;
  tcp::socket socket_;
};

int main(int argc, char* argv[])
{
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: async_tcp_server <port>\n";
      return 1;
    }

    boost::asio::io_service io_service;

    server s(io_service, std::atoi(argv[1]));

    std::shared_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(io_service));
    std::thread t1([&io_service]() {io_service.run();});

    char msg[] = { 't', 'e', 's', 't' };

    while(1)
    { 
      sleep(5);
      s.send(msg, 4);
    }

    t1.join();
    std::cout << "t1 joined " << std::endl;
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  return 0;
} 

CodePudding user response:

First off you need some synchronization. I'd suggest to put each client on a strand:

void do_accept() {
  acceptor_.async_accept(make_strand(acceptor_.get_executor()),
                         [this](error_code ec, tcp::socket s) {
                           if (!ec) {
                             std::make_shared<session>(std::move(s))->start();
                           }

                           do_accept();
                         });
}

Next up, you want to modify the write() member function to post to the strand:

 void write(char const* msg, size_t length)
 {
    post(socket_.get_executor(), [this, =] {
        do_write(msg, length); });
 }

Note that this is not enough. async_write is a composed operation and you cannot overlap writes:

This operation is implemented in terms of zero or more calls to the stream's async_write_some function, and is known as a composed operation. The program must ensure that the stream performs no other write operations (such as async_write, the stream's async_write_some function, or any other composed operations that perform writes) until this operation completes.

The common approach here is to create a queue:

  std::deque<std::string> outbox_;

  void do_write(std::string msg) {
    auto self(shared_from_this());

    outbox_.push_back(std::move(msg));
    if (outbox_.size() == 1)
      do_write_loop();
  }

The queue provides reference stability (important to keep the buffers valid). The write_loop is only started at the first element queued, and keeps writing until the queue is empty:

  void do_write_loop() {
    if (outbox_.empty())
      return;
    
    async_write(socket_, boost::asio::buffer(outbox_.front()),
                [this, self](error_code ec, size_t length) {
                  if (!ec) {
                    outbox_.pop_front();
                    do_write_loop();
                  } else {
                    std::cout << ec.message() << std::endl;
                    std::cout << socket_.is_open() << std::endl;
                  }
                });
  }

Now, your server::write was a bit funny. It created a new session, moving again from the socket_ variable (which I removed in the process of adding the strand earlier). That would never have done anything useful. I'm guessing that you might have expected it to send to all connected sessions. In that case:

  void send(char const* msg, size_t length) const
  {
    for (Handle const& handle: _sessions)
      if (auto const sess = handle.lock())
        sess->write(msg, length);
  }

Of course, you need something to contain the sessions:

 private:
  using Handle = std::weak_ptr<session>;
  std::list<Handle> _sessions;

And to store the sessions there in do_accept:

      auto sess = std::make_shared<session>(std::move(s));
      _sessions.emplace_back(sess);
      sess->start();

Optionally, you might want to clean up disconnected sessions:

      _sessions.remove_if(std::mem_fn(&Handle::expired));

Now, we're not done, because just like session, server needs a strand to make all these operations thread-safe (with respect to sessions_):

Full Listing

Live On Coliru

#include <boost/asio.hpp>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <list>
#include <memory>
#include <utility>

using boost::asio::ip::tcp;
using boost::system::error_code;
using std::this_thread::sleep_for;
using namespace std::chrono_literals;

class session : public std::enable_shared_from_this<session>
{
 public:
  session(tcp::socket socket)
    : socket_(std::move(socket))
  {
  }

  void start()
  {
    do_read();
  }

  void write(char const* msg, size_t length)
  {
    auto self(shared_from_this());
    post(socket_.get_executor(),
      [this, msg = std::string(msg, length)]() mutable { do_write(std::move(msg)); });
  }

 private:
  void do_read()
  {
    auto self(shared_from_this());
    socket_.async_read_some(boost::asio::buffer(data_), [this, self](error_code ec, size_t length) {
      if (!ec)
      {
        write(data_.data(), length);
        do_read();
      }
    });
  }

  std::deque<std::string> outbox_;

  void do_write(std::string msg)
  {
    auto self(shared_from_this());

    outbox_.push_back(std::move(msg));
    if (outbox_.size() == 1)
      do_write_loop();
  }

  void do_write_loop()
  {
    if (outbox_.empty())
      return;

    auto self(shared_from_this());

    async_write(
      socket_, boost::asio::buffer(outbox_.front()), [this, self](error_code ec, size_t length) {
        if (!ec)
        {
          outbox_.pop_front();
          do_write_loop();
        }
        else
        {
          std::cout << ec.message() << std::endl;
          std::cout << socket_.is_open() << std::endl;
        }
      });
  }

  tcp::socket socket_;
  std::array<char, 1024> data_;
};

class server
{
 public:
  server(boost::asio::io_service& io_service, short port)
    : acceptor_(make_strand(io_service), tcp::endpoint(tcp::v4(), port))
  {
    acceptor_.listen();
    do_accept();
  }

  void send(char const* msg, size_t length)
  {
    post(acceptor_.get_executor(), [this, msg = std::string(msg, length)]() mutable {
      for (Handle const& handle : _sessions)
        if (auto const sess = handle.lock())
          sess->write(msg.data(), msg.size());
    });
  }

 private:
  using Handle = std::weak_ptr<session>;
  std::list<Handle> _sessions;

  void do_accept()
  {
    acceptor_.async_accept(
      make_strand(acceptor_.get_executor()), [this](error_code ec, tcp::socket s) {
        if (!ec)
        {
          auto sess = std::make_shared<session>(std::move(s));
          _sessions.emplace_back(sess);
          sess->start();

          _sessions.remove_if(std::mem_fn(&Handle::expired));
        }

        do_accept();
      });
  }

  tcp::acceptor acceptor_;
};

int main(int argc, char* argv[])
{
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: async_tcp_server <port>" << std::endl;
      return 1;
    }

    boost::asio::io_service io_service;

    server s(io_service, std::atoi(argv[1]));

    auto work = make_work_guard(io_service);
    std::thread t1([&io_service]() { io_service.run(); });

    char constexpr msg[] = {'t', 'e', 's', 't'};

    while (true)
    {
      sleep_for(5s);
      s.send(msg, 4);
    }

    t1.join();
    std::cout << "t1 joined " << std::endl;
  }
  catch (std::exception const& e)
  {
    std::cerr << "Exception: " << e.what() << std::endl;
  }
}
  • Related