The code below contains a TCP client class. One or more instances should be created, as defined in a configuration file (hard-coded for this example), emplaced into a std::vector
object, and then each connected to its corresponding server socket.
Godbolt link: https://godbolt.org/z/hzK9jhzjc
#include <chrono>
#include <thread>
#include <fstream>
#include <boost/asio.hpp>
namespace tcpsocket
{
using boost::asio::ip::tcp;
class client
{
public:
void connect(const std::string& host, const std::string& port)
{
if (host.empty() || port.empty()) return;
tcp::resolver resolver{ io_context };
tcp::resolver::results_type endpoints = resolver.resolve(host, port);
boost::asio::async_connect(socket, endpoints, [this](const boost::system::error_code& error, const tcp::endpoint /*endpoint*/)
{
if (!error)
read();
});
}
void read()
{
socket.async_read_some(boost::asio::buffer(data, max_length), [this](const boost::system::error_code& error, std::size_t bytes)
{
if (error) return socket.close();
bytes_received = bytes;
read();
});
}
void write(const std::string& msg)
{
boost::system::error_code error;
size_t bytes = socket.write_some(boost::asio::buffer(msg), error);
if (error) return socket.close();
}
void poll()
{
io_context.poll();
}
private:
std::string host;
std::string port;
size_t bytes_received{};
enum { max_length = 512 };
unsigned char data[max_length];
boost::asio::io_context io_context;
tcp::socket socket{io_context};
};
}//namespace tcpsocket
struct Cfg
{
unsigned id{};
std::string host;
std::string port;
};
struct Client
{
unsigned id{};
tcpsocket::client sck;
};
int main()
{
std::vector<Client> clients;
std::vector<Cfg> config{ {125u, "127.0.0.1", "30000"}, {137u, "127.0.0.1", "30001"} };//In real life, this would come from configuration file
for (auto&[id, host, port] : config)
{
//auto& client = clients.push_back(Client{id, {}});//This is failing (typo error with return value detected by Sehe!!!)
auto& client = clients.emplace_back(id, {});//This is failing
client.sck.connect(host, port);
}
while (true)
{
for (auto&[id, client] : clients)
client.poll();
using namespace std::chrono_literals;
std::this_thread::sleep_for(100ms);
}
}
The program does not compile. As far as I understand, the error comes from copying the io_context/socket, but I may be wrong on this point.
How can I fix this? Also, is there a better alternative to what I am doing? For example, would it be better to keep a pool of TCP sockets inside the client class and use the same io_context for all of them?
Answer:
push_back doesn't return a value (return type is void). If you have C++17, emplace_back can be used like that:
auto& client = clients.emplace_back(Client{id, {}});
But a vector can reallocate, which requires moving or copying all elements. Since client is neither copyable nor movable (it owns an io_context), that can't work. And that's a good thing, because otherwise the pending async_ operations, whose handlers capture this, would run into UB when the vector reallocated.
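To make the copy/move restriction concrete, here is a minimal sketch that uses only the standard library. The names fake_io_context, fake_client and client_is_relocatable are hypothetical stand-ins, not Boost types; fake_io_context only imitates the fact that boost::asio::io_context can be neither copied nor moved.

```cpp
#include <type_traits>

// Hypothetical stand-in for boost::asio::io_context: copying is deleted,
// which also suppresses the implicit move operations.
struct fake_io_context {
    fake_io_context() = default;
    fake_io_context(const fake_io_context&) = delete;
    fake_io_context& operator=(const fake_io_context&) = delete;
};

// Mirrors the shape of the question's client: each instance owns a context.
struct fake_client {
    fake_io_context io_context;
};

// A non-copyable, non-movable member makes the enclosing type
// non-copyable and non-movable as well.
static_assert(!std::is_copy_constructible_v<fake_client>);
static_assert(!std::is_move_constructible_v<fake_client>);

// std::vector has to move or copy its elements when it grows, so a type
// for which this returns false cannot be emplaced into a vector.
constexpr bool client_is_relocatable() {
    return std::is_move_constructible_v<fake_client> ||
           std::is_copy_constructible_v<fake_client>;
}
```

Under these assumptions, std::vector<fake_client>::emplace_back is rejected at compile time, which is the same class of error the question runs into.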
Consider std::deque or std::list, which afford reference stability (meaning existing elements are never relocated, or only in fewer cases). std::list is the safer of the two here:
std::list<Client> clients;
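As a rough illustration of reference stability, the sketch below appends to a std::list and checks that the first element never changes address. pinned_client and first_element_stays_put are hypothetical names introduced for this example; pinned_client only imitates a type that cannot be copied or moved.

```cpp
#include <cassert>
#include <cstddef>
#include <list>

// Hypothetical stand-in for the question's Client: deleting the copy
// operations also suppresses the implicit move operations.
struct pinned_client {
    explicit pinned_client(unsigned id) : id(id) {}
    pinned_client(const pinned_client&) = delete;
    pinned_client& operator=(const pinned_client&) = delete;
    unsigned id;
};

// Returns true if the first element kept its address while `extra`
// further elements were appended to the list.
inline bool first_element_stays_put(std::size_t extra) {
    std::list<pinned_client> clients;
    // Since C++17, emplace_back returns a reference to the new element.
    pinned_client& first = clients.emplace_back(1u);
    const pinned_client* before = &first;
    for (std::size_t i = 0; i < extra; ++i)
        clients.emplace_back(static_cast<unsigned>(i + 2));
    assert(clients.size() == extra + 1);
    return before == &clients.front() && first.id == 1u;
}
```

Because each list node is allocated separately, elements are constructed in place and never relocated, so a handler that captured a pointer to one of them stays valid for as long as the element is in the list.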
This gets you somewhere. However I'd note a few things:
- it's not efficient to create separate IO services for each client
- manually polling them is not typical
- you had host and port members that were never used
- bytes_received was being overwritten
- write_some doesn't guarantee the whole buffer will be written
- you're mixing async and sync operations (async_read vs write_some). This is not always a good idea. I think for tcp::socket this will be fine in the given use case, but don't expect IO objects to support this in general
- There's no reason to supply the array length for boost::asio::buffer - it will be deduced. Even better to use std::array instead of C style array
- I see your #include <thread>; if you intend to run on multiple threads, be aware of strands: Why do I need strand per connection when using boost::asio?
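The point about write_some can be sketched without a real socket. In the example below, short_write_sink and write_all are hypothetical names: the sink imitates an object that, like write_some, may accept fewer bytes than offered, and write_all shows the kind of loop a composed operation such as boost::asio::write performs on your behalf. It is an illustration of the idea, not Asio's actual implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>

// Hypothetical sink that accepts at most max_chunk bytes per call,
// imitating a short write.
class short_write_sink {
public:
    explicit short_write_sink(std::size_t max_chunk) : max_chunk_(max_chunk) {
        assert(max_chunk > 0);
    }
    // Takes a prefix of `bytes` and reports how many bytes it consumed.
    std::size_t write_some(std::string_view bytes) {
        const std::size_t n = std::min(bytes.size(), max_chunk_);
        received_.append(bytes.substr(0, n));
        ++calls_;
        return n;
    }
    const std::string& received() const { return received_; }
    std::size_t calls() const { return calls_; }

private:
    std::size_t max_chunk_;
    std::string received_;
    std::size_t calls_ = 0;
};

// Keeps calling write_some until every byte has been accepted.
inline std::size_t write_all(short_write_sink& sink, std::string_view bytes) {
    std::size_t total = 0;
    while (total < bytes.size())
        total += sink.write_some(bytes.substr(total));
    return total;
}
```

A single write_some call on a sink limited to 4 bytes would deliver only the first 4 bytes of an 11-byte message; the loop is what guarantees the rest follows.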
Here's a simplified, fixed version with the above:
#include <boost/asio.hpp>
#include <array>
#include <atomic>
#include <chrono>
#include <fstream>
#include <iostream>
#include <list>
#include <string>
#include <vector>
using namespace std::chrono_literals;
namespace tcpsocket {
using boost::asio::ip::tcp;
using boost::system::error_code;
class client {
public:
client(boost::asio::any_io_executor ex) : socket_(ex) {}
size_t bytes_received() const { return bytes_received_; }
void connect(const std::string& host, const std::string& port) {
post(socket_.get_executor(), [=, this] { do_connect(host, port); });
}
void write(std::string msg) {
post(socket_.get_executor(), [=, this] { do_write(msg); });
}
void read() {
post(socket_.get_executor(), [=, this] { do_read(); });
}
private:
void do_connect(const std::string& host, const std::string& port) {
if (host.empty() || port.empty())
return;
tcp::resolver resolver{socket_.get_executor()};
async_connect(socket_, resolver.resolve(host, port),
[this](error_code ec, tcp::endpoint /*endpoint*/) {
if (!ec)
do_read();
else
std::cerr << ec.message() << std::endl;
});
}
void do_write(const std::string& msg) {
error_code ec;
boost::asio::write(socket_, boost::asio::buffer(msg), ec);
if (ec) {
std::cerr << "Closing (" << ec.message() << ")" << std::endl;
return socket_.close();
}
}
void do_read() {
socket_.async_read_some( //
boost::asio::buffer(data),
[this](error_code ec, std::size_t bytes) {
if (ec)
return socket_.close();
bytes_received_ = bytes;
do_read();
});
}
std::atomic_size_t bytes_received_{0};
std::array<unsigned char, 512> data;
tcp::socket socket_;
};
} // namespace tcpsocket
struct Cfg {
unsigned id{};
std::string host;
std::string port;
};
struct Client {
Client(unsigned id, boost::asio::any_io_executor ex) : id_(id), impl_(ex) {}
unsigned id_;
tcpsocket::client impl_;
};
int main()
{
boost::asio::io_context ioc;
std::list<Client> clients;
std::vector<Cfg> const config{
{125u, "127.0.0.1", "30000"},
{137u, "127.0.0.1", "30001"},
{149u, "127.0.0.1", "30002"},
{161u, "127.0.0.1", "30003"},
{173u, "127.0.0.1", "30004"},
{185u, "127.0.0.1", "30005"},
{197u, "127.0.0.1", "30006"},
{209u, "127.0.0.1", "30007"},
{221u, "127.0.0.1", "30008"},
{233u, "127.0.0.1", "30009"},
{245u, "127.0.0.1", "30010"},
};
for (auto& [id, host, port] : config) {
auto& c = clients.emplace_back(id, make_strand(ioc));
c.impl_.connect(host, port);
c.impl_.write(std::to_string(id) + " connected to " + host + ":" + port + "\n");
}
ioc.run_for(150ms);
for (auto& [id, impl]: clients)
std::cout << id << " received " << impl.bytes_received() << "\n";
}
Prints
(for a in {30000..30010}; do netcat -tlp $a < main.cpp & done)
g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp
./a.out
125 connected to 127.0.0.1:30000
149 connected to 127.0.0.1:30002
161 connected to 127.0.0.1:30003
185 connected to 127.0.0.1:30005
197 connected to 127.0.0.1:30006
209 connected to 127.0.0.1:30007
221 connected to 127.0.0.1:30008
233 connected to 127.0.0.1:30009
173 connected to 127.0.0.1:30004
245 connected to 127.0.0.1:30010
137 connected to 127.0.0.1:30001
125 received 3386
137 received 3386
149 received 3386
161 received 3386
173 received 3386
185 received 3386
197 received 3386
209 received 3386
221 received 3386
233 received 3386
245 received 3386
Other Notes
Read operations form a loop (implicit strand).
Note that it is still your responsibility to ensure no write operations overlap. If necessary, introduce a queue so that you can have multiple messages pending. See e.g. How to safely write to a socket from multiple threads?
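One possible shape for such a queue is sketched below, using only the standard library. fake_transport and write_queue are hypothetical names: fake_transport stands in for the socket and lets the caller trigger the completion handler by hand, whereas a real client would presumably call boost::asio::async_write from code running on the strand.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical transport: start_write begins one write, and complete()
// simulates its completion handler firing later.
class fake_transport {
public:
    void start_write(const std::string& msg, std::function<void()> on_done) {
        assert(!on_done_ && "overlapping writes are not allowed");
        wire_.push_back(msg);
        on_done_ = std::move(on_done);
    }
    bool busy() const { return static_cast<bool>(on_done_); }
    void complete() {
        assert(on_done_);
        auto done = std::move(on_done_);
        on_done_ = nullptr;  // the handler may start the next write
        done();
    }
    const std::vector<std::string>& wire() const { return wire_; }

private:
    std::vector<std::string> wire_;
    std::function<void()> on_done_;
};

// Keeps at most one write in flight; later messages wait in the outbox.
class write_queue {
public:
    explicit write_queue(fake_transport& t) : transport_(t) {}
    void send(std::string msg) {
        outbox_.push_back(std::move(msg));
        if (outbox_.size() == 1)  // nothing was in flight, so start now
            write_next();
    }
    std::size_t pending() const { return outbox_.size(); }

private:
    void write_next() {
        // The front message stays in the deque until its write completes,
        // so the buffer outlives the operation.
        transport_.start_write(outbox_.front(), [this] {
            outbox_.pop_front();
            if (!outbox_.empty())
                write_next();
        });
    }
    fake_transport& transport_;
    std::deque<std::string> outbox_;
};
```

The design choice here is that send only starts a write when the outbox was empty; every other message is picked up from the completion handler, so writes can never overlap and are delivered in order.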