I'm trying to use boost::asio::io_service::post
with std::unique_ptr
in lambda-capture of the handler. But the post
handler needs to be copy-constructed for later execution. Thus, as far as I understand the msgPtr
value of type std::unique_ptr
in the lambda-capture as well needs to be copied, even though I'm trying to move-construct it in the lambda-capture.
But the copy constructor in the unique_ptr
is deleted, which makes complete sense.
Is there a way to move the std::unique_ptr
to be executed later in the handler of the io_service::post
?
The error I get when I try to compile it with GCC:
error: use of deleted function ...
BOOST_ASIO_COMPLETION_HANDLER_CHECK(CompletionHandler, handler) type_check;
The code sample goes below (updated):
#include <boost/asio.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <deque>
#include <memory>
#include <string>
// Defined abstract class IMsg and concrete MsgTest class, representing the
// messages to be sent
class IMsg {
public:
explicit IMsg() = default;
virtual ~IMsg() = default;
IMsg(const IMsg& msg) = delete;
IMsg& operator=(const IMsg& msg) = delete;
virtual const std::string& Serialize() = 0;
};
class MsgTest : public IMsg {
public:
explicit MsgTest(const std::string& msg) : m_testMsg(msg) {}
MsgTest(const MsgTest&) = delete;
MsgTest& operator=(const MsgTest&) = delete;
const std::string& Serialize() override { return m_testMsg; }
private:
std::string m_testMsg;
};
using IMsgPtr = std::unique_ptr<IMsg>;
class MsgClient {
public:
MsgClient(boost::asio::io_service& ioService)
: m_ioService(ioService), m_socket(ioService) {}
void asyncSendMsg(IMsgPtr&& msgPtr) {
m_ioService.post([this, msgPtr{std::move(msgPtr)}]() mutable {
m_messagesOut.emplace_back(std::move(msgPtr));
// Serialize() returns const reference to serialized member data.
// Thus, the data will be valid until the async_write returns.
const std::string& currentMsg = m_messagesOut.front()->Serialize();
// todo: such usage of async_write is dangerous and not thread-safe!
boost::asio::async_write(
m_socket, boost::asio::buffer(currentMsg, currentMsg.length()),
[this](boost::system::error_code ec, std::size_t length) {
if (ec == 0) {
m_messagesOut.pop_front();
} else {
// todo: error
}
});
});
}
private:
std::deque<IMsgPtr> m_messagesOut;
boost::asio::io_service& m_ioService;
boost::asio::ip::tcp::socket m_socket;
};
// Calling the asynSendMsg
int main() {
boost::asio::io_service ioServiceWorker;
MsgClient client(ioServiceWorker);
//todo: client.connect()...
client.asyncSendMsg(std::make_unique<MsgTest>("Hello World"));
// code that runs io_service in a separate thread is present below
// but omitted.
}
P.S. We are limited to using C 14 only and Boost.Asio version 1.57. Also, if anyone is interested in why I'm using post
and call async_write
in the handler (why not async_write
directly):
I have a multi-threaded app, and asyncSendMsg
is usually called from a different thread than the MsgClient
resides in. Thus, by adding the IMsgPtr
into the array by post
-ing, I can avoid using inter-thread synchronization using mutexes - as far as post
's handler will always be executed in the same thread where the MsgClient
is located.
I'd appreciate knowing how I can move construct std::unique_ptr<IMsg>
in the post
's handler.
Also, if you have any ideas on improving the code quality here, this will be appreciated as well.
CodePudding user response:
Move-only handler support has improved over latest releases.
That means when using the post, dispatch and defer interfaces on executors. However, you're using the legacy io_service::post
interface.
If your version of boost is recent enough you may be able to simply get away with switching m_io.post(task)
into asio::post(m_io, task)
First Take
#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <deque>
#include <iostream>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
// Defined abstract class IMsg and concrete MsgTest class, representing the
// messages to be sent
struct IMsg {
virtual ~IMsg() = default;
using buffers = std::vector<boost::asio::const_buffer>;
virtual buffers as_buffers() const = 0;
};
struct MsgTest : public IMsg {
boost::endian::big_uint32_t msg_type_ = 0x0001;
std::string payload_;
boost::endian::big_uint32_t payload_length_ = payload_.length();
MsgTest(std::string payload) : payload_(std::move(payload)) {}
virtual buffers as_buffers() const override {
return {
boost::asio::buffer(&msg_type_, sizeof(msg_type_)),
boost::asio::buffer(&payload_length_, sizeof(payload_length_)),
boost::asio::buffer(payload_),
};
}
};
using IMsgPtr = std::unique_ptr<IMsg>;
class MsgClient {
public:
MsgClient(boost::asio::io_context& ioc) : m_io(ioc) {}
error_code connect(tcp::endpoint ep) {
auto task = std::packaged_task<error_code()>{[this, ep]() {
error_code ec;
m_socket.connect(ep, ec);
return ec;
}};
return asio::post(m_io, std::move(task)).get();
}
void asyncSendMsg(IMsgPtr msgPtr) {
post(m_io, [this, msgPtr = std::move(msgPtr)]() mutable {
m_messagesOut.emplace_back(std::move(msgPtr));
if (m_messagesOut.size() == 1)
do_send_loop();
});
}
private:
void do_send_loop() {
if (m_messagesOut.empty())
return;
asio::async_write( //
m_socket, m_messagesOut.front()->as_buffers(),
[this](error_code ec, size_t /*length*/) {
if (!ec.failed()) {
m_messagesOut.pop_front();
do_send_loop();
} else {
// todo: error
}
});
}
asio::io_context& m_io;
tcp::socket m_socket{m_io};
std::deque<IMsgPtr> m_messagesOut;
};
// Calling the asynSendMsg
int main() {
boost::asio::io_context ioc;
MsgClient client(ioc);
std::cout << "Connecting: " << client.connect({{}, 7878}).message() << "\n";
for (auto msg : {"Hello", " World!\n", "Bye", " Universe\n"})
client.asyncSendMsg(std::make_unique<MsgTest>(msg));
ioc.run();
}
When run against netcat -tlp 7878 | xxd
:
Connecting: Success
With the netcat pipeline printing:
00000000: 0000 0001 0000 0005 4865 6c6c 6f00 0000 ........Hello...
00000010: 0100 0000 0820 576f 726c 6421 0a00 0000 ..... World!....
00000020: 0100 0000 0342 7965 0000 0001 0000 000a .....Bye........
00000030: 2055 6e69 7665 7273 650a Universe.
Note the many small fixes/improvements:
using buffer sequence instead of serialized string - this may or may not help you depending on how you serialize your data
showing some message framing for fun
showing how to safely dispatch synchronous operations to the service (by using a packaged task/future)
correctly queuing the output messages
- not allowing multiple
async_write
operations concurrently (see docs) - correctly sending FIFO order (you had
Serialize(...back())
but thenpop_front()
...)
- not allowing multiple
Going From Here
I'd actually go the extra step and switch to the executor model, which has some simplifying benefits:
#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <deque>
#include <iostream>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
// Defined abstract class IMsg and concrete MsgTest class, representing the
// messages to be sent
struct IMsg {
virtual ~IMsg() = default;
using buffers = std::vector<boost::asio::const_buffer>;
virtual buffers as_buffers() const = 0;
};
struct MsgTest : IMsg {
boost::endian::big_uint32_t msg_type_ = 0x0001;
std::string payload_;
boost::endian::big_uint32_t payload_length_ = payload_.length();
MsgTest(std::string payload) : payload_(std::move(payload)) {}
virtual buffers as_buffers() const override {
return {
boost::asio::buffer(&msg_type_, sizeof(msg_type_)),
boost::asio::buffer(&payload_length_, sizeof(payload_length_)),
boost::asio::buffer(payload_),
};
}
};
using IMsgPtr = std::unique_ptr<IMsg>;
class MsgClient {
public:
template <typename Executor> MsgClient(Executor ex) : m_socket(ex) {}
error_code connect(tcp::endpoint ep) {
auto task = std::packaged_task<error_code()>{[this, ep]() {
error_code ec;
m_socket.connect(ep, ec);
return ec;
}};
return asio::post(m_socket.get_executor(), std::move(task)).get();
}
void asyncSendMsg(IMsgPtr msgPtr) {
post(m_socket.get_executor(),
[this, msgPtr = std::move(msgPtr)]() mutable {
m_messagesOut.emplace_back(std::move(msgPtr));
if (m_messagesOut.size() == 1)
do_send_loop();
});
}
private:
void do_send_loop() {
if (m_messagesOut.empty())
return;
asio::async_write( //
m_socket, m_messagesOut.front()->as_buffers(),
[this](error_code ec, size_t /*length*/) {
if (!ec.failed()) {
m_messagesOut.pop_front();
do_send_loop();
} else {
// todo: error
}
});
}
tcp::socket m_socket;
std::deque<IMsgPtr> m_messagesOut;
};
// Calling the asynSendMsg
int main() {
boost::asio::thread_pool ioc(1);
MsgClient client(make_strand(ioc.get_executor()));
std::cout << "Connecting: " << client.connect({{}, 7878}).message() << "\n";
for (auto msg : {"Hello", " World!\n", "Bye", " Universe\n"})
client.asyncSendMsg(std::make_unique<MsgTest>(msg));
ioc.join();
}
BONUS
If you have to use the old interfaces and cannot upgrade Boost, you might use the BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS
define to skip the handler type checks, which makes it compile in more cases: https://godbolt.org/z/76dh9dcjj