I'm trying to write and read data asynchronously. I have a function that performs the write:
/*
struct DDSEntity
{
std::string key;
std::string participant_key;
std::string topic_name;
std::string topic_type;
bool keyless;
dds_qos_t qos;
std::map<std::string, RouteStatus> routes;
};
struct DiscoveryEvent
{
enum DiscoveryEventType
{
DiscoveredPublication,
UndiscoveredPublication,
DiscoveredSubscription,
UndiscoveredSubscription
};
std::shared_ptr<DDSEntity> entity;
DiscoveryEventType event_type;
};
*/
boost::mutex guard; //global mutex
// Starts a boost::async task that writes `event` to `socket` while holding `guard`.
// `dp` is accepted but not used in the body.
void send_discovery_event(const dds_entity_t dp, boost::asio::local::stream_protocol::socket *socket,
const DiscoveryEvent& event)
{
SPDLOG_DEBUG("Send discovery event");
// `event` is captured by value, so the task operates on its own copy; `socket` is captured as a pointer.
boost::async([socket, event]() {
boost::mutex::scoped_lock scoped_lock(guard);
// NOTE(review): this puts the object's in-memory bytes on the socket as-is. DiscoveryEvent holds a
// std::shared_ptr, so the reader receives a pointer value, not the entity's contents.
auto bufs = boost::asio::buffer(&event, sizeof(rodds::dds_discovery::DiscoveryEvent));
auto size = boost::asio::write(*socket, bufs);
});
}
On the other hand, I'm trying to read the sent data like this:
// Reads fixed-size DiscoveryEvent records from a local stream socket into `_de` and logs them.
class Plugin
{
public:
// Keeps the address of `rx`, copies `dp`, points `_buffer` at the storage of `_de`,
// and starts the first asynchronous read. `rx` must stay alive while reads are pending.
Plugin(
const dds_entity_t &dp, boost::asio::local::stream_protocol::socket &rx)
: _reader(&rx), _dp(dp), _buffer(&_de, sizeof(rodds::dds_discovery::DiscoveryEvent))
{
SPDLOG_INFO("Plugin initialized");
_reader->async_read_some(
_buffer,
boost::bind(
&Plugin::async_read_handler,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
));
}
// Completion handler for the read: checks the result with assert, logs the event held in `_de`,
// then starts the next read into the same buffer.
void async_read_handler(const boost::system::error_code &error, std::size_t bytes_trans)
{
assert(!error);
// NOTE(review): async_read_some may complete with fewer bytes than the buffer size,
// so this assertion is not guaranteed to hold.
assert(bytes_trans == sizeof(rodds::dds_discovery::DiscoveryEvent));
if (_de.event_type == rodds::dds_discovery::DiscoveryEvent::DiscoveredPublication ||
_de.event_type == rodds::dds_discovery::DiscoveryEvent::DiscoveredSubscription)
// NOTE(review): `_de.entity` was filled from raw received bytes; dereferencing it reads
// whatever that pointer value happens to refer to.
SPDLOG_INFO("Catch discovery event:{0}, {1}, {2}", _de.event_type, _de.entity->topic_name, _de.entity->topic_type);
else
SPDLOG_INFO("Catch discovery event:{0}", _de.event_type);
_reader->async_read_some(
_buffer,
boost::bind(
&Plugin::async_read_handler,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
));
}
private:
boost::asio::local::stream_protocol::socket* _reader; // non-owning
boost::asio::mutable_buffer _buffer; // views the bytes of `_de`
rodds::dds_discovery::DiscoveryEvent _de; // destination of every read
dds_entity_t _dp; // stored copy of the constructor argument; not read in this class
};
// Entry point: expects exactly one argument (the topic name), connects a local socket pair,
// attaches a Plugin to the receiving end, starts discovery on the sending end and runs the io_service.
int main(int argc, char* argv[])
{
spdlog::set_level(spdlog::level::debug);
// program can create reader for only one dds_topic
if (argc == 1 || argc > 2)
{
SPDLOG_ERROR("Provide topic name for forwading reader process");
// note: the usage error still exits with status 0
return 0;
}
SPDLOG_INFO("Provided topic to read: {0}", argv[1]);
// create domain_participant, reader and writer
// sockets to catch rodds::dds_discovery::DiscoveryEvent`s
SPDLOG_INFO("Generate DDS domain participant");
const dds_entity_t dp = dds_create_participant(0, NULL, NULL);
boost::asio::io_service io_service;
SPDLOG_INFO("Create reader/writer sockets");
boost::asio::local::stream_protocol::socket tx(io_service), rx(io_service);
// tx and rx become the two ends of one connected local stream
boost::asio::local::connect_pair(tx, rx);
// the work object keeps io_service.run() from returning when no handlers are pending
boost::asio::io_service::work work(io_service);
// create Plugin instance
Plugin plugin(dp, rx);
rodds::dds_discovery::run_discovery(dp, &tx);
io_service.run();
return 0;
}
But it turns out that I apparently read already overwritten data, because I get the output
[2022-09-11 13:23:19.226] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, rt/rosout,|msg::dds_::Log_
[2022-09-11 13:23:19.226] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0,|rametersReply,|srv::dds_::GetParameters_Response_
[2022-09-11 13:23:19.226] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, rameter_typesReply,srv::dds_::GetParameterTypes_Response_
[2022-09-11 13:23:19.226] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, CrametersReply,srv::dds_::SetParameters_Response_
[2022-09-11 13:23:19.227] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0,|rameters_atomicallyReply,|srv::dds_::SetParametersAtomically_Response_
[2022-09-11 13:23:19.227] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, |be_parametersReply,|srv::dds_::DescribeParameters_Response_
[2022-09-11 13:23:19.227] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, |arametersReply,|srv::dds_::ListParameters_Response_
[2022-09-11 13:23:19.227] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, |nts,|msg::dds_::ParameterEvent_
[2022-09-11 13:23:19.228] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, rt/chatter,|ds_::String_
I would be very grateful if someone could point out my mistakes. Thanks.
CodePudding user response:
You seem to be missing the crucial concept of a wire format. You're passing e.g.
auto bufs = boost::asio::buffer(&event, sizeof(rodds::dds_discovery::DiscoveryEvent));
That treats event as if it were a trivially copyable (POD) type, which it isn't, so this is Undefined Behaviour! It aggregates many non-POD types (std::string, std::shared_ptr, std::map, etc.). In order to put these on the wire you have to decide on a serialization format.
I recently answered a question Sending an array of ints with boost::asio where the question gives a valid list of options. The answers contain some good advice that you will want to have read. My own answer at the time focused on using POD buffers with Asio, which you might apply to your situation as well.
However, in your situation it "smells" like you might be better served by the Serialization approach, as it will not require you to think as much / hard about the memory layout of your objects and the wire format to implement it.
Boost Serialization
You might strive for an implementation like
/// Starts a boost::async task that serializes `event` with ToWireFormat and
/// writes the result to `socket`, all while holding the global `guard` mutex.
/// The first parameter is unused.
void send_discovery_event(const dds_entity_t /*dp*/,
                          boost::asio::local::stream_protocol::socket* socket,
                          const DiscoveryEvent& event)
{
    SPDLOG_DEBUG("Send discovery event");
    boost::async([socket, event]() {
        boost::mutex::scoped_lock scoped_lock(guard);
        // Keep the serialized text in a named object for the duration of the blocking write.
        const auto wire_text = ToWireFormat(event);
        boost::asio::write(*socket, boost::asio::buffer(wire_text));
    });
}
Where ToWireFormat
uses a portable text representation:
/// Serializes `packet` with a Boost text output archive and frames it for the wire.
///
/// @tparam Packet  any type that Boost.Serialization can archive.
/// @param packet   the object to serialize.
/// @return "<decimal length> <archive text>", where the length counts the
///         characters of the archive text only (not the prefix or the space).
template <typename Packet>
std::string ToWireFormat(Packet const& packet) {
    std::ostringstream oss;
    {
        boost::archive::text_oarchive oa(oss);
        oa << packet;
    } // flush and complete archive
    std::string data = std::move(oss).str();
    // The string pieces must be joined with operator+; adjacent expressions alone do not compile.
    return std::to_string(data.length()) + " " + data;
}
This works using Boost Serialization, by adding a member serialize function:
/// Data record for one DDS entity; DiscoveryEvent refers to it through a shared_ptr.
struct DDSEntity
{
    std::string key;
    std::string participant_key;
    std::string topic_name;
    std::string topic_type;
    bool keyless;
    dds_qos_t qos;
    std::map<std::string, RouteStatus> routes;

