Boost async_read_some data doesn't last long

Time:09-13

I'm trying to write and read data asynchronously. I have a function that writes an event to the socket:

/*
struct DDSEntity
{
    std::string key;
    std::string participant_key;
    std::string topic_name;
    std::string topic_type;
    bool keyless;
    dds_qos_t qos;
    std::map<std::string, RouteStatus> routes;    
};

struct DiscoveryEvent
{   
    enum DiscoveryEventType
    {
        DiscoveredPublication, 
        UndiscoveredPublication, 
        DiscoveredSubscription, 
        UndiscoveredSubscription 
    };

    std::shared_ptr<DDSEntity> entity;
    DiscoveryEventType event_type;
};

*/
boost::mutex guard; //global mutex

void send_discovery_event(const dds_entity_t dp, boost::asio::local::stream_protocol::socket *socket,
    const DiscoveryEvent& event)
{
    SPDLOG_DEBUG("Send discovery event");
    boost::async([socket, event]() {
            boost::mutex::scoped_lock scoped_lock(guard);
            auto bufs = boost::asio::buffer(&event, sizeof(rodds::dds_discovery::DiscoveryEvent));
            auto size = boost::asio::write(*socket, bufs);
        });
}

On the receiving side, I start reading the sent data like this:

class Plugin
{
public:
    Plugin(
        const dds_entity_t &dp, boost::asio::local::stream_protocol::socket &rx)
    : _reader(&rx), _dp(dp), _buffer(&_de, sizeof(rodds::dds_discovery::DiscoveryEvent))
    {
        SPDLOG_INFO("Plugin initialized");
        
        _reader->async_read_some(
            _buffer, 
             boost::bind(
                &Plugin::async_read_handler,
                this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred 
            ));
    }

    void async_read_handler(const boost::system::error_code &error, std::size_t bytes_trans)
    {
        assert(!error);
        assert(bytes_trans == sizeof(rodds::dds_discovery::DiscoveryEvent));
        
      
        if (_de.event_type == rodds::dds_discovery::DiscoveryEvent::DiscoveredPublication || 
            _de.event_type == rodds::dds_discovery::DiscoveryEvent::DiscoveredSubscription)
            SPDLOG_INFO("Catch discovery event:{0}, {1}, {2}", _de.event_type, _de.entity->topic_name, _de.entity->topic_type);
        else
            SPDLOG_INFO("Catch discovery event:{0}", _de.event_type);

        _reader->async_read_some(
            _buffer, 
             boost::bind(
                &Plugin::async_read_handler,
                this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred 
            ));
    }

private:
    boost::asio::local::stream_protocol::socket* _reader;
    boost::asio::mutable_buffer _buffer;
    rodds::dds_discovery::DiscoveryEvent _de;
    dds_entity_t _dp;
};



int main(int argc, char* argv[])
{
    spdlog::set_level(spdlog::level::debug);
    // programm can create reader for only one dds_topic
    if (argc == 1 || argc > 2) 
    {
        SPDLOG_ERROR("Provide topic name for forwading reader process");
        return 0;
    }
    SPDLOG_INFO("Provided topic to read: {0}", argv[1]);

    // create domain_participant, reader and writer 
    // sockets to catch rodds::dds_discovery::DiscoveryEvent`s
    SPDLOG_INFO("Generate DDS domain participant");
    const dds_entity_t dp = dds_create_participant(0, NULL, NULL);
    boost::asio::io_service io_service;
    SPDLOG_INFO("Create reader/writer sockets");
    boost::asio::local::stream_protocol::socket tx(io_service), rx(io_service);
    boost::asio::local::connect_pair(tx, rx);

    boost::asio::io_service::work work(io_service); 

    // create Plugin instance
    Plugin plugin(dp, rx);

    rodds::dds_discovery::run_discovery(dp, &tx);
    
    io_service.run();

    return 0;
}

However, I apparently read data that has already been overwritten, because I get this output:

[2022-09-11 13:23:19.226] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, rt/rosout,|msg::dds_::Log_
[2022-09-11 13:23:19.226] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0,|rametersReply,|srv::dds_::GetParameters_Response_
[2022-09-11 13:23:19.226] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, rameter_typesReply,srv::dds_::GetParameterTypes_Response_
[2022-09-11 13:23:19.226] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, CrametersReply,srv::dds_::SetParameters_Response_
[2022-09-11 13:23:19.227] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0,|rameters_atomicallyReply,|srv::dds_::SetParametersAtomically_Response_
[2022-09-11 13:23:19.227] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, |be_parametersReply,|srv::dds_::DescribeParameters_Response_
[2022-09-11 13:23:19.227] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, |arametersReply,|srv::dds_::ListParameters_Response_
[2022-09-11 13:23:19.227] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0,  |nts,|msg::dds_::ParameterEvent_
[2022-09-11 13:23:19.228] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, rt/chatter,|ds_::String_

I would be very grateful if someone could point out my mistakes. Thanks.

Answer:

You seem to be missing the crucial concept of a wire format. You're passing e.g.

auto bufs = boost::asio::buffer(&event, sizeof(rodds::dds_discovery::DiscoveryEvent));

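This treats event as if it were a trivially copyable (POD) type. It isn't, so this is Undefined Behaviour! It aggregates many non-POD types (std::string, std::shared_ptr, std::map, etc.). The raw bytes of such an object mostly hold internal pointers rather than the data they point to, so the receiver ends up dereferencing memory it does not own. To put these types on the wire, you have to decide on a serialization format.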
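
One way to catch this kind of mistake at compile time is to check a type trait before handing an object's raw bytes to a buffer. The following is only a sketch: Entity, Event, PodEvent and raw_bytes_ok are simplified, hypothetical stand-ins, not names from the question's code base.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <type_traits>

// Simplified stand-ins for the question's types (hypothetical names).
struct Entity {
    std::string topic_name;
    std::string topic_type;
};

struct Event {
    std::shared_ptr<Entity> entity; // a pointer into this process's heap
    int event_type;
};

// A fixed-layout record that really can be copied byte for byte.
struct PodEvent {
    std::int32_t event_type;
    char topic_name[64];
};

// True only for types whose object representation may be copied as raw
// bytes (for example with memcpy or a raw buffer write).
template <typename T>
constexpr bool raw_bytes_ok() {
    return std::is_trivially_copyable<T>::value;
}

static_assert(!raw_bytes_ok<Event>(), "Event holds a shared_ptr");
static_assert(raw_bytes_ok<PodEvent>(), "PodEvent is plain data");
```

Placing a static_assert like this next to a raw buffer write would turn the original code into a compile error instead of silent memory corruption.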

I recently answered the question "Sending an array of ints with boost::asio", where the question itself lists the valid options. The answers contain good advice that is worth reading. My own answer there focused on using POD buffers with Asio, which you might apply to your situation as well.

However, your situation "smells" like you would be better served by the serialization approach, as it doesn't require you to think as hard about the memory layout of your objects and the wire format that implements it.

Boost Serialization

You might strive for an implementation like

void send_discovery_event(const dds_entity_t /*dp*/, boost::asio::local::stream_protocol::socket *socket,
    const DiscoveryEvent& event)
{
    SPDLOG_DEBUG("Send discovery event");
    boost::async([socket, event]() {
        boost::mutex::scoped_lock scoped_lock(guard);
        /*auto size =*/boost::asio::write(
            *socket, boost::asio::buffer(ToWireFormat(event)));
    });
}

Where ToWireFormat uses a portable text representation:

template <typename Packet>
std::string ToWireFormat(Packet const& packet) {
    std::ostringstream oss;
    {
        boost::archive::text_oarchive oa(oss);
        oa << packet;
    } // flush and complete archive

    std::string data = std::move(oss).str();
    return std::to_string(data.length()) + " " + data;
}
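
To show the length-prefix framing on its own, here is a minimal sketch that uses only the standard library, with no Asio or Boost Serialization involved. The helper names frame and unframe are hypothetical. The payload is an arbitrary string standing in for the archive text.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>

// Prefix the payload with its decimal length and a single space,
// mirroring the framing used by ToWireFormat.
std::string frame(const std::string& payload) {
    return std::to_string(payload.size()) + " " + payload;
}

// Read one framed message from the stream. Returns false if the header is
// malformed or the stream ends before the full payload has arrived.
bool unframe(std::istream& is, std::string& payload) {
    std::size_t length = 0;
    char space = 0;
    if (!(is >> std::noskipws >> length >> space) || space != ' ')
        return false;
    payload.assign(length, '\0');
    if (length == 0)
        return true;
    is.read(&payload[0], static_cast<std::streamsize>(length));
    return static_cast<std::size_t>(is.gcount()) == length;
}
```

Because the receiver knows how many bytes belong to each message, several messages written back to back can be split apart again, and a truncated final message is detected instead of being misread.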

This works with Boost Serialization once you add a serialize member function:

struct DDSEntity
{
    std::string key;
    std::string participant_key;
    std::string topic_name;
    std::string topic_type;
    bool keyless;
    dds_qos_t qos;
    std::map<std::string, RouteStatus> routes;

    // clang-format off
    template <typename Ar> void serialize(Ar& ar, unsigned) {
        ar & key & participant_key
           & topic_name & topic_type & keyless
           & qos & routes;
    }
    // clang-format on
};

using dds_entity_t = DDSEntity;

struct DiscoveryEvent
{
    enum DiscoveryEventType
    {
        DiscoveredPublication,
        UndiscoveredPublication,
        DiscoveredSubscription,
        UndiscoveredSubscription
    };

    std::shared_ptr<DDSEntity> entity;
    DiscoveryEventType event_type;

    template <typename Ar> void serialize(Ar& ar, unsigned) {
        ar & entity & event_type;
    }
};

Of course the constituent types you didn't show should also be serializable. Let's show that with the free-function serialize instead:

struct dds_qos_t {}; // TODO IMPLEMENT
struct RouteStatus {}; // TODO IMPLEMENT

template <typename Ar>
static inline void serialize(Ar&, dds_qos_t&, unsigned) { /* TODO implement */ }
template <typename Ar>
static inline void serialize(Ar&, RouteStatus&, unsigned) { /* TODO implement */ }

Now, the important part is that std::string, shared_ptr and map are all supported out of the box by Boost Serialization:

#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/map.hpp>
#include <boost/serialization/shared_ptr.hpp>

That's enough! E.g. doing just

dds_entity_t dds_create_participant(int, void*, void*) {
    return dds_entity_t{
        "key",
        "participant_key",
        "topic_name",
        "topic_type",
        true,
        dds_qos_t{},
        {
            {"route1", RouteStatus{}},
            {"route2", RouteStatus{}},
            {"route3", RouteStatus{}},
        },
    };
}

const dds_entity_t dp = dds_create_participant(0, NULL, NULL);
std::cout << ToWireFormat(dp) << std::endl;

Prints e.g.

137 22 serialization::archive 19 1 0
0 3 key 15 participant_key 10 topic_name 10 topic_type 1 0 0 0 0 3 0 0 0 6 route1 0 0 6 route2 6 route3

Further Issues

  1. I'm not sure about the use of the mutex. If it synchronizes access to the event data, you could reduce the scope:

    boost::mutex::scoped_lock scoped_lock(guard);
    auto payload = ToWireFormat(event);
    scoped_lock.unlock();
    
    /*auto size =*/boost::asio::write(*socket,
                                      boost::asio::buffer(payload));
    

    If you want to synchronize access to the socket, prefer using a strand (see "Strands: Use Threads Without Explicit Locking" in the Asio documentation).

  2. When reading, you use async_read_some, which has no logic to make sure the read is complete: it may return after any number of bytes. Instead, use the free-function composed operations (asio::async_read, asio::async_read_until) to read the required information:

    void do_receive_message() {
        async_read_until(_reader, _buffer, " ",
                         boost::bind(&Plugin::on_read_length, this, ph::error,
                                     ph::bytes_transferred));
    }
    
    void on_read_length(error_code ec, size_t xfr) {
        logger << "DEBUG\t" << __FUNCTION__ << "\t" << ec.message() << " " << xfr << std::endl;
    
        std::size_t length;
        char space;
        if (!ec.failed() &&
            std::istream(&_buffer) >> std::noskipws >> length >> space &&
            space == ' ') //
        {
            if (length <= _buffer.size())
                on_read_message(ec, 0);
            else {
                logger << "Reading " << length << " more to complete message"
                       << std::endl;
                async_read(_reader, _buffer,
                        asio::transfer_exactly(length - _buffer.size()),
                        boost::bind(&Plugin::on_read_message, this,
                            ph::error, ph::bytes_transferred));
            }
        }
    }
    
    void on_read_message(error_code ec, size_t xfr) {
        logger << "DEBUG\t" << __FUNCTION__ << "\t" << ec.message() << " " << xfr << std::endl;
    
        std::istream msg(&_buffer);
        auto         de = FromWireFormat<DiscoveryEvent>(msg);
    
        if (de.event_type == DiscoveryEvent::DiscoveredPublication ||
            de.event_type == DiscoveryEvent::DiscoveredSubscription)
            logger << "INFO\t"
                   << "Catch discovery event:" << de.event_type << ", "
                   << de.entity->topic_name << ", " << de.entity->topic_type
                   << std::endl;
        else
            logger << "INFO\t"
                   << "Catch discovery event:" << de.event_type << std::endl;
    
        if (!ec)
            do_receive_message();
    }
    

    This assumes we replaced the buffer with an asio::streambuf:

    asio::streambuf   _buffer;
    

    And FromWireFormat is a simple wrapper around the Boost Serialization functions again:

    template <typename Packet>
    Packet FromWireFormat(std::istream& is) {
        // length has already been taken off by the read operations
        Packet packet;
        boost::archive::text_iarchive ia(is);
        ia >> packet;
        is.ignore(2, '\n'); // eat newline from archive
        return packet;
    }
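
To see why a single read_some call is not enough, consider this standard-library-only sketch. ChunkedSource is a hypothetical stand-in for a stream socket that, like read_some, may hand back fewer bytes than requested. read_exactly is the retry loop that composed operations such as asio::read and asio::async_read run for you.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <utility>

// Hypothetical stand-in for a stream socket: each call may return fewer
// bytes than requested, just like read_some.
class ChunkedSource {
public:
    ChunkedSource(std::string data, std::size_t max_chunk)
        : data_(std::move(data)), max_chunk_(max_chunk) {}

    // Copies at most max_chunk_ bytes; returns 0 once the data is exhausted.
    std::size_t read_some(char* out, std::size_t n) {
        std::size_t k = std::min({n, max_chunk_, data_.size() - pos_});
        std::memcpy(out, data_.data() + pos_, k);
        pos_ += k;
        return k;
    }

private:
    std::string data_;
    std::size_t max_chunk_;
    std::size_t pos_ = 0;
};

// Keeps calling read_some until exactly n bytes have arrived.
bool read_exactly(ChunkedSource& src, char* out, std::size_t n) {
    std::size_t got = 0;
    while (got < n) {
        std::size_t k = src.read_some(out + got, n - got);
        if (k == 0)
            return false; // source ended before the message was complete
        got += k;
    }
    return true;
}
```

With a source that delivers at most 3 bytes per call, a single read_some for an 8-byte message returns only 3 bytes, while read_exactly keeps going until the message is complete or the source runs dry.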
    

Full Demo

This demo also modernizes a few things and replaces boost::async and the mutex with a strand, so there are no race conditions on the socket:


#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <chrono>
#include <iostream>
#include <map>
#include <memory>
#include <sstream>
#include <string>

#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/map.hpp>
#include <boost/serialization/shared_ptr.hpp>

using boost::system::error_code;
namespace asio = boost::asio;
namespace ph   = asio::placeholders;
using protocol = asio::local::stream_protocol;
using namespace std::chrono_literals;
static std::ostream logger(std::cout.rdbuf());

namespace rodds { namespace dds_discovery {
    struct dds_qos_t {}; // TODO IMPLEMENT
    struct RouteStatus {}; // TODO IMPLEMENT

    template <typename Ar>
    static inline void serialize(Ar&, dds_qos_t&, unsigned) { /* TODO implement */ }
    template <typename Ar>
    static inline void serialize(Ar&, RouteStatus&, unsigned) { /* TODO implement */ }

    struct DDSEntity
    {
        std::string key;
        std::string participant_key;
        std::string topic_name;
        std::string topic_type;
        bool keyless;
        dds_qos_t qos;
        std::map<std::string, RouteStatus> routes;

        // clang-format off
        template <typename Ar> void serialize(Ar& ar, unsigned) {
            ar & key & participant_key
               & topic_name & topic_type & keyless
               & qos & routes;
        }
        // clang-format on
    };

    using dds_entity_t = DDSEntity;

    struct DiscoveryEvent
    {
        enum DiscoveryEventType
        {
            DiscoveredPublication,
            UndiscoveredPublication,
            DiscoveredSubscription,
            UndiscoveredSubscription
        };

        std::shared_ptr<DDSEntity> entity;
        DiscoveryEventType event_type;

        template <typename Ar> void serialize(Ar& ar, unsigned) {
            ar & entity & event_type;
        }
    };

    template <typename Packet>
    std::string ToWireFormat(Packet const& packet) {
        std::ostringstream oss;
        {
            boost::archive::text_oarchive oa(oss);
            oa << packet;
        } // flush and complete archive

        std::string data = std::move(oss).str();
        return std::to_string(data.length()) + " " + data;
    }

    template <typename Packet>
    Packet FromWireFormat(std::istream& is) {
        // length has already been taken off by the read operations
        Packet packet;
        boost::archive::text_iarchive ia(is);
        ia >> packet;
        is.ignore(2, '\n'); // eat newline from archive
        return packet;
    }
}} // namespace rodds::dds_discovery
  
using namespace rodds::dds_discovery;

void send_discovery_event(const dds_entity_t /*dp*/, protocol::socket& socket,
                          const DiscoveryEvent& event) {
    logger << "DEBUG\tSend discovery event" << std::endl;
    asio::post(socket.get_executor(), [&socket, event]() {
        auto size = asio::write(socket, asio::buffer(ToWireFormat(event)));
        logger << "DEBUG\tSent discovery event (" << size << ")" << std::endl;
    });
}

class Plugin {
  public:
    Plugin(const dds_entity_t& dp, protocol::socket& rx)
        : _reader(rx)
        , _dp(dp) {
        logger << "INFO\tPlugin initialized" << std::endl;
        do_receive_message();
    }

    void do_receive_message() {
        async_read_until(_reader, _buffer, " ",
                         boost::bind(&Plugin::on_read_length, this, ph::error,
                                     ph::bytes_transferred));
    }

    void on_read_length(error_code ec, size_t xfr) {
        logger << "DEBUG\t" << __FUNCTION__ << "\t" << ec.message() << " " << xfr << std::endl;

        std::size_t length;
        char space;
        if (!ec.failed() &&
            std::istream(&_buffer) >> std::noskipws >> length >> space &&
            space == ' ') //
        {
            if (length <= _buffer.size())
                on_read_message(ec, 0);
            else {
                logger << "Reading " << length << " more to complete message"
                       << std::endl;
                async_read(_reader, _buffer,
                        asio::transfer_exactly(length - _buffer.size()),
                        boost::bind(&Plugin::on_read_message, this,
                            ph::error, ph::bytes_transferred));
            }
        }
    }

    void on_read_message(error_code ec, size_t xfr) {
        logger << "DEBUG\t" << __FUNCTION__ << "\t" << ec.message() << " " << xfr << std::endl;
      
        std::istream msg(&_buffer);
        auto         de = FromWireFormat<DiscoveryEvent>(msg);

        if (de.event_type == DiscoveryEvent::DiscoveredPublication ||
            de.event_type == DiscoveryEvent::DiscoveredSubscription)
            logger << "INFO\t"
                   << "Catch discovery event:" << de.event_type << ", "
                   << de.entity->topic_name << ", " << de.entity->topic_type
                   << std::endl;
        else
            logger << "INFO\t"
                   << "Catch discovery event:" << de.event_type << std::endl;

        if (!ec)
            do_receive_message();
    }

  private:
    protocol::socket& _reader;
    asio::streambuf   _buffer;
    dds_entity_t      _dp;
};

static dds_entity_t dds_create_participant(int, void*, void*) {
    return dds_entity_t{
        "key",
        "participant_key",
        "topic_name",
        "topic_type",
        true,
        dds_qos_t{},
        {
            {"route1", RouteStatus{}},
            {"route2", RouteStatus{}},
            {"route3", RouteStatus{}},
        },
    };
}

int main(int argc, char* argv[])
{
    // program can create reader for only one dds_topic
    if (argc != 2) {
        logger << "ERROR\tProvide topic name for forwading reader process" << std::endl;
        return 1;
    }
    logger << "INFO\tProvided topic to read: " << argv[1] << std::endl;

    // create domain_participant, reader and writer 
    // sockets to catch rodds::dds_discovery::DiscoveryEvent`s
    logger << "INFO\tGenerate DDS domain participant" << std::endl;
    asio::io_context ioc;

    logger << "INFO\tCreate reader/writer sockets" << std::endl;
    protocol::socket tx(make_strand(ioc)), rx(make_strand(ioc));
    connect_pair(tx, rx);

    // create Plugin instance
    const auto dp = std::make_shared<dds_entity_t>( //
        dds_create_participant(0, NULL, NULL));
    Plugin     plugin(*dp, rx);

    DiscoveryEvent de { dp, DiscoveryEvent::DiscoveredPublication };
    //logger << ToWireFormat(de) << std::endl;
    send_discovery_event(*dp, tx, de);

    de.event_type = DiscoveryEvent::DiscoveredSubscription;
    send_discovery_event(*dp, tx, de);

    auto work = make_work_guard(ioc);
    //rodds::dds_discovery::run_discovery(dp, &tx);
    
    ioc.run_for(1s);

    tx.close();
    work.reset();

    ioc.run();
}

Printing:

INFO    Provided topic to read: tn
INFO    Generate DDS domain participant
INFO    Create reader/writer sockets
INFO    Plugin initialized
DEBUG   Send discovery event
DEBUG   Send discovery event
DEBUG   Sent discovery event (153)
DEBUG   Sent discovery event (153)
DEBUG   on_read_length  Success 4
DEBUG   on_read_message Success 0
INFO    Catch discovery event:0, topic_name, topic_type
DEBUG   on_read_length  Success 4
DEBUG   on_read_message Success 0
INFO    Catch discovery event:2, topic_name, topic_type
DEBUG   on_read_length  End of file 0