boost::asio::io_service::post - how to move std::unique_ptr into a handler function

Time:09-09

I'm trying to use boost::asio::io_service::post with a std::unique_ptr in the lambda capture of the handler. However, post requires the handler to be copy-constructible, because it is stored for later execution. As far as I understand, this means msgPtr (a std::unique_ptr) would have to be copied along with the lambda, even though I move-construct it in the capture. The copy constructor of unique_ptr is deleted, which makes complete sense. Is there a way to move the std::unique_ptr into the handler passed to io_service::post, so that it can be used when the handler runs later?

The error I get when I try to compile it with GCC:

   error: use of deleted function ...
   BOOST_ASIO_COMPLETION_HANDLER_CHECK(CompletionHandler, handler) type_check;

The code sample goes below (updated):

#include <boost/asio.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <deque>
#include <memory>
#include <string>

// Defined abstract class IMsg and concrete MsgTest class, representing the
// messages to be sent
class IMsg {
 public:
  explicit IMsg() = default;
  virtual ~IMsg() = default;
  IMsg(const IMsg& msg) = delete;
  IMsg& operator=(const IMsg& msg) = delete;
  virtual const std::string& Serialize() = 0;
};

class MsgTest : public IMsg {
 public:
  explicit MsgTest(const std::string& msg) : m_testMsg(msg) {}
  MsgTest(const MsgTest&) = delete;
  MsgTest& operator=(const MsgTest&) = delete;
  const std::string& Serialize() override { return m_testMsg; }

 private:
  std::string m_testMsg;
};

using IMsgPtr = std::unique_ptr<IMsg>;

class MsgClient {
 public:
  MsgClient(boost::asio::io_service& ioService)
      : m_ioService(ioService), m_socket(ioService) {}

  void asyncSendMsg(IMsgPtr&& msgPtr) {
    m_ioService.post([this, msgPtr{std::move(msgPtr)}]() mutable {
      m_messagesOut.emplace_back(std::move(msgPtr));
      // Serialize() returns const reference to serialized member data.
      // Thus, the data will be valid until the async_write returns.
      const std::string& currentMsg = m_messagesOut.front()->Serialize();

      // todo: such usage of async_write is dangerous and not thread-safe!
      boost::asio::async_write(
          m_socket, boost::asio::buffer(currentMsg, currentMsg.length()),
          [this](boost::system::error_code ec, std::size_t length) {
            if (ec == 0) {
              m_messagesOut.pop_front();
            } else {
              // todo: error
            }
          });
    });
  }

 private:
  std::deque<IMsgPtr> m_messagesOut;
  boost::asio::io_service& m_ioService;
  boost::asio::ip::tcp::socket m_socket;
};

// Calling asyncSendMsg
int main() {
  boost::asio::io_service ioServiceWorker;
  MsgClient client(ioServiceWorker);
  //todo: client.connect()...
  client.asyncSendMsg(std::make_unique<MsgTest>("Hello World"));
  // code that runs io_service in a separate thread is present below
  // but omitted.
}

P.S. We are limited to C++14 and Boost.Asio version 1.57. Also, in case anyone wonders why I use post and call async_write inside the handler (rather than calling async_write directly): the app is multi-threaded, and asyncSendMsg is usually called from a different thread than the one MsgClient lives in. By adding the IMsgPtr to the queue from a posted handler, I can avoid inter-thread synchronization with mutexes, since post's handler is always executed on the thread where the MsgClient is located.

I'd appreciate knowing how I can move construct std::unique_ptr<IMsg> in the post's handler. Also, if you have any ideas on improving the code quality here, this will be appreciated as well.

Answer:

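Move-only handler support has improved over recent Boost releases.

That improvement applies to the free-function post, dispatch and defer interfaces that work on executors. You, however, are using the legacy io_service::post member function, and it is that function's handler type check (BOOST_ASIO_COMPLETION_HANDLER_CHECK in your error message) that rejects the move-only lambda.

If your version of Boost is recent enough, you may be able to get away with simply changing m_io.post(task) into asio::post(m_io, task).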
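
The root cause can be checked without Asio at all. The following is a minimal sketch, assuming only the C++14 standard library: make_move_only_handler is a hypothetical helper (it is not part of the question or of Boost) that builds a closure owning a unique_ptr through an init-capture, and the static_asserts confirm that such a closure can be moved but not copied. Any interface that insists on copying its handler will therefore refuse it.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical helper (not from the post): returns a closure that owns a
// unique_ptr through a C++14 init-capture.
inline auto make_move_only_handler(std::unique_ptr<int> p) {
    return [p = std::move(p)]() {
        assert(p && "the closure owns the pointee");
        return *p;
    };
}

using Handler = decltype(make_move_only_handler(std::unique_ptr<int>{}));

// The captured unique_ptr deletes the closure's copy constructor...
static_assert(!std::is_copy_constructible<Handler>::value,
              "a closure capturing unique_ptr by value is not copyable");
// ...but the closure can still be moved to wherever it will run later.
static_assert(std::is_move_constructible<Handler>::value,
              "a closure capturing unique_ptr by value is movable");
```

A handler created this way has to be handed over with std::move at every step, which is what the executor-based free functions are able to do.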

First Take

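#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <deque>
#include <future>   // std::packaged_task, std::future
#include <iostream>
#include <memory>   // std::unique_ptr
#include <string>
#include <thread>
#include <vector>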
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;

// Defined abstract class IMsg and concrete MsgTest class, representing the
// messages to be sent
struct IMsg {
    virtual ~IMsg() = default;
    using buffers = std::vector<boost::asio::const_buffer>;
    virtual buffers as_buffers() const = 0;
};

struct MsgTest : public IMsg {
    boost::endian::big_uint32_t msg_type_ = 0x0001;
    std::string                 payload_;
    boost::endian::big_uint32_t payload_length_ = payload_.length();

    MsgTest(std::string payload) : payload_(std::move(payload)) {}

    virtual buffers as_buffers() const override {
        return {
            boost::asio::buffer(&msg_type_, sizeof(msg_type_)),
            boost::asio::buffer(&payload_length_, sizeof(payload_length_)),
            boost::asio::buffer(payload_),
        };
    }
};

using IMsgPtr = std::unique_ptr<IMsg>;

class MsgClient {
  public:
    MsgClient(boost::asio::io_context& ioc) : m_io(ioc) {}

    error_code connect(tcp::endpoint ep) {
        auto task = std::packaged_task<error_code()>{[this, ep]() {
            error_code ec;
            m_socket.connect(ep, ec);
            return ec;
        }};
        return asio::post(m_io, std::move(task)).get();
    }

    void asyncSendMsg(IMsgPtr msgPtr) {
        post(m_io, [this, msgPtr = std::move(msgPtr)]() mutable {
            m_messagesOut.emplace_back(std::move(msgPtr));

            if (m_messagesOut.size() == 1)
                do_send_loop();
        });
    }

  private:
    void do_send_loop() {
        if (m_messagesOut.empty())
            return;

        asio::async_write( //
            m_socket, m_messagesOut.front()->as_buffers(),
            [this](error_code ec, size_t /*length*/) {
                if (!ec.failed()) {
                    m_messagesOut.pop_front();

                    do_send_loop();
                } else {
                    // todo: error
                }
            });
    }

    asio::io_context&   m_io;
    tcp::socket         m_socket{m_io};
    std::deque<IMsgPtr> m_messagesOut;
};

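// Calling asyncSendMsg
int main() {
    boost::asio::io_context ioc;
    // Run the service on its own thread: connect() blocks on a future that
    // only becomes ready once some thread is running the io_context.
    auto        work = asio::make_work_guard(ioc);
    std::thread io_thread([&ioc] { ioc.run(); });

    MsgClient client(ioc);

    std::cout << "Connecting: " << client.connect({{}, 7878}).message() << "\n";

    for (auto msg : {"Hello", " World!\n", "Bye", " Universe\n"})
        client.asyncSendMsg(std::make_unique<MsgTest>(msg));

    work.reset(); // let run() return once the queued writes have completed
    io_thread.join();
}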

When run against netcat -tlp 7878 | xxd:

Connecting: Success

With the netcat pipeline printing:

00000000: 0000 0001 0000 0005 4865 6c6c 6f00 0000  ........Hello...
00000010: 0100 0000 0820 576f 726c 6421 0a00 0000  ..... World!....
00000020: 0100 0000 0342 7965 0000 0001 0000 000a  .....Bye........
00000030: 2055 6e69 7665 7273 650a                  Universe.
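
The framing visible in the dump (a 4-byte big-endian message type, a 4-byte big-endian payload length, then the payload bytes) can be reproduced without Boost.Endian. This is only a sketch under that reading of the dump: append_be32 and frame_message are hypothetical names, and unlike as_buffers() the sketch copies everything into one contiguous vector instead of returning a buffer sequence.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Append a 32-bit value in big-endian (network) byte order.
inline void append_be32(std::vector<std::uint8_t>& out, std::uint32_t v) {
    out.push_back(static_cast<std::uint8_t>(v >> 24));
    out.push_back(static_cast<std::uint8_t>(v >> 16));
    out.push_back(static_cast<std::uint8_t>(v >> 8));
    out.push_back(static_cast<std::uint8_t>(v));
}

// Hypothetical helper: lays out one frame as [type][length][payload].
inline std::vector<std::uint8_t> frame_message(std::uint32_t type,
                                               const std::string& payload) {
    assert(payload.size() <= 0xFFFFFFFFu && "length must fit in 32 bits");
    std::vector<std::uint8_t> out;
    out.reserve(8 + payload.size());
    append_be32(out, type);
    append_be32(out, static_cast<std::uint32_t>(payload.size()));
    out.insert(out.end(), payload.begin(), payload.end());
    return out;
}
```

Calling frame_message(1, "Hello") yields the 13 bytes that open the dump: 00 00 00 01, 00 00 00 05, then the five bytes of "Hello".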

Note the many small fixes/improvements:

  • using buffer sequence instead of serialized string - this may or may not help you depending on how you serialize your data

  • showing some message framing for fun

  • showing how to safely dispatch synchronous operations to the service (by using a packaged task/future)

  • correctly queuing the output messages

    • not allowing multiple async_write operations concurrently (see docs)
    • correctly sending in FIFO order (you had Serialize(...back()) but then pop_front(); the updated listing in the question uses front())
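
The queuing rule from the last two points (at most one write in flight, always the front element, popped only once its write has completed) can be exercised in isolation. The sketch below replaces the socket with a hypothetical FakeSocket that merely records what it was asked to write and hands back the completion on demand; SendQueue mirrors the shape of asyncSendMsg and do_send_loop, but none of these names come from Asio.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a socket: start_write() records the data and
// stores the completion callback instead of doing real I/O.
struct FakeSocket {
    std::vector<std::string> wire;      // what was "sent", in order
    std::function<void()>    pending;   // completion of the write in flight
    int                      in_flight = 0;

    void start_write(const std::string& data, std::function<void()> done) {
        assert(in_flight == 0 && "overlapping writes would interleave data");
        ++in_flight;
        wire.push_back(data);
        pending = std::move(done);
    }

    // Simulates the event loop delivering one completion.
    bool complete_one() {
        if (!pending) return false;
        auto done = std::move(pending);
        pending = nullptr;
        --in_flight;
        done();
        return true;
    }
};

class SendQueue {
  public:
    explicit SendQueue(FakeSocket& s) : socket_(s) {}

    void send(std::string msg) {
        queue_.push_back(std::move(msg));
        if (queue_.size() == 1) // nothing was in flight: start the loop
            write_front();
    }

  private:
    void write_front() {
        if (queue_.empty()) return;
        socket_.start_write(queue_.front(), [this] {
            queue_.pop_front(); // the front message has been written
            write_front();      // continue with the next one, if any
        });
    }

    FakeSocket&             socket_;
    std::deque<std::string> queue_;
};
```

Queuing three messages starts exactly one write; the other two are only handed to the socket as the earlier completions arrive, so the order on the wire matches the order of the send calls.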

Going From Here

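I'd actually go the extra step and switch to the executor model, which simplifies things: the client no longer needs a reference to the io_context, because it posts to the socket's own executor, and passing in a strand keeps the handlers serialized even when the work runs on a thread pool: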

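#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <deque>
#include <future>   // std::packaged_task, std::future
#include <iostream>
#include <memory>   // std::unique_ptr
#include <string>
#include <vector>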
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;

// Defined abstract class IMsg and concrete MsgTest class, representing the
// messages to be sent
struct IMsg {
    virtual ~IMsg() = default;
    using buffers = std::vector<boost::asio::const_buffer>;
    virtual buffers as_buffers() const = 0;
};

struct MsgTest : IMsg {
    boost::endian::big_uint32_t msg_type_ = 0x0001;
    std::string                 payload_;
    boost::endian::big_uint32_t payload_length_ = payload_.length();

    MsgTest(std::string payload) : payload_(std::move(payload)) {}

    virtual buffers as_buffers() const override {
        return {
            boost::asio::buffer(&msg_type_, sizeof(msg_type_)),
            boost::asio::buffer(&payload_length_, sizeof(payload_length_)),
            boost::asio::buffer(payload_),
        };
    }
};

using IMsgPtr = std::unique_ptr<IMsg>;

class MsgClient {
  public:
    template <typename Executor> MsgClient(Executor ex) : m_socket(ex) {}

    error_code connect(tcp::endpoint ep) {
        auto task = std::packaged_task<error_code()>{[this, ep]() {
            error_code ec;
            m_socket.connect(ep, ec);
            return ec;
        }};
        return asio::post(m_socket.get_executor(), std::move(task)).get();
    }

    void asyncSendMsg(IMsgPtr msgPtr) {
        post(m_socket.get_executor(),
             [this, msgPtr = std::move(msgPtr)]() mutable {
                 m_messagesOut.emplace_back(std::move(msgPtr));

                 if (m_messagesOut.size() == 1)
                     do_send_loop();
             });
    }

  private:
    void do_send_loop() {
        if (m_messagesOut.empty())
            return;

        asio::async_write( //
            m_socket, m_messagesOut.front()->as_buffers(),
            [this](error_code ec, size_t /*length*/) {
                if (!ec.failed()) {
                    m_messagesOut.pop_front();

                    do_send_loop();
                } else {
                    // todo: error
                }
            });
    }

    tcp::socket         m_socket;
    std::deque<IMsgPtr> m_messagesOut;
};

// Calling asyncSendMsg
int main() {
    boost::asio::thread_pool ioc(1);

    MsgClient client(make_strand(ioc.get_executor()));

    std::cout << "Connecting: " << client.connect({{}, 7878}).message() << "\n";

    for (auto msg : {"Hello", " World!\n", "Bye", " Universe\n"})
        client.asyncSendMsg(std::make_unique<MsgTest>(msg));

    ioc.join();
}

BONUS

If you have to use the old interfaces and cannot upgrade Boost, you might use the BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS define to skip the handler type checks, which makes it compile in more cases: https://godbolt.org/z/76dh9dcjj
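
Another option that is sometimes used when a handler must stay copyable, and which is not part of the answer above, is to park the unique_ptr in a shared_ptr holder and move it back out when the handler runs. The sketch below assumes nothing about Asio: std::function stands in for an interface that requires copyable handlers, MsgPtr is simplified to a unique_ptr to std::string, and make_enqueue_handler is a hypothetical name.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <utility>

using MsgPtr = std::unique_ptr<std::string>;

// Stand-in for an API that requires copyable handlers (assumption:
// std::function imposes the same requirement on what it stores).
using CopyableHandler = std::function<void()>;

// Hypothetical helper: wraps the move-only message so the handler is copyable.
inline CopyableHandler make_enqueue_handler(std::deque<MsgPtr>& queue,
                                            MsgPtr msg) {
    // Copying the lambda copies only the shared_ptr, never the message.
    auto holder = std::make_shared<MsgPtr>(std::move(msg));
    return [&queue, holder] {
        assert(*holder && "the handler must run at most once");
        queue.emplace_back(std::move(*holder)); // ownership moves to the queue
    };
}
```

The cost is one extra allocation per message, and every copy of the handler shares the same holder, so only the first invocation finds a message to move.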