    /// Boost.Serialization member hook: archives every field in declaration order.
    /// The version argument is ignored.
    template <typename Archive>
    void serialize(Archive& archive, unsigned /*version*/)
    {
        archive & key;
        archive & participant_key;
        archive & topic_name;
        archive & topic_type;
        archive & keyless;
        archive & qos;
        archive & routes;
    }
};
using dds_entity_t = DDSEntity;
/// One discovery notification: the entity concerned and the kind of event.
struct DiscoveryEvent
{
    enum DiscoveryEventType
    {
        DiscoveredPublication,
        UndiscoveredPublication,
        DiscoveredSubscription,
        UndiscoveredSubscription
    };

    std::shared_ptr<DDSEntity> entity;
    DiscoveryEventType event_type;

    /// Boost.Serialization member hook: archives the entity pointer, then the event type.
    /// The version argument is ignored.
    template <typename Archive>
    void serialize(Archive& archive, unsigned /*version*/)
    {
        archive & entity;
        archive & event_type;
    }
};
Of course the constituent types you didn't show should also be serializable. Let's show that with the free-function serialize
instead:
struct dds_qos_t {}; // TODO IMPLEMENT
struct RouteStatus {}; // TODO IMPLEMENT

/// Placeholder free-function serialization hook for dds_qos_t; archives nothing yet.
template <typename Archive>
static inline void serialize(Archive& /*archive*/, dds_qos_t& /*qos*/, unsigned /*version*/)
{
    /* TODO implement */
}

