Home > Software design >  How to batch send unsent messages in asio
How to batch send unsent messages in asio

Time:02-17

There is an example in asio, which caches the sent messages in a deque. I think when there are too many unsent messages in this deque, such as 1000, I want to process it through constbuffersequence, that is, batch sending, so the following How should the code be changed, thank you!

   void deliver(const chat_message& msg)
   {
     bool write_in_progress = !write_msgs_.empty();
     write_msgs_.push_back(msg);
     if (!write_in_progress)
     {   
       boost::asio::async_write(socket_,
           boost::asio::buffer(write_msgs_.front().data(),
             write_msgs_.front().length()),
           boost::bind(&chat_session::handle_write, shared_from_this(),
             boost::asio::placeholders::error));
     }   
   }
 
   void handle_write(const boost::system::error_code& error)
   {
     if (!error)
     {   
       write_msgs_.pop_front();
       if (!write_msgs_.empty())
       {   
         boost::asio::async_write(socket_,
             boost::asio::buffer(write_msgs_.front().data(),
               write_msgs_.front().length()),
             boost::bind(&chat_session::handle_write, shared_from_this(),
               boost::asio::placeholders::error));
       }   
     }   
     else
     {   
       room_.leave(shared_from_this());
     }   
   }

CodePudding user response:

You can transform the deque to any container modeling the const buffer sequence concept:

std::vector<asio::const_buffer> buffers;
std::transform(
    begin(write_msgs_), end(write_msgs_), back_inserter(buffers),
    [](Message const& s) { return asio::buffer(s); });

async_write( //
    socket_, buffers,
    [this, self = shared_from_this()] //
    (error_code ec, std::size_t bytes_written) {

          // ...
          write_msgs_.clear();
    });

The transform is a force of habit here, you might prefer

std::vector<asio::const_buffer> buffers;
for (auto& s: write_msgs_)
    buffers.push_back(asio::buffer(s));

Live Demo

Modified from this recent example How to safely write to a socket from multiple threads?:

Live On Coliru

#include <boost/asio.hpp>
#include <deque>
#include <iostream>

namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;
using Message = std::string;

class chat_session : public std::enable_shared_from_this<chat_session> {
  public:
    chat_session(tcp::socket socket) : socket_(std::move(socket)) {}

    void start() { do_read(); }

    void deliver_many(std::vector<Message> msgs) {
        post(socket_.get_executor(),
             [this, msgs = std::move(msgs), self = shared_from_this()] //
             () mutable {
                 for (auto& msg : msgs) {
                     do_write(std::move(msg));
                 }
             });
    }

    void deliver(Message msg) {
        post(socket_.get_executor(),
             [this, msg = std::move(msg), self = shared_from_this()] //
             () mutable { do_write(std::move(msg)); });
    }

  private:
    void do_read() {
        async_read_until(
            socket_, asio::dynamic_buffer(incoming_), '\0',
            [this, self = shared_from_this()] //
            (error_code ec, std::size_t length) {
                if (!ec) {
                    process_message(incoming_.substr(0, length - 1));
                    incoming_.erase(0, length);

                    do_read();
                } else if (ec != asio::error::eof) {
                    std::cerr << "Read error: " << ec.message() << std::endl;
                }
            });
    }

    void do_write(Message message)
    {
        write_msgs_.push_back(std::move(message)); // assumed on (implicit) strand
        if (write_msgs_.size() == 1) {
            write_loop();
        }
    }

    void write_loop() {
        std::cerr << "write_loop with write_msgs_.size() = " << write_msgs_.size() << std::endl;
        if (write_msgs_.empty())
            return;

        if (write_msgs_.size() > 100) {
            std::vector<asio::const_buffer> buffers;
            std::transform(
                begin(write_msgs_), end(write_msgs_), back_inserter(buffers),
                [](Message const& s) { return asio::buffer(s); });

            async_write( //
                socket_, buffers,
                [this, self = shared_from_this()] //
                (error_code ec, std::size_t /*length*/) {
                    if (!ec) {
                        write_msgs_.clear();
                        write_loop();
                    } else if (ec != asio::error::eof) {
                        std::cerr << "Write error: " << ec.message() << std::endl;
                    }
                });
        } else {
            async_write( //
                socket_, asio::buffer(write_msgs_.front()),
                [this, self = shared_from_this()] //
                (error_code ec, std::size_t /*length*/) {
                    if (!ec) {
                        write_msgs_.pop_front();
                        write_loop();
                    } else if (ec != asio::error::eof) {
                        std::cerr << "Write error: " << ec.message() << std::endl;
                    }
                });
        }
    }

