TCP client socket problem while emplacing in vector (solution and/or improved proposal)

Time:08-24

The following code contains a TCP client class that should be instantiated one or more times based on entries in a configuration file (hard-coded for this example), emplaced into a std::vector, and then connected to its corresponding server socket.

Godbolt link: https://godbolt.org/z/hzK9jhzjc

#include <chrono>
#include <thread>
#include <fstream>
#include <boost/asio.hpp>

namespace tcpsocket
{

using boost::asio::ip::tcp;

class client
{
public:
    void connect(const std::string& host, const std::string& port)
    {
        if (host.empty() || port.empty()) return;

        tcp::resolver resolver{ io_context };
        tcp::resolver::results_type endpoints = resolver.resolve(host, port);

        boost::asio::async_connect(socket, endpoints, [this](const boost::system::error_code& error, const tcp::endpoint /*endpoint*/)
        {
            if (!error)
                read();
        });
    }

    void read()
    {
        socket.async_read_some(boost::asio::buffer(data, max_length), [this](const boost::system::error_code& error, std::size_t bytes)
        {
            if (error) return socket.close();
            bytes_received = bytes;
            read();
        });
    }

    void write(const std::string& msg)
    {
        boost::system::error_code error;
        size_t bytes = socket.write_some(boost::asio::buffer(msg), error);
        if (error) return socket.close();
    }

    void poll()
    {
        io_context.poll();
    }

private: 
    std::string host;
    std::string port;
    size_t bytes_received{};
    enum { max_length = 512 };
    unsigned char data[max_length];
    boost::asio::io_context io_context;
    tcp::socket socket{io_context};
};

}//namespace tcpsocket

struct Cfg
{
    unsigned id{};
    std::string host;
    std::string port;
};

struct Client
{
    unsigned id{};
    tcpsocket::client sck;
};

int main()
{
    std::vector<Client> clients;
    
    std::vector<Cfg> config{ {125u, "127.0.0.1", "30000"}, {137u, "127.0.0.1", "30001"} };//In real life, this would come from configuration file
    for (auto&[id, host, port] : config)
    {
      //auto& client = clients.push_back(Client{id, {}});//This is failing (typo error with return value detected by Sehe!!!)
      auto& client = clients.emplace_back(id, {});//This is failing
      client.sck.connect(host, port);
    }

    while (true)
    {
        for (auto&[id, client] : clients)
            client.poll();

        using namespace std::chrono_literals;
        std::this_thread::sleep_for(100ms);
    }
}

The program does not compile. As I understand it, the error comes from copying the io_context/socket, but I may be wrong on this point.

How can I fix this? And is there a better alternative to what I am doing? For example, would it be a better approach to keep a pool of TCP sockets inside the client class and share the same io_context among all of them?

CodePudding user response:

push_back doesn't return a value (its return type is void). If you have C++17, emplace_back can be used like that:

    auto& client = clients.emplace_back(Client{id, {}});

But a vector can reallocate, which requires moving or copying all elements. Since client is neither copyable nor movable, that can't work. And that's a good thing, because otherwise the pending async_ operations would run into undefined behavior when the vector reallocated: the handlers capture this, which would dangle.
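The root cause can be reproduced without Asio. Here is a minimal sketch, where NonMovable is a hypothetical stand-in for boost::asio::io_context (whose copy and move operations are deleted):

```cpp
#include <type_traits>

// Hypothetical stand-in for boost::asio::io_context: neither copyable nor movable.
struct NonMovable {
    NonMovable() = default;
    NonMovable(const NonMovable&) = delete;
    NonMovable& operator=(const NonMovable&) = delete;
};

// Mirrors struct Client from the question: an id plus a non-movable member.
struct ClientLike {
    unsigned id{};
    NonMovable ctx;
};

// std::vector<T> requires T to be MoveInsertable (it must be able to
// relocate elements on reallocation), but this class isn't even
// move-constructible, so vector<ClientLike>::emplace_back can't compile:
static_assert(!std::is_move_constructible_v<ClientLike>);
static_assert(!std::is_copy_constructible_v<ClientLike>);
```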

Consider std::deque or std::list, which offer reference stability (elements are never relocated; for deque, only as long as you insert and erase at the ends). std::list is the safer of the two here:

std::list<Client> clients;
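To see the reference-stability guarantee in isolation, here is a small Asio-free sketch: std::list allocates each element as its own node and never relocates it, so a reference obtained from emplace_back stays valid across later insertions.

```cpp
#include <list>
#include <string>

// Returns a reference into the list, then grows the list substantially.
// With std::list the reference stays valid; with std::vector it would
// almost certainly dangle after a reallocation.
inline std::string& add_and_grow(std::list<std::string>& items) {
    std::string& first = items.emplace_back("first");
    for (int i = 0; i < 1000; ++i)
        items.emplace_back("filler");
    return first; // still refers to the same node
}
```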

This gets you somewhere. However, I'd note a few things:

  • it's not efficient to create a separate io_context for each client
  • manually polling them is not typical
  • the host and port members were never used
  • bytes_received was overwritten on every read instead of accumulated
  • write_some doesn't guarantee the whole buffer will be written; boost::asio::write does
  • you're mixing async and sync operations (async_read_some vs write_some). This is not always a good idea. I think for tcp::socket it will be fine in this use case, but don't expect IO objects to support it in general
  • there's no reason to supply the array length to boost::asio::buffer: it is deduced. Even better, use std::array instead of a C-style array
  • I see your #include <thread>; if you intend to run on multiple threads, be aware of strands: Why do I need strand per connection when using boost::asio?
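The short-write point deserves a concrete illustration. A write_some-style call may transfer fewer bytes than requested; boost::asio::write simply keeps calling until the whole buffer is sent. The same loop, framework-agnostic (write_some_fn is a hypothetical stand-in for the actual socket call):

```cpp
#include <cstddef>
#include <functional>
#include <string>

// Keep calling a possibly-partial writer until the whole message is sent.
// This is essentially what boost::asio::write does for you.
inline std::size_t write_all(
    const std::string& msg,
    const std::function<std::size_t(const char*, std::size_t)>& write_some_fn)
{
    std::size_t sent = 0;
    while (sent < msg.size())
        sent += write_some_fn(msg.data() + sent, msg.size() - sent);
    return sent;
}
```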

Here's a simplified, fixed version with the above:

Live On Coliru

#include <boost/asio.hpp>
#include <chrono>
#include <fstream>
#include <iostream>
#include <list>

using namespace std::chrono_literals;

namespace tcpsocket {

    using boost::asio::ip::tcp;
    using boost::system::error_code;

    class client {
      public:
        client(boost::asio::any_io_executor ex) : socket_(ex) {}

        size_t bytes_received() const { return bytes_received_; }

        void connect(const std::string& host, const std::string& port) {
            post(socket_.get_executor(), [=, this] { do_connect(host, port); });
        }
        void write(std::string msg) {
            post(socket_.get_executor(), [=, this] { do_write(msg); });
        }
        void read() {
            post(socket_.get_executor(), [=, this] { do_read(); });
        }

      private:
        void do_connect(const std::string& host, const std::string& port) {
            if (host.empty() || port.empty())
                return;

            tcp::resolver resolver{socket_.get_executor()};

            async_connect(socket_, resolver.resolve(host, port),
                          [this](error_code ec, tcp::endpoint /*endpoint*/) {
                              if (!ec)
                                  do_read();
                              else
                                  std::cerr << ec.message() << std::endl;
                          });
        }

        void do_write(const std::string& msg) {
            error_code ec;
            boost::asio::write(socket_, boost::asio::buffer(msg), ec);
            if (ec) {
                std::cerr << "Closing (" << ec.message() << ")" << std::endl;
                return socket_.close();
            }
        }

        void do_read() {
            socket_.async_read_some( //
                boost::asio::buffer(data),
                [this](error_code ec, std::size_t bytes) {
                    if (ec)
                        return socket_.close();
                    bytes_received_ += bytes;
                    do_read();
                });
        }

        std::atomic_size_t             bytes_received_{0};
        std::array<unsigned char, 512> data;
        tcp::socket                    socket_;
    };

} // namespace tcpsocket

struct Cfg {
    unsigned id{};
    std::string host;
    std::string port;
};

struct Client {
    Client(unsigned id, boost::asio::any_io_executor ex) : id_(id), impl_(ex) {}

    unsigned          id_;
    tcpsocket::client impl_;
};

int main()
{
    boost::asio::io_context ioc;

    std::list<Client> clients;

    std::vector<Cfg> const config{
        {125u, "127.0.0.1", "30000"},
        {137u, "127.0.0.1", "30001"},
        {149u, "127.0.0.1", "30002"},
        {161u, "127.0.0.1", "30003"},
        {173u, "127.0.0.1", "30004"},
        {185u, "127.0.0.1", "30005"},
        {197u, "127.0.0.1", "30006"},
        {209u, "127.0.0.1", "30007"},
        {221u, "127.0.0.1", "30008"},
        {233u, "127.0.0.1", "30009"},
        {245u, "127.0.0.1", "30010"},
    };

    for (auto& [id, host, port] : config) {
        auto& c = clients.emplace_back(id, make_strand(ioc));

        c.impl_.connect(host, port);
        c.impl_.write(std::to_string(id) + " connected to " + host + ":" + port + "\n");
    }

    ioc.run_for(150ms);

    for (auto& [id, impl]: clients)
        std::cout << id << " received " << impl.bytes_received() << "\n";
}

Prints

(for a in {30000..30010}; do netcat -tlp $a < main.cpp & done)
g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp
./a.out
125 connected to 127.0.0.1:30000
149 connected to 127.0.0.1:30002
161 connected to 127.0.0.1:30003
185 connected to 127.0.0.1:30005
197 connected to 127.0.0.1:30006
209 connected to 127.0.0.1:30007
221 connected to 127.0.0.1:30008
233 connected to 127.0.0.1:30009
173 connected to 127.0.0.1:30004
245 connected to 127.0.0.1:30010
137 connected to 127.0.0.1:30001
125 received 3386
137 received 3386
149 received 3386
161 received 3386
173 received 3386
185 received 3386
197 received 3386
209 received 3386
221 received 3386
233 received 3386
245 received 3386

Other Notes

Read operations form a chain: each completion handler starts the next read, so they never overlap (an implicit strand).

Note that it is still your responsibility to ensure no write operations overlap. If necessary, introduce a queue so that you can have multiple messages pending. See e.g. How to safely write to a socket from multiple threads?
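The queue idea can be sketched without Asio. In the real thing, start_write would initiate an async_write on the connection's strand and on_write_done would be called from its completion handler; here a hypothetical callback stands in for the async write:

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Outgoing-message queue: at most one message is in flight. push() starts
// a write only when the queue was empty; on_write_done() (called from the
// write completion handler) starts the next one. Run on a strand, this
// guarantees writes never overlap.
class write_queue {
  public:
    explicit write_queue(std::function<void(const std::string&)> start_write)
        : start_write_(std::move(start_write)) {}

    void push(std::string msg) {
        queue_.push_back(std::move(msg));
        if (queue_.size() == 1)        // nothing in flight: kick off a write
            start_write_(queue_.front());
    }

    void on_write_done() {
        queue_.pop_front();
        if (!queue_.empty())           // more queued: continue with the next
            start_write_(queue_.front());
    }

  private:
    std::deque<std::string> queue_;
    std::function<void(const std::string&)> start_write_;
};
```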