/// Placeholder free-function serialization hook for RouteStatus; archives nothing yet.
template <typename Archive>
static inline void serialize(Archive& /*archive*/, RouteStatus& /*status*/, unsigned /*version*/)
{
    /* TODO implement */
}
Now, the important part is that std::string, shared_ptr and map are all supported out of the box by Boost Serialization:
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/map.hpp>
#include <boost/serialization/shared_ptr.hpp>
That's enough! E.g. doing just
/// Stand-in factory for the demo: ignores its arguments and returns a
/// dds_entity_t filled with fixed sample values and three empty routes.
dds_entity_t dds_create_participant(int, void*, void*) {
    dds_entity_t participant{};
    participant.key = "key";
    participant.participant_key = "participant_key";
    participant.topic_name = "topic_name";
    participant.topic_type = "topic_type";
    participant.keyless = true;
    participant.qos = dds_qos_t{};
    participant.routes.emplace("route1", RouteStatus{});
    participant.routes.emplace("route2", RouteStatus{});
    participant.routes.emplace("route3", RouteStatus{});
    return participant;
}
// Build the sample participant and print its framed, serialized form.
const dds_entity_t dp = dds_create_participant(0, NULL, NULL);
std::cout << ToWireFormat(dp) << std::endl;
Prints e.g.
137 22 serialization::archive 19 1 0
0 3 key 15 participant_key 10 topic_name 10 topic_type 1 0 0 0 0 3 0 0 0 6 route1 0 0 6 route2 6 route3
Further Issues
I'm not sure about the use of the mutex. If it synchronizes access to the event data, you could reduce the scope:
// Hold the lock only while serializing; the socket write runs after it is released.
boost::mutex::scoped_lock scoped_lock(guard);
const auto payload = ToWireFormat(event);
scoped_lock.unlock();
boost::asio::write(*socket, boost::asio::buffer(payload));
If you want to synchronize access on the socket, prefer using a strand: Strands: Use Threads Without Explicit Locking
When reading, you use read_some, which has no logic to make sure the read is complete. Instead, use the free-function composed operations (asio::async_read, asio::async_read_until) to read the required information:
void do_receive_message() {
    async_read_until(_reader, _buffer, " ",
                     boost::bind(&Plugin::on_read_length, this, ph::error, ph::bytes_transferred));
}
void on_read_length(error_code ec, size_t xfr) {
    logger << "DEBUG\t" << __FUNCTION__ << "\t" << ec.message() << " " << xfr << std::endl;
    std::size_t length;
    char space;
    if (!ec.failed() &&
        std::istream(&_buffer) >> std::noskipws >> length >> space &&
        space == ' ') //
    {
        if (length <= _buffer.size())
            on_read_message(ec, 0);
        else {
            logger << "Reading " << length << " more to complete message" << std::endl;
            async_read(_reader, _buffer, asio::transfer_exactly(length - _buffer.size()),
                       boost::bind(&Plugin::on_read_message, this, ph::error, ph::bytes_transferred));
        }
    }
}
void on_read_message(error_code ec, size_t xfr) {
    logger << "DEBUG\t" << __FUNCTION__ << "\t" << ec.message() << " " << xfr << std::endl;
    std::istream msg(&_buffer);
    auto de = FromWireFormat<DiscoveryEvent>(msg);
    if (de.event_type == DiscoveryEvent::DiscoveredPublication ||
        de.event_type == DiscoveryEvent::DiscoveredSubscription)
        logger << "INFO\t" << "Catch discovery event:" << de.event_type << ", "
               << de.entity->topic_name << ", " << de.entity->topic_type << std::endl;
    else
        logger << "INFO\t" << "Catch discovery event:" << de.event_type << std::endl;
    if (!ec)
        do_receive_message();
}
This assumes we replaced the buffer with an asio::streambuf:
asio::streambuf _buffer;
And FromWireFormat is a simple wrapper around the Boost Serialization functions again:
template <typename Packet> Packet FromWireFormat(std::istream& is) {
    // length has already been taken off by the read operations
    Packet packet;
    boost::archive::text_iarchive ia(is);
    ia >> packet;
    is.ignore(2, '\n'); // eat newline from archive
    return packet;
}
Full Demo
Also modernizing some things and getting rid of the boost::async
and the mutex in favor of the strand so we have no race conditions on the socket:
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iostream>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/map.hpp>
#include <boost/serialization/shared_ptr.hpp>
using boost::system::error_code;
namespace asio = boost::asio;
namespace ph = asio::placeholders;
using protocol = asio::local::stream_protocol;
using namespace std::chrono_literals;
static std::ostream logger(std::cout.rdbuf());
namespace rodds { namespace dds_discovery {
struct dds_qos_t {}; // TODO IMPLEMENT
struct RouteStatus {}; // TODO IMPLEMENT

/// Placeholder free-function serialization hook for dds_qos_t; archives nothing yet.
template <typename Archive>
static inline void serialize(Archive& /*archive*/, dds_qos_t& /*qos*/, unsigned /*version*/)
{
    /* TODO implement */
}

/// Placeholder free-function serialization hook for RouteStatus; archives nothing yet.
template <typename Archive>
static inline void serialize(Archive& /*archive*/, RouteStatus& /*status*/, unsigned /*version*/)
{
    /* TODO implement */
}
/// Data record for one DDS entity; DiscoveryEvent refers to it through a shared_ptr.
struct DDSEntity
{
    std::string key;
    std::string participant_key;
    std::string topic_name;
    std::string topic_type;
    bool keyless;
    dds_qos_t qos;
    std::map<std::string, RouteStatus> routes;

