Home > Mobile >  Asio How to write a custom AsyncStream?
Asio How to write a custom AsyncStream?

Time:05-02

I have actually managed to write a working AsyncStream. However, I am not really sure if I did it the way it is supposed to be done.

My main question is: Which executor is the get_executor() function supposed to return?

While implementing it several questions arose. I tagged them with Q<index>:. (I will keep the index stable in case of edits.) I would appreciate answers to them.

I tried to shorten/simplify the example as much as possible. It does compile and execute correctly.

#include <iostream>
#include <syncstream>
#include <thread>
#include <coroutine>
#include <future>
#include <random>
#include <string>
#include <memory>

#include <boost/asio.hpp>
#include <boost/asio/experimental/as_tuple.hpp>

#include <fmt/format.h>

inline std::osyncstream tout(const std::string & tag = "") {
  auto hash = std::hash<std::thread::id>{}(std::this_thread::get_id());
  auto hashStr = fmt::format("T{:04X} ", hash >> (sizeof(hash) - 2) * 8); // only display 2 bytes
  auto stream = std::osyncstream(std::cout);

  stream << hashStr;
  if (not tag.empty())
    stream << tag << " ";
  return stream;
}

namespace asio = boost::asio;

template <typename Executor>
requires asio::is_executor<Executor>::value // Q1: Is this the correct way to require that Executor actually is an executor?
                                            // I can't replace typename as there is no concept for Executors.
class Service : public std::enable_shared_from_this<Service<Executor>> {
  template<typename CallerExecutor, typename ServiceExecutor>
  // requires asio::is_executor<CallerExecutor>::value && asio::is_executor<ServiceExecutor>::value
  friend class MyAsyncStream;
  /// Data sent to the service
  std::string bufferIn;
  /// Data produced by the service
  std::string bufferOut;
  /// The strand used to avoid concurrent execution if the passed executor is backed by multiple threads.
  asio::strand<Executor> strand;
  /// Used to slow the data consumption and generation
  asio::steady_timer timer;

  /// Used to generate data
  std::mt19937 gen;
  /// https://stackoverflow.com/a/69753502/4479969
  constexpr static const char charset[] =
    "0123456789"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "abcdefghijklmnopqrstuvwxyz";
  template<typename URBG>
  static std::string gen_string(std::size_t length, URBG &&g) {
    std::string result;
    result.resize(length);
    std::sample(std::cbegin(charset),
                std::cend(charset),
                std::begin(result),
                std::intptr_t(length),
                std::forward<URBG>(g));
    return result;
  }

  static const constexpr auto MAX_OPS = 7;

  asio::awaitable<void> main(std::shared_ptr<Service> captured_self) {
    const constexpr auto TAG = "SrvCo";
    auto exe = co_await asio::this_coro::executor;
    auto use_awaitable = asio::bind_executor(exe, asio::use_awaitable);

    for (size_t ops = 0; ops < MAX_OPS; ops  ) {
      timer.expires_after(std::chrono::milliseconds(1000));
      co_await timer.async_wait(use_awaitable);

      tout(TAG) << "Ops " << ops << std::endl;

      bufferOut  = gen_string(8, gen);
      tout(TAG) << "Produced: " << bufferOut << std::endl;

      auto consumed = std::string_view(bufferIn).substr(0, 4);
      tout(TAG) << "Consumed: " << consumed << std::endl;
      bufferIn.erase(0, consumed.size());
    }
    tout(TAG) << "Done" << std::endl;
  }
  std::once_flag initOnce;

public:

  explicit Service(Executor && exe) : strand{asio::make_strand(exe)}, timer{exe.context()} {}

  void init() {
    std::call_once(initOnce, [this]() {
      asio::co_spawn(strand, main(this->shared_from_this()), asio::detached);
    });
  }
};

/// https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/AsyncReadStream.html
template<typename CallerExecutor, typename ServiceExecutor>
// requires asio::is_executor<CallerExecutor>::value && asio::is_executor<ServiceExecutor>::value // Q2: Q1 is working why isn't this working with two Types?
class MyAsyncStream {
  typedef void async_rw_handler(boost::system::error_code, size_t);
  /// Holds the callers executor.
  /// Q3: Should this field even exist?
  CallerExecutor executor;
  /// Use a weak_ptr to behave like a file descriptor.
  std::weak_ptr<Service<ServiceExecutor>> serviceRef;
public:
  explicit MyAsyncStream(std::shared_ptr<Service<ServiceExecutor>> & service, CallerExecutor & exe) : executor{exe}, serviceRef{service} {}

  /// Needed by the stream specification.
  typedef CallerExecutor executor_type;

