I'm trying to write a very simple client/server app with boost::socket
. I need a server to run and a single client to connect, send data, disconnect and possibly reconnect later and repeat.
The code reduced to the minimum is here:
Server app:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
using boost::asio::ip::tcp;
class TheServer
{
public:
TheServer(int port) : m_port(port)
{
m_pIOService = new boost::asio::io_service;
m_pThread = new boost::thread(boost::bind<void>(&TheServer::run, this));
listenForNewConnection();
}
~TheServer()
{
m_bContinueReading = false;
m_pIOService->stop();
m_pThread->join();
delete m_pThread;
delete m_pSocket;
delete m_pAcceptor;
delete m_pIOService;
}
void listenForNewConnection()
{
if (m_pSocket)
delete m_pSocket;
if (m_pAcceptor)
delete m_pAcceptor;
// start new acceptor operation
m_pSocket = new tcp::socket(*m_pIOService);
m_pAcceptor = new tcp::acceptor(*m_pIOService, tcp::endpoint(tcp::v4(), m_port));
std::cout << "Starting async_accept" << std::endl;
m_pAcceptor->async_accept(*m_pSocket,
boost::bind<void>(&TheServer::readSession, this, boost::asio::placeholders::error));
}
void readSession(boost::system::error_code error)
{
if (!error)
{
std::cout << "Connection established" << std::endl;
while ( m_bContinueReading )
{
static unsigned char buffer[1000];
boost::system::error_code error;
size_t length = m_pSocket->read_some(boost::asio::buffer(&buffer, 1000), error);
if (!error && length != 0)
{
std::cout << "Received " << buffer << std::endl;
}
else
{
std::cout << "Received error, connection likely closed by peer" << std::endl;
break;
}
}
std::cout << "Connection closed" << std::endl;
listenForNewConnection();
}
else
{
std::cout << "Connection error" << std::endl;
}
std::cout << "Ending readSession" << std::endl;
}
void run()
{
while (m_bContinueReading)
m_pIOService->run_one();
std::cout << "Exiting run thread" << std::endl;
}
bool m_bContinueReading = true;
boost::asio::io_service* m_pIOService = NULL;
tcp::socket* m_pSocket = NULL;
tcp::acceptor* m_pAcceptor = NULL;
boost::thread* m_pThread = NULL;
int m_port;
};
int main(int argc, char* argv[])
{
TheServer* server = new TheServer(1900);
std::cout << "Press Enter to quit" << std::endl;
std::string sGot;
getline(std::cin, sGot);
delete server;
return 0;
}
Client app:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
int main(int argc, char* argv[])
{
std::cout << std::endl;
std::cout << "Starting client" << std::endl;
using boost::asio::ip::tcp;
boost::asio::io_service* m_pIOService = NULL;
tcp::socket* m_pSocket = NULL;
try
{
m_pIOService = new boost::asio::io_service;
std::stringstream sPort;
sPort << 1900;
tcp::resolver resolver(*m_pIOService);
tcp::resolver::query query(tcp::v4(), "localhost", sPort.str());
tcp::resolver::iterator iterator = resolver.resolve(query);
m_pSocket = new tcp::socket(*m_pIOService);
m_pSocket->connect(*iterator);
std::cout << "Client conected" << std::endl;
std::string hello = "Hello World";
boost::asio::write( *m_pSocket, boost::asio::buffer(hello.data(), hello.size()) );
boost::this_thread::sleep(boost::posix_time::milliseconds(100));
hello = "(2)";
boost::asio::write(*m_pSocket, boost::asio::buffer(hello.data(), hello.size()));
}
catch (std::exception& e)
{
delete m_pSocket;
m_pSocket = NULL;
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
Note that I use non-blocking async_accept to be able to cleanly stop the server when Enter is pressed.
Under Windows, it works perfectly fine, I run the server, it outputs:
Starting async_accept
Press Enter to quit
For each client app run, it outpts:
Starting client
Client conected
and server app outputs:
Connection established
Received Hello World
Received Hello World(2)
Received error, connection likely closed by peer
Connection closed
Starting async_accept
Ending readSession
Then when I press Enter in server app console, it outputs Exiting run thread
and cleanly stops.
Now, when I compile this same code under Linux, the client outputs the same as under Windows, but nothing happens on the server side...
Any idea what's wrong?
CodePudding user response:
There are many questionable elements.
There is a classical data race on
m_bContinueReading
. You write from another thread, but the other thread may never see the change because of the data race.The second race condition is likely your problem:
m_pThread = new boost::thread(boost::bind<void>(&TheServer::run, this)); listenForNewConnection();
Here the
run
thread may complete before you ever post the first work. You can use a work-guard to prevent this. In your specific code you would already fix it by reordering the lines:listenForNewConnection(); m_pThread = new boost::thread(boost::bind<void>(&TheServer::run, this));
I would not do this, because I would not have those statements in my constructor body. See below for the work guard solution
There is a lot of raw pointer handling and new/delete going on, which merely invites errors.
You use the
buffer
assuming that it is NUL-terminated. This is especially unwarranted because you useread_some
which will read partial messages as they arrive on the wire.You use a
static
buffer while the code may have different instances of the class. This is very false optimization. Instead, prevent all the allocations! Combining with the previous item:while (m_bContinueReading) { char buffer[1000]; size_t length = m_Socket.read_some(asio::buffer(&buffer, 1000), ec); std::cout << "Received " << length << " (" << quoted(std::string(buffer, length)) << "), " << ec.message() << std::endl; if (ec.failed()) break; }
You start a new acceptor always, where there is no need: a single acceptor can accept as many connections as you wish. In fact, the method shown runs into the problems
that lingering connections can prevent the new acceptor from binding to the same port. You could also alleviate that with
m_Acceptor.set_option(tcp::acceptor::reuse_address(true));
the destroyed acceptor may have backlogged connections, which are discarded
Typically you want to support concurrent connection, so you can split of a "readSession" and immediately accept the next connection. Now, strangely your code seems to expect clients to be connected until the server is prompted to shutdown (from the console) but after that you somehow start listening to new connections (even though you know the service will be stopping, and
m_bContinueReading
will remainfalse
).In the grand scheme of things, you don't want to destroy the acceptor unless something invalidated it. In practice this is rare (e.g. on Linux the acceptor will happily survive disabling/re-enabling the network adaptor).
you have spurious explicit template arguments (
bind<void>
). This is an anti-pattern and may lead to subtle problemssimilar with the buffer (just say
asio::buffer(buffer)
and no longer have correctness concerns. In fact, don't use C-style arrays:std::array<char, 1000> buffer; size_t n = m_Socket.read_some(asio::buffer(buffer), ec); std::cout << "Received " << n << " " << quoted(std::string(buffer.data(), n)) << " (" << ec.message() << ")" << std::endl;
Instead of running a manual
run_one()
loop (where you forget to handle exceptions), consider "just" letting the servicerun()
. Then you can.cancel()
the acceptor to let the service run out of work.In fact, this subtlety isn't required in your code, since your code already forces "ungraceful" shutdown anyways:
m_IOService.stop(); // nuclear option m_Thread.join();
More gentle would be e.g.
m_Acceptor.cancel(); m_Socket.cancel(); m_Thread.join();
In which case you can respond to the completion
error_code == error::operation_aborted
to stop the session/accept loop.Technically, you may be able to do away with the boolean flag altogether. I keep it because it allows us to handle multiple session-per-thread in "fire-and-forget" manner.
In the client you have many of the same problems, and also a gotcha where you only look at the first resolver result (assuming there was one), ignoring the rest. You can use
asio::connect
instead ofm_Socket.connect
to try all resolved entries
Addressing the majority of these issues, simplifying the code:
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/optional.hpp>
#include <iomanip>
#include <iostream>
namespace asio = boost::asio;
using asio::ip::tcp;
using namespace std::chrono_literals;
using boost::system::error_code;
class TheServer {
public:
TheServer(int port) : m_port(port) {
m_Acceptor.set_option(tcp::acceptor::reuse_address(true));
do_accept();
}
~TheServer() {
m_shutdownRequested = true;
m_Work.reset(); // release the work-guard
m_Acceptor.cancel();
m_Thread.join();
}
private:
void do_accept() {
std::cout << "Starting async_accept" << std::endl;
m_Acceptor.async_accept( //
m_Socket, boost::bind(&TheServer::on_accept, this, asio::placeholders::error));
}
void on_accept(error_code ec) {
if (!ec) {
std::cout << "Connection established " << m_Socket.remote_endpoint() << std::endl;
// leave session running in the background:
std::thread(&TheServer::read_session_thread, this, std::move(m_Socket)).detach();
do_accept(); // and immediately accept new connection(s)
} else {
std::cout << "Connection error (" << ec.message() << ")" << std::endl;
std::cout << "Ending readSession" << std::endl;
}
}
void read_session_thread(tcp::socket sock) {
std::array<char, 1000> buffer;
for (error_code ec;;) {
size_t n = sock.read_some(asio::buffer(buffer), ec);
std::cout << "Received " << n << " " << quoted(std::string(buffer.data(), n)) << " ("
<< ec.message() << ")" << std::endl;
if (ec.failed() || m_shutdownRequested)
break;
}
std::cout << "Connection closed" << std::endl;
}
void thread_func() {
// http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
for (;;) {
try {
m_IOService.run();
break; // exited normally
} catch (std::exception const& e) {
std::cerr << "[eventloop] error: " << e.what();
} catch (...) {
std::cerr << "[eventloop] unexpected error";
}
}
std::cout << "Exiting service thread" << std::endl;
}
std::atomic_bool m_shutdownRequested{false};
uint16_t m_port;
asio::io_service m_IOService;
boost::optional<asio::io_service::work> m_Work{m_IOService};
tcp::socket m_Socket{m_IOService};
tcp::acceptor m_Acceptor{m_IOService, tcp::endpoint{tcp::v4(), m_port}};
std::thread m_Thread{boost::bind(&TheServer::thread_func, this)};
};
constexpr uint16_t s_port = 1900;
void run_server() {
TheServer server(s_port);
std::cout << "Press Enter to quit" << std::endl;
std::cin.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
}
void run_client() {
std::cout << std::endl;
std::cout << "Starting client" << std::endl;
using asio::ip::tcp;
try {
asio::io_service m_IOService;
tcp::resolver resolver(m_IOService);
auto iterator = resolver.resolve("localhost", std::to_string(s_port));
tcp::socket m_Socket(m_IOService);
connect(m_Socket, iterator);
std::cout << "Client connected" << std::endl;
std::string hello = "Hello World";
write(m_Socket, asio::buffer(hello));
std::this_thread::sleep_for(100ms);
hello = "(2)";
write(m_Socket, asio::buffer(hello));
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
}
int main(int argc, char**) {
if (argc>1)
run_server();
else
run_client();
}