Home > Mobile >  asio How to change the executor inside an awaitable
asio How to change the executor inside an awaitable

Time:04-25

I have read this question and tried to replicate the answer with the following code:

#include <iostream>
#include <syncstream>
#include <thread>
#include <coroutine>
#include <boost/asio.hpp>
#include <boost/asio/experimental/as_single.hpp>
#include <boost/bind/bind.hpp>
#include <boost/thread/thread.hpp>

inline std::osyncstream tout() {
  auto hash = std::hash<std::thread::id>{}(std::this_thread::get_id());
  return std::osyncstream(std::cout) << "T" << hash << " ";
}

namespace asio = boost::asio;

asio::awaitable<void> mainCo(asio::io_context &appIO, asio::io_context &prodIO) {
  // the thread should also change when using the IO contexts directly.
  auto astrand = asio::io_context::strand{appIO};
  auto pstrand = asio::io_context::strand{prodIO};

  tout() << "MC on APPIO" << std::endl;
  co_await asio::post(pstrand, asio::use_awaitable);
  tout() << "MC on PRODIO" << std::endl;
  co_await asio::post(astrand, asio::use_awaitable);
  tout() << "MC on APPIO" << std::endl;
  co_await asio::post(pstrand, asio::use_awaitable);
  tout() << "MC on PRODIO" << std::endl;
  co_await asio::post(pstrand, asio::use_awaitable); // nop - no operation because we are already on the correct execution_context
  tout() << "MC on PRODIO" << std::endl;
  co_await asio::post(astrand, asio::use_awaitable);
  tout() << "MC on APPIO" << std::endl;
}

int main() {
  asio::io_context prodIO;
  boost::thread prodThread;
  {
    // ensure the producer io context doesn't exit
    auto prodWork = asio::make_work_guard(prodIO);

    prodThread = boost::thread{[&prodIO] {
      tout() << "ProdThread run start" << std::endl;
      prodIO.run(); // if this call is removed the mainCo is stuck as expected
      tout() << "ProdThread run done" << std::endl;
    }};
    asio::io_context appIO;

    asio::co_spawn(appIO, mainCo(appIO, prodIO), asio::detached);

    tout() << "MainThread run start" << std::endl;
    appIO.run();
    tout() << "MainThread run done" << std::endl;
  }
  prodThread.join();
  return 42;
}

Current output:

/tmp/tmp.wz38MWkttM/cmake-build-debug-remote-host/CoroContextSwitching
T14386720116392206644 MainThread run start
T8726023523478668610 ProdThread run start
T14386720116392206644 MC on APPIO
T14386720116392206644 MC on PRODIO
T14386720116392206644 MC on APPIO
T14386720116392206644 MC on PRODIO
T14386720116392206644 MC on PRODIO
T14386720116392206644 MC on APPIO
T14386720116392206644 MainThread run done
T8726023523478668610 ProdThread run done

Process finished with exit code 42

Expected output:

/tmp/tmp.wz38MWkttM/cmake-build-debug-remote-host/CoroContextSwitching
T14386720116392206644 MainThread run start
T8726023523478668610 ProdThread run start
T14386720116392206644 MC on APPIO
T8726023523478668610 MC on PRODIO
T14386720116392206644 MC on APPIO
T8726023523478668610 MC on PRODIO
T8726023523478668610 MC on PRODIO
T14386720116392206644 MC on APPIO
T14386720116392206644 MainThread run done
T8726023523478668610 ProdThread run done

Process finished with exit code 42

I expect the thread id to change according to the cout statements. However all cout statements are executed on the MainThread.

How can I get the desired behaviour?

Edit 2:

The original question still stands this justs adds more information to the problem.

It seems like asio::use_awaitable is binding a default executor from somewhere. Is there a documented default?

With the following edited function I can achieve what I want:

