Home > front end >  Boost Asio and Udp Poll() No incoming data
Boost Asio and Udp Poll() No incoming data

Time:04-27

I have to handle information from 100 ports in parallel for 100ms per second.

I am using Ubuntu OS.

I did some research and saw that the poll() function is a good candidate for avoiding 100 separate threads to handle, in parallel, the data arriving over UDP.

I wrote the main part with Boost and tried to integrate poll() with it.

The problem is that when I send data from the client to the server, I receive nothing. According to Wireshark, the data is arriving at the right host (localhost, port 1234).

Did I miss something, or did I do something wrong?

enter image description here

The test code (server) :

#include <array>
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstring>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

#include <fcntl.h>
#include <sys/poll.h>
#include <unistd.h>

#include <boost/optional.hpp>
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>


// Shorthand for the UDP protocol types (udp::socket, udp::endpoint, udp::resolver).
using boost::asio::ip::udp;
// Makes the remaining Boost.Asio names (e.g. io_context) usable unqualified.
using namespace boost::asio;

// Enables the std::chrono duration literal suffixes (e.g. 100ms, 5s).
using namespace std::chrono_literals;

// Host address that main() hands to the resolver.
std::string ip_address = "127.0.0.1";

/// Returns the number of elements of a built-in array.
///
/// The extent N is deduced from the array's type, so the argument itself is
/// never read. The function is constexpr and noexcept, which lets the result
/// be used in constant expressions (array bounds, static_assert, ...).
///
/// @tparam T element type of the array (deduced)
/// @tparam N number of elements of the array (deduced)
/// @return N
template<typename T, std::size_t N>
constexpr std::size_t arraySize( T(&)[N] ) noexcept
{
    return N;
}

class UdpReceiver
{
    using Resolver   = udp::resolver;
    using Sockets    = std::deque<udp::socket>;
    using EndPoint   = udp::endpoint;
    using Buffer     = std::array<char, 100>; // receiver buffer

public:
    explicit UdpReceiver()
            : work_(std::ref(resolver_context)), thread_( [this]{ resolver_context.run(); })
    { }

    ~UdpReceiver()
    {
        work_ = boost::none; // using work to keep run active always !
        thread_.join();
    }

    void async_resolve(udp::resolver::query const& query_) {
        resolver_context.post([this, query_] { do_resolve(query_); });
    }

    // callback for event-loop in main thread
    void run_handler(int fd_idx) {
        // start reading
        auto result = read(fd_idx, receive_buf.data(), sizeof(Buffer));
        // increment number of received packets
        received_packets = received_packets   1;
        std::cout << "Received bytes " << result << " current recorded packets " << received_packets <<'\n';

        // run handler posted from resolver threads
        handler_context.poll();
        handler_context.reset();
    }

    static void handle_receive(boost::system::error_code error, udp::resolver::iterator const& iterator) {
        std::cout << "handle_resolve:\n"
                     "  " << error.message() << "\n";
        if (!error)
            std::cout << "  " << iterator->endpoint() << "\n";
    }

    // get current file descriptor
    int fd(size_t idx)
    {
        return sockets[idx].native_handle();
    }

private:

    void do_resolve(boost::asio::ip::udp::resolver::query const& query_) {

        boost::system::error_code error;
        Resolver resolver(resolver_context);
        Resolver::iterator result = resolver.resolve(query_, error);

        sockets.emplace_back(udp::socket(resolver_context, result->endpoint()));

        // post handler callback to service running in main thread
        resolver_context.post(boost::bind(&UdpReceiver::handle_receive, error, result));
    }

private:
    Sockets sockets;
    size_t received_packets = 0;
    EndPoint remote_receiver;
    Buffer receive_buf {};

    io_context resolver_context;
    io_context handler_context;
    boost::optional<boost::asio::io_context::work> work_;
    std::thread thread_;
};

/// Test server: resolves ip_address:1234, puts the resulting socket into
/// non-blocking mode and dispatches readable events from a poll() loop.
///
/// @return 1 if the socket cannot be set up or poll() fails; otherwise the
///         event loop never returns
int main (int, char**)
{
    UdpReceiver udpReceiver;
    udpReceiver.async_resolve(udp::resolver::query(ip_address, std::to_string(1234)));

    //logic
    // One pollfd entry per socket. Only one endpoint is resolved above, so
    // registering the same descriptor twice would dispatch every datagram twice.
    pollfd fds[1] { };
    for(size_t i = 0; i < arraySize(fds); ++i)
    {
        fds[i].fd = udpReceiver.fd(i);
        if (fds[i].fd < 0) // a negative value is not a usable descriptor
        {
            std::cerr << "no usable socket for poll entry " << i << '\n';
            return 1;
        }
        fds[i].events = POLLIN;

        // Add O_NONBLOCK to the existing status flags instead of replacing them.
        const int flags = fcntl(fds[i].fd, F_GETFL, 0);
        if (flags == -1 || fcntl(fds[i].fd, F_SETFL, flags | O_NONBLOCK) == -1)
        {
            std::cerr << "fcntl: " << std::strerror(errno) << '\n';
            return 1;
        }
    }

    // simple event-loop
    while (true) {
        const int ready = poll(fds, arraySize(fds), -1); // waiting for wakeup call. Timeout - inf
        if (ready < 0)
        {
            if (errno == EINTR) // interrupted by a signal: just wait again
                continue;
            std::cerr << "poll: " << std::strerror(errno) << '\n';
            return 1;
        }

        for(auto &fd : fds)
        {
            if(fd.revents & POLLIN) // checking if we have something to read
            {
                fd.revents = 0; // reset kernel message
                udpReceiver.run_handler(fd.fd); // call resolve handler. Do read !
            }
        }
    }
}