  /**
   * Q4: Which executor should this function return? The CallerExecutor or the ServiceExecutor or something different.
   *     In this example it is never called. However it is needed by the stream specification. https://www.boost.org/doc/libs/1_79_0/doc/html/boost_asio/reference/AsyncReadStream.html
   * I really don't want to leak the ServiceExecutor to library users.
   * @return Returns the executor supplied in the constructor.
   */
  auto get_executor() {
    tout() << "GETTING EXE" << std::endl;
    return executor;
  }

  template<typename MutableBufferSequence,
    asio::completion_token_for<async_rw_handler>
    CompletionToken = typename asio::default_completion_token<CallerExecutor>::type>
  requires asio::is_mutable_buffer_sequence<MutableBufferSequence>::value
  auto async_read_some(const MutableBufferSequence &buffer,
                       CompletionToken &&token = typename asio::default_completion_token<CallerExecutor>::type()) {
    return asio::async_initiate<CompletionToken, async_rw_handler>([&](auto completion_handler) { // Q5: Can I avoid this async_initiate somehow?
      BOOST_ASIO_READ_HANDLER_CHECK(CompletionToken, completion_handler) type_check;              // I tried using co_spawn directly without success.
      asio::co_spawn(
          asio::get_associated_executor(completion_handler), // Q6-1: should I use get_executor() here? Currently, I just get the callers executor.
          [&, buffer = std::move(buffer), completion_handler = std::forward<CompletionToken>(completion_handler)]
          () mutable -> asio::awaitable<void> {
        const constexpr auto TAG = "ARS";
        auto callerExe = co_await asio::this_coro::executor;
        auto to_caller = asio::bind_executor(callerExe, asio::use_awaitable);

        auto service = serviceRef.lock();
        if (service == nullptr) {
          std::move(completion_handler)(asio::error::bad_descriptor, 0);
          co_return;
        }
        auto to_service = asio::bind_executor(service->strand, asio::use_awaitable);

        co_await asio::post(to_service);

        tout(TAG) << "performing read" << std::endl;

        auto buf_begin = asio::buffers_begin(buffer);
        auto buf_end = asio::buffers_end(buffer);
        boost::system::error_code err = asio::error::fault;
        size_t it = 0;
        while (!service->bufferOut.empty()) {
          if (buf_begin == buf_end) {
            // error the buffer is smaller than the request read amount
            err = asio::error::no_buffer_space;
            goto completion;
          }

          *buf_begin   = service->bufferOut.at(0);
          service->bufferOut.erase(0, 1);
          it  ;
        }
        err = asio::stream_errc::eof;
        completion:
        co_await asio::post(to_caller); // without this call the function returns on the wrong thread
        tout(TAG) << "read done returned" << std::endl;
        std::move(completion_handler)(err, it);
      }, asio::detached);
    }, token);
  }

  template<typename ConstBufferSequence,
    asio::completion_token_for <async_rw_handler>
    CompletionToken = typename asio::default_completion_token<CallerExecutor>::type>
  requires asio::is_const_buffer_sequence<ConstBufferSequence>::value
  auto async_write_some(const ConstBufferSequence &buffer,
                        CompletionToken &&token = typename asio::default_completion_token<CallerExecutor>::type()) {
    return asio::async_initiate<CompletionToken, async_rw_handler>([&](auto completion_handler) {
      BOOST_ASIO_WRITE_HANDLER_CHECK(CompletionToken, completion_handler) type_check;
      asio::co_spawn(
          asio::get_associated_executor(completion_handler), // Q6-2: should I use get_executor() here? Currently, I just get the callers executor.
          [&, buffer = std::move(buffer), completion_handler = std::forward<CompletionToken>(completion_handler)]
          () mutable -> asio::awaitable<void> {
        const constexpr auto TAG = "AWS";
        auto callerExe = co_await asio::this_coro::executor;
        auto to_caller = asio::bind_executor(callerExe, asio::use_awaitable);

        auto service = serviceRef.lock();
        if (service == nullptr) {
          std::move(completion_handler)(asio::error::bad_descriptor, 0);
          co_return;
        }
        auto to_service = asio::bind_executor(service->strand, asio::use_awaitable);

        co_await asio::post(to_service);

        tout(TAG) << "performing write" << std::endl;

        auto buf_begin = asio::buffers_begin(buffer);
        auto buf_end = asio::buffers_end(buffer);
        boost::system::error_code err = asio::error::fault;
        size_t it = 0;
        while (buf_begin != buf_end) {
          service->bufferIn.push_back(static_cast<char>(*buf_begin  ));
          it  ;
        }
        err = asio::stream_errc::eof;
        completion:
        co_await asio::post(to_caller); // without this call the function returns on the wrong thread
        tout(TAG) << "write done returned" << std::endl;
        std::move(completion_handler)(err, it);
      }, asio::detached);
    }, token);
  }
};

