I have a situation where I need to collect data from more than 100 clients within 100 ms. After that time I need to process the collected data. When processing is done, I need to restart the step where I collect data from the clients, and so on in a loop.
To collect the data I am currently using the following implementation:
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <iostream>
#include <list>
#include <set>
namespace net = boost::asio;
using net::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::placeholders;
/// Receives UDP datagrams on a single local port and keeps simple statistics
/// (total packet count and the set of distinct sender endpoints).
///
/// All asynchronous work runs on the executor passed to the constructor.
/// The statistics are updated from the receive handler without any locking,
/// so `report()` should only be called when no handler can be running
/// (for example after the execution context has been joined).
struct listener {
    using Buffer = std::array<char, 100>; // receiver buffer

    udp::socket s;

    /// Opens a UDP socket on `ex`, bound to the default-constructed address
    /// and the given `port`.
    listener(net::any_io_executor ex, uint16_t port) : s{ex, {{}, port}} {}

    /// Starts the asynchronous receive chain.
    void start() {
        read_loop(error_code{}, prime_marker); // prime the async pump
    }

    /// Requests cancellation of the pending receive. The cancel is posted to
    /// the socket's executor rather than invoked directly on the caller's
    /// thread; the receive handler then sees an error and ends the chain.
    void stop() {
        post(s.get_executor(), [this] { s.cancel(); });
    }

    /// Prints the local endpoint, the number of datagrams received and the
    /// number of distinct senders to std::cout.
    void report() const {
        std::cout << s.local_endpoint() << ": A total of " << received_packets
                  << " were received from " << unique_senders.size()
                  << " unique senders\n";
    }

private:
    // Sentinel byte count used by start() to enter read_loop without a
    // completed receive to account for.
    static constexpr size_t prime_marker = size_t(-1);

    Buffer receive_buffer;
    udp::endpoint sender;                   // filled in by async_receive_from
    std::set<udp::endpoint> unique_senders;
    size_t received_packets = 0;

    /// Completion handler for async_receive_from; also the entry point used
    /// by start(). Records the datagram just received (unless this is the
    /// priming call) and then issues the next receive. Any error, including
    /// the cancellation triggered by stop(), ends the chain.
    void read_loop(error_code ec, size_t bytes) {
        if (bytes != prime_marker) {
            // std::cout << "read_loop (" << ec.message() << ")\n";
            if (ec)
                return;

            // Accumulate: plain assignment here would pin the total at 1.
            received_packets += 1;
            unique_senders.insert(sender);
            std::cout << "Received:" << bytes << " sender:" << sender
                      << " recorded:" << received_packets << "\n";
            //std::cout <<
            //    std::string_view(receive_buffer.data(), bytes) << "\n";
        }

        s.async_receive_from(net::buffer(receive_buffer), sender,
                             std::bind_front(&listener::read_loop, this));
    }
};
int main() {
net::thread_pool io(1); // single threaded
using Timer = net::steady_timer;
using TimePoint = std::chrono::steady_clock::time_point;
using Clock = std::chrono::steady_clock;
Timer timer_(io);
std::list<listener> listeners;
auto each = [&](auto mf) { for (auto& l : listeners) (l.*mf)(); };
for (uint16_t port : {1234, 1235, 1236})
listeners.emplace_back(io.get_executor(), port);
each(&listener::start);
TimePoint startTP = Clock::now();
timer_.expires_at(startTP 100ms); // collect data for 100 ms
timer_.async_wait([&](auto &&){each(&listener::stop);});
std::cout << "Done ! \n";
each(&listener::report);
io.join();
}
Is this an acceptable approach to stopping the collection process?
// Arm the timer 100 ms from now; when it fires, stop every listener.
TimePoint startTP = Clock::now();
timer_.expires_at(startTP + 100ms); // collect data for 100 ms
timer_.async_wait([&](auto &&){each(&listener::stop);});
CodePudding user response:
I'm interpreting this as basically asking how to combine
- the
EDIT In case the output goes too fast to interpret:
0.0.0.0:1234 Waiting 899ms (1587 received) 0.0.0.0:1236 Waiting 899ms (1966 received) 0.0.0.0:1235 Waiting 899ms (1933 received) 0.0.0.0:1235 Waiting 899ms (4054 received) 0.0.0.0:1234 Waiting 899ms (3454 received) 0.0.0.0:1236 Waiting 899ms (4245 received) 0.0.0.0:1236 Waiting 899ms (6581 received) 0.0.0.0:1235 Waiting 899ms (6257 received) 0.0.0.0:1234 Waiting 899ms (5499 received) 0.0.0.0:1235 Waiting 899ms (8535 received) 0.0.0.0:1234 Waiting 899ms (7494 received) 0.0.0.0:1236 Waiting 899ms (8811 received) 0.0.0.0:1236 Waiting 899ms (11048 received) 0.0.0.0:1234 Waiting 899ms (9397 received) 0.0.0.0:1235 Waiting 899ms (10626 received) 0.0.0.0:1234: A total of 9402 were received from 7932 unique senders 0.0.0.0:1235: A total of 10630 were received from 8877 unique senders 0.0.0.0:1236: A total of 11053 were received from 9133 unique senders
If you are sure you are remaining single threaded, you might consider using the same actual timer, at the cost of significantly increasing complexity.