CodePudding user response:

This looks like a confused mix of C style poll code and Asio code. The point is

  • you don't need poll (Asio does it internally, or uses epoll/select/kqueue/IOCP - whatever is available)
  • UDP is connectionless, so you don't need more than one socket to receive all "connections" (senders)

I'd replace it all with a single udp::socket on a single thread. You don't even have to manage the thread/work:

// Execution context whose handlers run on a pool of exactly one thread.
net::thread_pool io(1); // single threaded
// UDP socket on that context, bound to port 1234 with a default-constructed address.
udp::socket s{io, {{}, 1234}};

Let's run an asynchronous receive loop for 5s:

std::array<char, 100> receive_buffer;
udp::endpoint sender; // filled in by each completed receive

// Completion handler that records one datagram and then re-arms itself.
// It is stored in a std::function so the lambda can pass itself on as the
// next completion handler.
std::function<void(error_code, size_t)> read_loop;
read_loop = [&](error_code ec, size_t bytes) {
    // size_t(-1) is the sentinel passed by the priming call below; it marks
    // "no datagram yet", so there is nothing to record.
    if (bytes != size_t(-1)) {
        //std::cout << "read_loop (" << ec.message() << ")\n";
        if (ec)
            return; // any error (including the cancel below) ends the loop

        received_packets += 1;
        unique_senders.insert(sender);
        //std::cout << "Received:" << bytes << " sender:" << sender << " recorded:" << received_packets << "\n";
        //std::cout << std::string_view(receive_buffer.data(), bytes) << "\n";
    }
    s.async_receive_from(net::buffer(receive_buffer), sender, read_loop);
};

read_loop(error_code{}, -1); // prime the async pump

// after 5s stop
std::this_thread::sleep_for(5s);
// Cancel on the pool thread, so the socket is only touched from there.
post(io, [&s] { s.cancel(); });

io.join();

At the end, we can report the statistics:

// Print the packet count and the number of distinct sender endpoints seen.
std::cout << "A total of " << received_packets << " were received from "
          << unique_senders.size() << " unique senders\n";

With a simulated load in bash:

# Sends every line of the system word list as its own UDP datagram to
# localhost port 1234, using bash's /dev/udp/<host>/<port> redirection.
function client() {
    while read word; do
        echo "$word" > /dev/udp/localhost/1234
    done < /etc/dictionaries-common/words
}

# Launch 20 clients in the background, then time how long they take to finish.
for i in {1..20}; do
    client &
done
time wait

We get:

A total of 294808 were received from 28215 unique senders

real    0m5,007s
user    0m0,801s
sys     0m0,830s

enter image description here

This is obviously not optimized; the bottleneck here is likely the many, many bash subshells being launched for the clients.

Full Listing

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iostream>
#include <set>

// Short alias for the Boost.Asio namespace.
namespace net = boost::asio;
// Shorthand for the UDP protocol types (udp::socket, udp::endpoint).
using boost::asio::ip::udp;
// Error type received by the completion handler.
using boost::system::error_code;
// Enables the std::chrono duration literal suffixes (the 5s used in main).
using namespace std::chrono_literals;

/// Receives UDP datagrams on port 1234 for five seconds using one socket and
/// a single-threaded pool, then prints how many datagrams arrived and from
/// how many distinct sender endpoints.
int main ()
{
    net::thread_pool io(1); // single threaded
    udp::socket s{io, {{}, 1234}};

    std::set<udp::endpoint> unique_senders;
    size_t                  received_packets = 0;

    {
        std::array<char, 100> receive_buffer;
        udp::endpoint sender; // filled in by each completed receive

        // Completion handler that records one datagram and then re-arms
        // itself. It is stored in a std::function so the lambda can pass
        // itself on as the next completion handler.
        std::function<void(error_code, size_t)> read_loop;
        read_loop = [&](error_code ec, size_t bytes) {
            // size_t(-1) is the sentinel passed by the priming call below;
            // it marks "no datagram yet", so there is nothing to record.
            if (bytes != size_t(-1)) {
                //std::cout << "read_loop (" << ec.message() << ")\n";
                if (ec)
                    return; // any error (including the cancel below) ends the loop

                received_packets += 1;
                unique_senders.insert(sender);
                //std::cout << "Received:" << bytes << " sender:" << sender << " recorded:" << received_packets << "\n";
                //std::cout << std::string_view(receive_buffer.data(), bytes) << "\n";
            }
            s.async_receive_from(net::buffer(receive_buffer), sender, read_loop);
        };

        read_loop(error_code{}, -1); // prime the async pump

        // after 5s stop
        std::this_thread::sleep_for(5s);
        // Cancel on the pool thread, so the socket is only touched from there.
        post(io, [&s] { s.cancel(); });

        // Wait for the pool to drain before the buffer, the sender and the
        // handler leave scope; the counters are read only after this point.
        io.join();
    }

    std::cout << "A total of " << received_packets << " were received from "
              << unique_senders.size() << " unique senders\n";
}
  • Related