Home > Blockchain >  How to safely write to a socket from multiple threads?
How to safely write to a socket from multiple threads?

Time:02-15

I'm using asio (non-boost) to create a TCP server, and while my code works, it's not done properly because I'm calling asio::async_write from multiple threads. I think I should use strands, but the more I read about them, the more lost I am.

#include <cstdlib>
#include <deque>
#include <iostream>
#include <memory>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <asio/ts/buffer.hpp>
#include <asio/ts/executor.hpp>
#include <asio/ts/internet.hpp>
#include "messages.h"

using asio::ip::tcp;

class session
    : public std::enable_shared_from_this<session>
{
public:
    session(tcp::socket socket)
        : socket_(std::move(socket))
    {
    }

    void start()
    {
        handler = MessageHandler();
        asio::write(socket_, asio::buffer(handler.initialMessage()));
        do_read();
    }

private:
    void do_read()
    {
        auto self(shared_from_this());
        socket_.async_read_some(asio::buffer(data_, max_length),
            [this, self](std::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    buffer_.append(data_, length);
                    size_t pos;
                    while ((pos = buffer_.find('\0')) != std::string::npos)
                    {
                        std::string message = buffer_.substr(0, pos);
                        buffer_.erase(0, pos   1);

                        std::thread(&session::process_message, this, message).detach();
                    }

                    do_read();
                }
                else if (ec != asio::error::eof)
                {
                    std::cerr << "Read error: " << ec.message() << '\n';
                }
            });
    }

    void do_write(std::string message)
    {
        auto self(shared_from_this());
        asio::async_write(socket_, asio::buffer(message),
            [this, self](std::error_code ec, std::size_t /*length*/)
            {
                if (!ec)
                {
                }
                else if (ec != asio::error::eof)
                {
                    std::cerr << "Write error: " << ec.message() << '\n';
                }
            });
    }

    void process_message(std::string message) {
        std::string response = handler.processMessage(message);
        do_write(response);
    }

    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length];
    std::string buffer_;
    MessageHandler handler;
};

class server
{
public:
    server(asio::io_context& io_context, unsigned short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
        socket_(io_context)
    {
        do_accept();
    }

private:
    void do_accept()
    {
        acceptor_.async_accept(socket_,
            [this](std::error_code ec)
            {
                if (!ec)
                {
                    std::make_shared<session>(std::move(socket_))->start();
                }

                do_accept();
            });
    }

    tcp::acceptor acceptor_;
    tcp::socket socket_;
};

// Creates the io_context and the server, then runs the event loop on the
// calling thread. Any std::exception escaping construction or run() is
// reported on stderr and the function returns.
void serverInit()
{
    try
    {
        asio::io_context io_context;

        // NOTE(review): port 0 is passed to the listening endpoint; on typical
        // TCP stacks this makes the OS pick an arbitrary free port. Confirm
        // that this is intended and that clients can discover the port.
        server s(io_context, 0);

        io_context.run();
    }
    catch (const std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << '\n';
    }
}

CodePudding user response:

You only have 1 thread running the IO service. Everything is on an implicit strand.

  • Related