asio::awaitable<void> mainCo(asio::io_context &appIO, asio::io_context &prodIO) {
  // the thread should also change when using the IO contexts directly.
  auto astrand = asio::io_context::strand{appIO};
  auto pstrand = asio::io_context::strand{prodIO};

  tout() << "MC on APPIO" << std::endl;
  co_await asio::post(pstrand, asio::bind_executor(pstrand, asio::use_awaitable)); // so use_awaitable is binding a default executor from somewhere.
  tout() << "MC on PRODIO" << std::endl;
  co_await asio::post(astrand, asio::use_awaitable);
  tout() << "MC on APPIO" << std::endl;
  co_await asio::post(pstrand, asio::bind_executor(pstrand, asio::use_awaitable));
  tout() << "MC on PRODIO" << std::endl;
  co_await asio::post(pstrand, asio::bind_executor(pstrand, asio::use_awaitable)); // nop - no operation because we are already on the correct execution_context
  tout() << "MC on PRODIO" << std::endl;
  co_await asio::post(astrand, asio::use_awaitable);
  tout() << "MC on APPIO" << std::endl;
  co_await asio::post(astrand, /* the first parameter in post doesn't even matter (can be astrand, pstrand, appIO, prodIO) same result */
                      asio::bind_executor(pstrand, asio::use_awaitable));
  tout() << "MC on PRODIO" << std::endl;
  co_await asio::post(astrand, asio::use_awaitable);
  tout() << "MC on APPIO" << std::endl;
}

HOWEVER, it seems that this is not the proper way to switch executor as the first argument of asio::post doesn't matter. So what is the correct way to do this?

Edit: The question was closed and pointed me to this does boost::asio co_spawn create an actual thread?. I am aware that co_spawn does not spawn a new thread. That's why I spawn a new thread myself called prodThread. I expect the execution_context to switch after awaiting the post statements. The linked question does not answer my question.

CodePudding user response:

You want to associate your executor with the completion token, and then let post/dispatch/defer figure it out from there:

co_await asio::post(bind_executor(pstrand, asio::use_awaitable));

See also e.g. When must you pass io_context to boost::asio::spawn? (C ) or boost::asio::bind_executor does not execute in strand for more details on how it works/why it works/when it works.

It does in fact explain how get_associated_executor does use a default so it might explain how explicitly posting to an executor didn't seem to work here (I haven't checked the associated executor implementation for use_awaitable just now)

Here's my take:

Live On Compiler Explorer

#include <boost/asio.hpp>
#include <boost/asio/experimental/as_single.hpp>
#include <boost/bind/bind.hpp>
#include <coroutine>
#include <iostream>
#include <thread>

inline void tout(auto const& msg) {
    static std::mutex mx;
    std::lock_guard   lk(mx);

    static const std::hash<std::thread::id> h{};

    std::cout << "T" << (h(std::this_thread::get_id()) % 100) << " " << msg
            << std::endl;
}

namespace asio = boost::asio;

asio::awaitable<void> mainCo(asio::io_context& appIO,
                            asio::io_context& prodIO) {
    auto to_app = bind_executor(make_strand(appIO), asio::use_awaitable);
    auto to_prod = bind_executor(make_strand(prodIO), asio::use_awaitable);

    tout("MC on APPIO");
    co_await asio::post(to_prod); tout("MC on PRODIO");
    co_await asio::post(to_app);  tout("MC on APPIO");
    co_await asio::post(to_prod); tout("MC on PRODIO");
    co_await asio::post(to_prod); tout("MC on PRODIO");
    co_await asio::post(to_app);  tout("MC on APPIO");
}

int main() {
    asio::io_context prodIO, appIO;
    auto             prodWork = asio::make_work_guard(prodIO);

    std::thread prodThread{[&prodIO] {
        tout("ProdThread run start");
        prodIO.run(); // if this call is removed the mainCo is stuck as
                    // expected
        tout("ProdThread run done");
    }};

    asio::co_spawn(appIO, mainCo(appIO, prodIO), asio::detached);

    tout("MainThread run start");
    appIO.run();
    tout("MainThread run done");

    prodWork.reset();
    prodThread.join();
}

Prints e.g.

T49 ProdThread run start
T31 MainThread run start
T31 MC on APPIO
T49 MC on PRODIO
T31 MC on APPIO
T49 MC on PRODIO
T49 MC on PRODIO
T31 MC on APPIO
T31 MainThread run done
T49 ProdThread run done

BONUS

I'd suggest passing executors, not execution context references. It's cleaner and more flexible: https://godbolt.org/z/Tr54vf8PM

It then makes it trivial to replace the prodIO thread with a simple thread_pool execution context. It removes the need for work guards and also the fixes missing exception handling: https://godbolt.org/z/a3GT61qdh

  • Related