    /// Boost.Serialization member hook: archives every field in declaration order.
    /// The version argument is ignored.
    template <typename Archive>
    void serialize(Archive& archive, unsigned /*version*/)
    {
        archive & key;
        archive & participant_key;
        archive & topic_name;
        archive & topic_type;
        archive & keyless;
        archive & qos;
        archive & routes;
    }
};
using dds_entity_t = DDSEntity;
/// One discovery notification: the entity concerned and the kind of event.
struct DiscoveryEvent
{
    enum DiscoveryEventType
    {
        DiscoveredPublication,
        UndiscoveredPublication,
        DiscoveredSubscription,
        UndiscoveredSubscription
    };

    std::shared_ptr<DDSEntity> entity;
    DiscoveryEventType event_type;

    /// Boost.Serialization member hook: archives the entity pointer, then the event type.
    /// The version argument is ignored.
    template <typename Archive>
    void serialize(Archive& archive, unsigned /*version*/)
    {
        archive & entity;
        archive & event_type;
    }
};
/// Serializes `packet` with a Boost text output archive and frames it for the wire.
///
/// @tparam Packet  any type that Boost.Serialization can archive.
/// @param packet   the object to serialize.
/// @return "<decimal length> <archive text>", where the length counts the
///         characters of the archive text only (not the prefix or the space).
template <typename Packet>
std::string ToWireFormat(Packet const& packet) {
    std::ostringstream oss;
    {
        boost::archive::text_oarchive oa(oss);
        oa << packet;
    } // flush and complete archive
    std::string data = std::move(oss).str();
    // The string pieces must be joined with operator+; adjacent expressions alone do not compile.
    return std::to_string(data.length()) + " " + data;
}
template <typename Packet>
Packet FromWireFormat(std::istream& is) {
// length has already been taken off by the read operations
Packet packet;
boost::archive::text_iarchive ia(is);
ia >> packet;
is.ignore(2, '\n'); // eat newline from archive
return packet;
}
}} // namespace rodds::dds_discovery
using namespace rodds::dds_discovery;
/// Posts a task to the socket's executor that serializes a copy of `event`
/// and writes it to `socket` with a blocking asio::write, logging the byte count.
/// `socket` is captured by reference and must outlive the posted task.
/// The first parameter is unused.
void send_discovery_event(const dds_entity_t /*dp*/, protocol::socket& socket,
                          const DiscoveryEvent& event) {
    logger << "DEBUG\tSend discovery event" << std::endl;

    auto transmit = [&socket, event]() {
        const auto wire_text = ToWireFormat(event);
        const auto sent = asio::write(socket, asio::buffer(wire_text));
        logger << "DEBUG\tSent discovery event (" << sent << ")" << std::endl;
    };
    asio::post(socket.get_executor(), transmit);
}
/// Reads length-prefixed DiscoveryEvent messages from a local stream socket and logs them.
///
/// Each message on the wire is "<decimal length> <payload>"; the payload is
/// decoded with FromWireFormat<DiscoveryEvent>. Receiving stops after the first
/// read error or malformed length prefix.
class Plugin {
  public:
    /// Keeps a reference to `rx` (which must outlive this object), stores a copy
    /// of `dp`, and starts the first asynchronous receive.
    Plugin(const dds_entity_t& dp, protocol::socket& rx)
        : _reader(rx)
        , _dp(dp) {
        logger << "INFO\tPlugin initialized" << std::endl;
        do_receive_message();
    }