    void process_message(Message const& message) {
        std::vector<Message> responses;
        for (int i = 0; i < 200;   i) {
            responses.push_back("Response #"   std::to_string(i)   " for "  
                                message   "\n");
        }

        // dispatch/post to executor because we might be on a different thread (not in this example)
        // (not in this example)
        post(socket_.get_executor(),
             std::bind(&chat_session::deliver_many, shared_from_this(),
                       std::move(responses)));
    }

    tcp::socket         socket_;
    Message             incoming_;
    std::deque<Message> write_msgs_;
};

class server {
  public:
    server(asio::any_io_executor ex, unsigned short port)
        : acceptor_(ex, tcp::endpoint(tcp::v4(), port))
    {
        do_accept();
    }

  private:
    void do_accept()
    {
        acceptor_.async_accept(
            make_strand(acceptor_.get_executor()),
            [this](error_code ec, tcp::socket&& s) {
                if (!ec) {
                    std::cout << "Accepted " << s.remote_endpoint() << std::endl;
                    std::make_shared<chat_session>(std::move(s))->start();
                }

                do_accept();
            });
    }

    tcp::acceptor acceptor_;
};

int main() {
    asio::thread_pool ctx;
    server s(ctx.get_executor(), 8989);
    ctx.join();
 }

When sending a single message from a client:

g   -std=c  20 -O2 -Wall -pedantic -pthread main.cpp 
./a.out&
sleep .5; printf 'HelloWorld\0' | nc 127.0.0.1 8989 -w1

shows e.g.:

