Home > Software engineering >  How do I write my own async operations with (non-boost) Asio?
How do I write my own async operations with (non-boost) Asio?

Time:10-16

I want to write my own asynchronous function based on Asio 1.19.2 (without boost).

The motivation is to create an interface that works with a JSON-RPC-like protocol (not JSON-RPC exactly).

On the socket it would look something like

--> { "method": "subtract", "params": [42, 23], "id": 1 }
<-- ... some irrelevant notifications, etc.
<-- { "result": 19, "id": 1 } <-- ID matches initial call

I have this figured out using std::futures (without Asio). That method signature is something like:

// near useless, because futures don't allow for continuation
std::future<std::string> json_rpc(std::string method, json::array args);

To implement that, I just needed to:

  • Keep the socket open in the background (it could also be passed into the method or whatever).
  • Store a set of pending promises.
  • When you call json_rpc, make a new promise, add it to the set, return the get_future().
  • Send the RPC on the socket.
  • If you get a { "result" ... } message on the socket, find the pending future with that ID and set_value() its promise with the result.

How do I translate this future-based approach into an asio-based one, that has a similar interface to e.g. async_read_until? In other words, I want to implement something like:

template <typename CompletionToken>
void async_json_rpc(std::string method, json::array args, CompletionToken&& completion_token);

I found this tutorial from beast, but it uses Boost.Beast, seems outdated, and the only asynchronous part of it is where it calls async_read_some. My code is not really able to make use of these asio async_ "primitives", since my completion handler needs to be invoked at some time outside of the OS's control.

I also found these examples from the asio docs, but they have the same limitation of just wrapping other async_ calls. They also get increasingly unmanageably complex, for application code.

The core of my question, I suppose, is: I have an async function that basically works like it uses asio::use_future as its CompletionToken. How do I let it use any kind of CompletionToken? What is the promise.set_value() equivallent for an asio completion handler?

I'm very interested in simple to understand solutions, so that I'm not the only one on my team able to maintain these sorts of functions.

CodePudding user response:

I think you are asking how to implement the "strand"? I've done things similar to this.

In general, you need a worker thread that services a queue. It will loop and execute each item in the queue, but then sleep when it is empty. Adding something to the queue will signal the thread to wake up, if necessary.

A fancier variation is to use a thread pool and common queue, pruning excess threads if you have too many and creating more if there is too much work for the existing threads. Optimally this requires OS support (I've implemented this for Windows Completion Ports).

When you want to do something async, you put the promise on the queue rather than starting a dedicated thread for that.

Or... were you looking at coroutines, to do co_await calls in C ?

CodePudding user response:

It does not sound like you have any "custom" async operations.

All of your bullets can be implemented in completion handlers of either async_read or async_write.

  • When you call json_rpc, make a new promise, add it to the set, return the get_future().

Yes, this is fine. If the I/O service runs in a separate thread, make sure to protect any shared state with a mutex.

  • Send the RPC on the socket.

Just before returning the future, invoke async_write to send the request. Use a strand to serialize the writes (remember that only one async_write may be in-flight).

At a later stage, if you find yourself needing better performance under load, you can abandon the strand and do the queueing "manually" using a queue of pending requests. Then you will be able to send all pending requests in a single async_write operation, which will pack them into larger TCP packets, thus improving throughput.

  • If you get a { "result" ... } message on the socket, find the pending future with that ID and set_value() its promise with the result.

Exactly that is done in the completion handler of async_read. Fire another async_read before returning from async_read.

  • Related