    /// Starts reading up to and including the space that ends the length prefix.
    void do_receive_message() {
        async_read_until(_reader, _buffer, " ",
                         boost::bind(&Plugin::on_read_length, this, ph::error,
                                     ph::bytes_transferred));
    }

    /// Handler for the length prefix: parses it and either decodes the payload
    /// right away (if it is already buffered) or reads the missing bytes first.
    /// @param ec   result of the read.
    /// @param xfr  byte count reported by the read (logged only).
    void on_read_length(error_code ec, size_t xfr) {
        logger << "DEBUG\t" << __FUNCTION__ << "\t" << ec.message() << " " << xfr << std::endl;

        if (ec.failed())
            return; // e.g. end of file once the peer has closed

        std::size_t length = 0;
        char space = 0;
        if (!(std::istream(&_buffer) >> std::noskipws >> length >> space) || space != ' ')
            return; // malformed prefix: stop receiving

        if (length <= _buffer.size()) {
            // async_read_until may have buffered past the delimiter; the whole payload is here.
            on_read_message(ec, 0);
        } else {
            logger << "Reading " << length << " more to complete message"
                   << std::endl;
            async_read(_reader, _buffer,
                       asio::transfer_exactly(length - _buffer.size()),
                       boost::bind(&Plugin::on_read_message, this,
                                   ph::error, ph::bytes_transferred));
        }
    }