Accepted 127.0.0.1:39538
write_loop with write_msgs_.size() = 1
Response #0 for HelloWorld
write_loop with write_msgs_.size() = 199
Response #1 for HelloWorld
Response #2 for HelloWorld
Response #3 for HelloWorld
Response #4 for HelloWorld
Response #5 for HelloWorld
Response #6 for HelloWorld
Response #7 for HelloWorld
Response #8 for HelloWorld
Response #9 for HelloWorld
Response #10 for HelloWorld
Response #11 for HelloWorld
Response #12 for HelloWorld
Response #13 for HelloWorld
Response #14 for HelloWorld
Response #15 for HelloWorld
Response #16 for HelloWorld
Response #17 for HelloWorld
Response #18 for HelloWorld
Response #19 for HelloWorld
Response #20 for HelloWorld
Response #21 for HelloWorld
Response #22 for HelloWorld
Response #23 for HelloWorld
Response #24 for HelloWorld
Response #25 for HelloWorld
Response #26 for HelloWorld
Response #27 for HelloWorld
Response #28 for HelloWorld
Response #29 for HelloWorld
Response #30 for HelloWorld
Response #31 for HelloWorld
Response #32 for HelloWorld
Response #33 for HelloWorld
Response #34 for HelloWorld
Response #35 for HelloWorld
Response #36 for HelloWorld
Response #37 for HelloWorld
Response #38 for HelloWorld
Response #39 for HelloWorld
Response #40 for HelloWorld
Response #41 for HelloWorld
Response #42 for HelloWorld
Response #43 for HelloWorld
Response #44 for HelloWorld
Response #45 for HelloWorld
Response #46 for HelloWorld
Response #47 for HelloWorld
Response #48 for HelloWorld
Response #49 for HelloWorld
Response #50 for HelloWorld
Response #51 for HelloWorld
Response #52 for HelloWorld
Response #53 for HelloWorld
Response #54 for HelloWorld
Response #55 for HelloWorld
Response #56 for HelloWorld
Response #57 for HelloWorld
Response #58 for HelloWorld
Response #59 for HelloWorld
Response #60 for HelloWorld
Response #61 for HelloWorld
Response #62 for HelloWorld
Response #63 for HelloWorld
Response #64 for HelloWorld
Response #65 for HelloWorld
Response #66 for HelloWorld
Response #67 for HelloWorld
Response #68 for HelloWorld
Response #69 for HelloWorld
Response #70 for HelloWorld
Response #71 for HelloWorld
Response #72 for HelloWorld
Response #73 for HelloWorld
Response #74 for HelloWorld
Response #75 for HelloWorld
Response #76 for HelloWorld
Response #77 for HelloWorld
Response #78 for HelloWorld
Response #79 for HelloWorld
Response #80 for HelloWorld
Response #81 for HelloWorld
Response #82 for HelloWorld
Response #83 for HelloWorld
Response #84 for HelloWorld
Response #85 for HelloWorld
Response #86 for HelloWorld
Response #87 for HelloWorld
Response #88 for HelloWorld
Response #89 for HelloWorld
Response #90 for HelloWorld
Response #91 for HelloWorld
Response #92 for HelloWorld
Response #93 for HelloWorld
Response #94 for HelloWorld
Response #95 for HelloWorld
Response #96 for HelloWorld
Response #97 for HelloWorld
Response #98 for HelloWorld
Response #99 for HelloWorld
Response #100 for HelloWorld
Response #101 for HelloWorld
Response #102 for HelloWorld
Response #103 for HelloWorld
Response #104 for HelloWorld
Response #105 for HelloWorld
Response #106 for HelloWorld
Response #107 for HelloWorld
Response #108 for HelloWorld
Response #109 for HelloWorld
Response #110 for HelloWorld
Response #111 for HelloWorld
Response #112 for HelloWorld
Response #113 for HelloWorld
Response #114 for HelloWorld
Response #115 for HelloWorld
Response #116 for HelloWorld
Response #117 for HelloWorld
Response #118 for HelloWorld
Response #119 for HelloWorld
Response #120 for HelloWorld
Response #121 for HelloWorld
Response #122 for HelloWorld
Response #123 for HelloWorld
Response #124 for HelloWorld
Response #125 for HelloWorld
Response #126 for HelloWorld
Response #127 for HelloWorld
Response #128 for HelloWorld
Response #129 for HelloWorld
Response #130 for HelloWorld
Response #131 for HelloWorld
Response #132 for HelloWorld
Response #133 for HelloWorld
Response #134 for HelloWorld
Response #135 for HelloWorld
Response #136 for HelloWorld
Response #137 for HelloWorld
Response #138 for HelloWorld
Response #139 for HelloWorld
Response #140 for HelloWorld
Response #141 for HelloWorld
Response #142 for HelloWorld
Response #143 for HelloWorld
Response #144 for HelloWorld
Response #145 for HelloWorld
Response #146 for HelloWorld
Response #147 for HelloWorld
Response #148 for HelloWorld
Response #149 for HelloWorld
Response #150 for HelloWorld
Response #151 for HelloWorld
Response #152 for HelloWorld
Response #153 for HelloWorld
Response #154 for HelloWorld
Response #155 for HelloWorld
Response #156 for HelloWorld
Response #157 for HelloWorld
Response #158 for HelloWorld
Response #159 for HelloWorld
Response #160 for HelloWorld
Response #161 for HelloWorld
Response #162 for HelloWorld
Response #163 for HelloWorld
Response #164 for HelloWorld
Response #165 for HelloWorld
Response #166 for HelloWorld
Response #167 for HelloWorld
Response #168 for HelloWorld
Response #169 for HelloWorld
Response #170 for HelloWorld
Response #171 for HelloWorld
Response #172 for HelloWorld
Response #173 for HelloWorld
Response #174 for HelloWorld
Response #175 for HelloWorld
Response #176 for HelloWorld
Response #177 for HelloWorld
Response #178 for HelloWorld
Response #179 for HelloWorld
Response #180 for HelloWorld
Response #181 for HelloWorld
Response #182 for HelloWorld
Response #183 for HelloWorld
Response #184 for HelloWorld
Response #185 for HelloWorld
Response #186 for HelloWorld
Response #187 for HelloWorld
Response #188 for HelloWorld
Response #189 for HelloWorld
Response #190 for HelloWorld
Response #191 for HelloWorld
Response #192 for HelloWorld
Response #193 for HelloWorld
Response #194 for HelloWorld
Response #195 for HelloWorld
Response #196 for HelloWorld
Response #197 for HelloWorld
Response #198 for HelloWorld
Response #199 for HelloWorld
write_loop with write_msgs_.size() = 0
  • Related