asio::awaitable<int> mainCo() {
  const constexpr auto TAG = "MainCo";
  auto exe = co_await asio::this_coro::executor;
  auto use_awaitable = asio::bind_executor(exe, asio::use_awaitable);
  auto as_tuple  = asio::experimental::as_tuple(use_awaitable);
  auto use_future = asio::use_future;
  auto timer = asio::steady_timer(exe);

  asio::thread_pool servicePool{1};

  co_await asio::post(asio::bind_executor(servicePool, asio::use_awaitable));
  tout() << "ServiceThread run start" << std::endl;
  co_await asio::post(use_awaitable);

  auto service = std::make_shared<Service<boost::asio::thread_pool::basic_executor_type<std::allocator<void>, 0> >>(servicePool.get_executor());
  service->init();
  auto stream = MyAsyncStream{service, exe};

  for (size_t it = 0; it < 4; it  ) {
    {
      std::vector<char> dataBackend;
      auto dynBuffer = asio::dynamic_buffer(dataBackend, 50);
      auto [ec, n] = co_await asio::async_read(stream, dynBuffer, as_tuple); // Q7-1: Can I avoid using as_tuple here?

      tout(TAG) << "read done: " << std::endl
                << "n:   " << n  << std::endl
                << "msg: " << std::string{dataBackend.begin(), dataBackend.end()} << std::endl
                << "ec:  " << ec.message()
                << std::endl;
    }

    {
      auto const constexpr str = std::string_view{"HelloW"};
      std::vector<char> dataBackend{str.begin(), str.end()};

      auto dynBuffer = asio::dynamic_buffer(dataBackend, 50);
      auto [ec, n] = co_await asio::async_write(stream, dynBuffer, as_tuple); // Q7-2: Can I avoid using as_tuple here?

      tout(TAG) << "write done: " << std::endl
                << "n:   " << n   << std::endl
                << "msg: " << str << std::endl
                << "ec:  " << ec.message()
                << std::endl;
    }


    timer.expires_after(std::chrono::milliseconds(2500));
    co_await timer.async_wait(use_awaitable);
  }

  servicePool.join();
  tout(TAG) << "Normal exit" << std::endl;
  co_return 0;
}

int main() {
  asio::io_context appCtx;

  auto fut = asio::co_spawn(asio::make_strand(appCtx), mainCo(), asio::use_future);

  tout() << "MainThread run start" << std::endl;
  appCtx.run();
  tout() << "MainThread run done" << std::endl;

  return fut.get();
}

CodePudding user response:

  • Q1

    Looks fine I guess. But, see Q2.

  • Q2

    Looks like it kills CTAD for AsyncStream. If I had to guess it's because ServiceExecutor is in non-deduced context. Helping it manually might help, but note how the second static assert here fails:

      using ServiceExecutor = asio::thread_pool::executor_type;
      using CallerExecutor = asio::any_io_executor;
      static_assert(asio::is_executor<ServiceExecutor>::value);
      static_assert(asio::is_executor<CallerExecutor>::value);
    

    That's because co_await this_coro::executor returns any_io_executor, which is a different "brand" of executor. You need to check with execution::is_executor<T>::value instead. In fact, you might want to throw in a compatibility check as happens in Asio implementation functions:

        (is_executor<Executor>::value || execution::is_executor<Executor>::value)
          && is_convertible<Executor, AwaitableExecutor>::value
    

PS: It dawned on me that the non-deduced context is a symptom of overly-specific template arguments. Just make AsyncStream<Executor, Service> (why bother with the specific type arguments that are implementation details of Service?). That fixes the CTAD (Live On Compiler Explorer)

template <typename CallerExecutor, typename Service>
requires my_is_executor<CallerExecutor>::value //
class MyAsyncStream {
  • Q3: Should this field even exist?

     CallerExecutor executor;
    

    Yes, that's how the IO object remembers its bound executor.

  • Q4: that's the spot where you return that caller executor.

    It's not called in your application, but it might be. If you call any composed operation (like asio::async_read_until) against your IO Object (MyAsyncStream) it will - by default - run any handlers on the associated executor. This may add behaviours (like handler serialization, work tracking etc) that are required for correctness.

    Like ever, the handler can be bound to another executor to override this.

  • Q5 I don't think so, unless you want to mandate use_awaitable (or compatible) completion tokens. The fact that you run a coro inside should be an implementation detail for the caller.

  • Q6 Yes, but not instead off. I'd assume you need to use the IO object's executor as the fallback:

     asio::get_associated_executor(
         completion_handler, this->get_executor())
    
  • Q7-1: Can I avoid using as_tuple here?

     auto [ec, n] = co_await asio::async_read(stream, dynBuffer, as_tuple);
    

    I suppose if you can "just" handle system_error exceptions:

    auto n = co_await asio::async_read(stream, dynBuffer, use_awaitable);
    

    Alternatively, I believe maybe redirect_error is applicable?

  • Related