    /// Handler for a complete payload: decodes and logs it, then waits for the next message.
    /// @param ec   result of the payload read (success when called directly).
    /// @param xfr  byte count reported by the read (logged only).
    void on_read_message(error_code ec, size_t xfr) {
        logger << "DEBUG\t" << __FUNCTION__ << "\t" << ec.message() << " " << xfr << std::endl;

        // A failed read leaves an incomplete payload in the buffer; decoding it
        // would make the archive fail, so stop here instead.
        if (ec.failed())
            return;

        std::istream msg(&_buffer);
        auto de = FromWireFormat<DiscoveryEvent>(msg);

        const bool is_discovered =
            de.event_type == DiscoveryEvent::DiscoveredPublication ||
            de.event_type == DiscoveryEvent::DiscoveredSubscription;

        // A null shared_ptr is a valid serialized value, so check before dereferencing.
        if (is_discovered && de.entity)
            logger << "INFO\t"
                   << "Catch discovery event:" << de.event_type << ", "
                   << de.entity->topic_name << ", " << de.entity->topic_type
                   << std::endl;
        else
            logger << "INFO\t"
                   << "Catch discovery event:" << de.event_type << std::endl;

        do_receive_message();
    }

  private:
    protocol::socket& _reader; // non-owning
    asio::streambuf _buffer;   // accumulates received bytes across reads
    dds_entity_t _dp;          // stored copy of the constructor argument; not read in this class
};
/// Stand-in factory for the demo: ignores its arguments and returns a
/// dds_entity_t filled with fixed sample values and three empty routes.
static dds_entity_t dds_create_participant(int, void*, void*) {
    dds_entity_t participant{};
    participant.key = "key";
    participant.participant_key = "participant_key";
    participant.topic_name = "topic_name";
    participant.topic_type = "topic_type";
    participant.keyless = true;
    participant.qos = dds_qos_t{};
    participant.routes.emplace("route1", RouteStatus{});
    participant.routes.emplace("route2", RouteStatus{});
    participant.routes.emplace("route3", RouteStatus{});
    return participant;
}
// Demo entry point: expects exactly one argument (the topic name), connects a local socket
// pair, sends two discovery events over it and lets a Plugin receive and log them.
int main(int argc, char* argv[])
{
// program can create reader for only one dds_topic
if (argc != 2) {
logger << "ERROR\tProvide topic name for forwading reader process" << std::endl;
return 1;
}
logger << "INFO\tProvided topic to read: " << argv[1] << std::endl;
// create domain_participant, reader and writer
// sockets to catch rodds::dds_discovery::DiscoveryEvent`s
logger << "INFO\tGenerate DDS domain participant" << std::endl;
asio::io_context ioc;
logger << "INFO\tCreate reader/writer sockets" << std::endl;
// each socket gets its own strand executor, so handlers for one socket never run concurrently
protocol::socket tx(make_strand(ioc)), rx(make_strand(ioc));
connect_pair(tx, rx);
// create Plugin instance
const auto dp = std::make_shared<dds_entity_t>( //
dds_create_participant(0, NULL, NULL));
Plugin plugin(*dp, rx);
DiscoveryEvent de { dp, DiscoveryEvent::DiscoveredPublication };
//logger << ToWireFormat(de) << std::endl;
send_discovery_event(*dp, tx, de);
// send_discovery_event copies the event, so changing `de` here does not affect the first send
de.event_type = DiscoveryEvent::DiscoveredSubscription;
send_discovery_event(*dp, tx, de);
// keep the io_context alive while it would otherwise run out of work
auto work = make_work_guard(ioc);
//rodds::dds_discovery::run_discovery(dp, &tx);
ioc.run_for(1s);
// closing the sending end makes the reader's pending read complete with end of file
tx.close();
work.reset();
// drain the remaining handlers; returns once nothing is left to run
ioc.run();
}
Printing:
INFO Provided topic to read: tn
INFO Generate DDS domain participant
INFO Create reader/writer sockets
INFO Plugin initialized
DEBUG Send discovery event
DEBUG Send discovery event
DEBUG Sent discovery event (153)
DEBUG Sent discovery event (153)
DEBUG on_read_length Success 4
DEBUG on_read_message Success 0
INFO Catch discovery event:0, topic_name, topic_type
DEBUG on_read_length Success 4
DEBUG on_read_message Success 0
INFO Catch discovery event:2, topic_name, topic_type
DEBUG on_read_length End of file 0