I need to call method which is a request for a remote server. After that i want to wait for an answer, and waiting is not blocked by other asynchronous function/objects(timers for example).
Method got_response(...) tells user that he got an answer from remote server, also method gets entry data which we got as an answer. Below I got my solution, but sometimes timer can be called in single thread, which will lead to method got_response() hanging.
How can I call timer to be guaranteed in other thread for answer simulation. Is there any other solution to my problem?
#include <iostream>
#include <boost/asio.hpp>
#include <future>
#include <thread>
#include <vector>
using namespace std;
namespace io = boost::asio;
struct Reply
{
atomic<bool> ready;
atomic<int> result;
future<void> future_result;
Reply()
{
ready = false;
result = 0;
}
void call()
{
cout << "retry called!" << endl;
future_result = async([&]()
{
while (!ready)
{
this_thread::yield();
}
});
}
int get()
{
future_result.wait();
return result.load();
}
void got_response(int res)
{
result = res;
ready = true;
}
};
int main()
{
Reply reply;
reply.call();
io::io_context context(4);
io::steady_timer timer1(context, std::chrono::seconds(2));
timer1.async_wait([&](const boost::system::error_code &ec)
{ cout << "timer 1, thread name: " << this_thread::get_id() << endl; });
io::steady_timer timer2(context, std::chrono::seconds(3));
timer2.async_wait([&](const boost::system::error_code &ec)
{
cout << "timer 2, thread name: " << this_thread::get_id() << endl;
cout << reply.get() << endl;
});
io::steady_timer timer3(context, std::chrono::seconds(10));
timer3.async_wait([&](const boost::system::error_code &ec)
{
cout << "timer 3, thread name: " << this_thread::get_id() << endl;
reply.got_response(1337);
});
vector<thread> threads;
auto count = 2;
for (int n = 0; n < count; n)
{
threads.emplace_back([&]
{ context.run(); });
}
for (auto &th : threads)
{
th.join();
}
}
Result:
retry called!
timer 1, thread name: 140712511198784
timer 2, thread name: 140712519591488
timer 3, thread name: 140712511198784
1337
CodePudding user response:
Wow. This overcomplicating on several levels.
futures can have typed return values (that's actually the whole point of a future over synchronization primitives)
the fturue can signal the readiness with a value, no need to duplicate the readiness into a
bool
and then copy the result somewherethis has me confused:
int get() { _fut.wait(); return _result.load(); }
It awaits the future, then returns the
_result
, you know what you invented_ready
for?Do you realize that
std::async
is not part of Boost ASIO? In fact, it doesn't work well with it because, as you correctly notice, it introduces (unspecified numbers of) threads. In general my advice is not to usestd::async
(it's hard to use correctly) and certainly never when using ASIOWhen you see the same variables name var1, var2, var3 it's time to refactor your code (into functions or classes if it includes data members):
std::deque<io::steady_timer> timers; for (int i = 1; i <= 3; i) { auto& timer = timers.emplace_back(context, std::chrono::seconds(1 i)); timer.async_wait([i](error_code ec) { std::cout << "timer " << i << ", thread name: " << std::this_thread::get_id() << std::endl; }); }
instead of a vector of threads, consider
boost::thread_group
or indeedboost::asio::thread_pool
.If you manually run the IO threads, remember to handle exceptions (Should the exception thrown by boost::asio::io_service::run() be caught?), so
boost::thread_group threads; for (int n = 0; n < 2; n) { threads.create_thread([&] { context.run(); }); } threads.join_all();
Or indeed
io::thread_pool context(2); context.join();
This is very inefficient
while (!_ready) { std::this_thread::yield(); }
Just set the future value to signify it's ready:
using namespace std
is generally not a good idea (Why is "using namespace std;" considered bad practice?)
Demo
Here's my expanded but simplified take on the question code:
#include <boost/asio.hpp>
#include <deque>
#include <future>
#include <iostream>
#include <thread>
namespace io = boost::asio;
using namespace std::chrono_literals;
using boost::system::error_code;
// not very useful in practice, but for debug output in main
std::ostream& debug(error_code);
template <typename Fut> bool is_ready(Fut const& fut) {
return fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
int main() {
std::promise<int> reply;
std::shared_future got_value = reply.get_future();
io::thread_pool context(2);
std::deque<io::steady_timer> timers;
for (int i = 1; i <= 10; i) {
timers //
.emplace_back(context, i * 1s)
.async_wait([&got_value](error_code ec) {
if (is_ready(got_value))
debug(ec) << " Reply:" << got_value.get() << std::endl;
else
debug(ec) << " (reply not ready)" << std::endl;
});
}
timers //
.emplace_back(context, 4'500ms)
.async_wait([&reply](error_code ec) {
debug(ec) << " setting value" << std::endl;
reply.set_value(1337);
});
context.join();
}
int friendly_thread_id() {
return std::hash<std::thread::id>{}(std::this_thread::get_id()) % 256;
}
#include <iomanip>
std::ostream& debug(error_code ec) {
auto now = std::chrono::system_clock::now;
static auto program_start = now();
return std::cout //
<< ((now() - program_start) / 1ms) << "ms\t"
<< "thread:" << std::hex << std::setfill('0') << std::showbase
<< std::setw(2) << friendly_thread_id() << std::dec << " ";
}
#include <iomanip>
std::ostream& debug(error_code ec) {
auto now = std::chrono::system_clock::now;
static auto program_start = now();
return std::cout //
<< ((now() - program_start) / 1ms) << "ms\t"
<< "thread:" << std::hex << std::setfill('0') << std::showbase
<< std::setw(2) << pretty_thread_id() << std::dec << " ";
}
Prints
0ms thread:0x5f (reply not ready)
999ms thread:0xf3 (reply not ready)
1999ms thread:0x5f (reply not ready)
2999ms thread:0x5f (reply not ready)
3499ms thread:0xf3 setting value
3999ms thread:0x5f Reply:1337
4999ms thread:0xf3 Reply:1337
5999ms thread:0xf3 Reply:1337
6999ms thread:0xf3 Reply:1337
7999ms thread:0xf3 Reply:1337
8999ms thread:0xf3 Reply:1337