Wrapping a polling-based asynchronous API as an Awaitable

Time:01-21

Consider some library with an interface like this:

  • RemoteTask.start()
  • RemoteTask.cancel()
  • RemoteTask.get_id()
  • RemoteTask.get_result()
  • RemoteTask.is_done()

For example, concurrent.futures.Future implements an API like this, but I don't want to assume the presence of a function like concurrent.futures.wait.

In traditional Python code, you might need to poll for results:

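import time

def foo():
    task = RemoteTask()
    task.start()  # assuming the task must be started explicitly
    while not task.is_done():
        time.sleep(2)  # blocks the calling thread between polls
    return task.get_result()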

Is there some general recommended best-practice technique for wrapping this in an Awaitable interface?

The desired usage would be:

async def foo():
    task = RemoteTask()
    return await run_remote_task()

I understand that the implementation details might differ across async libraries, so I am open to both general strategies for solving this problem, and specific solutions for Asyncio, Trio, AnyIO, or even Curio.

Assume that this library cannot be easily modified, and must be wrapped.

CodePudding user response:

First possibility: If the library has a way to block until completed (preferably one that doesn't just call while not task.is_done(): in a tight loop), you can use anyio.to_thread.run_sync to avoid blocking your main loop.

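Second: If the library has a way to hook up a completion callback, you can set an event from it and have your anyio/trio/asyncio task wait for that event with await evt.wait(). If the callback runs in a different thread, the event has to be set in a thread-safe way.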
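
A minimal asyncio sketch of the callback approach. It uses concurrent.futures.Future (mentioned in the question) as the stand-in, because it offers add_done_callback. The names slow_square and wait_for_future are made up for illustration, and the real library's callback hook may look different.

```python
import asyncio
import time
from concurrent.futures import Future, ThreadPoolExecutor


def slow_square(x: int) -> int:
    # Hypothetical stand-in for the remote work; runs in a worker thread.
    time.sleep(0.05)
    return x * x


async def wait_for_future(fut: Future):
    loop = asyncio.get_running_loop()
    done = asyncio.Event()
    # The callback fires in whichever thread completes the future (or right
    # away in this thread if it is already done), so the event is set via
    # call_soon_threadsafe rather than directly.
    fut.add_done_callback(lambda _: loop.call_soon_threadsafe(done.set))
    await done.wait()
    return fut.result()


async def main() -> int:
    with ThreadPoolExecutor(max_workers=1) as pool:
        return await wait_for_future(pool.submit(slow_square, 7))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

For concurrent.futures.Future specifically, asyncio.wrap_future() already does this job; the sketch only shows the general shape for a library that exposes nothing but a callback.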

Third: If neither of these is true, you might try asking its author to add at least one of those. Busy waiting is not nice!

The fourth method is to fork the library's source code and liberally sprinkle it with import anyio, async def and await keywords until calling await task.run() Just Works.

Method five would be to split the library into a core that does all the work but performs no I/O itself, and a wrapper that tells you when you should do some possibly-asynchronous work. This approach is called "sans I/O". Several nontrivial popular libraries work that way, e.g. Trio's SSL implementation, or the httpx handler for the HTTP protocol. The upside is that this approach is the most flexible: you can combine the protocol with others most easily, including writing a simple front end that behaves just like the original module. The downside is that it is the most work if you start from an existing codebase.
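
To make the "sans I/O" split concrete, here is a toy sketch. LineFramer, read_lines_sync and read_lines_async are hypothetical names, and a real protocol core would be far more involved; the point is only that the same core serves a blocking and an async front end.

```python
import asyncio


class LineFramer:
    """Sans-I/O core: turns raw bytes into complete lines, no I/O inside."""

    def __init__(self):
        self._buffer = b""

    def feed(self, data: bytes) -> list:
        # Accept whatever bytes the caller obtained; return finished lines
        # and keep the trailing partial line buffered.
        self._buffer += data
        *lines, self._buffer = self._buffer.split(b"\n")
        return lines


def read_lines_sync(chunks):
    # Synchronous front end: the caller's iterable performs the "I/O".
    framer = LineFramer()
    return [line for chunk in chunks for line in framer.feed(chunk)]


async def read_lines_async(chunks):
    # Asynchronous front end around the very same core.
    framer = LineFramer()
    lines = []
    async for chunk in chunks:
        lines.extend(framer.feed(chunk))
    return lines
```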

Method four is often much easier than it sounds; I've done it for a lot of not-too-complex libraries.

CodePudding user response:

As the answer by Matthias says, if there is a way to block waiting for the remote task to complete (something like task.wait_until_done()) then you can turn that into an async wait function using the respective libraries' functions to run blocking code in a thread.
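
A rough asyncio sketch of that thread-based approach, assuming Python 3.9+ for asyncio.to_thread. BlockingRemoteTask and its wait_until_done() method are hypothetical stand-ins, since the question does not say the library has a blocking wait. The counterparts in the other frameworks are anyio.to_thread.run_sync and trio.to_thread.run_sync.

```python
import asyncio
import threading


class BlockingRemoteTask:
    """Hypothetical stand-in for a library task with a blocking wait."""

    def __init__(self, value):
        self._value = value
        self._done = threading.Event()

    def start(self):
        # Pretend the remote side finishes 50 ms after starting.
        threading.Timer(0.05, self._done.set).start()

    def wait_until_done(self):
        # Blocks the calling thread until the task has finished.
        self._done.wait()

    def get_result(self):
        return self._value


async def run_remote_task(value):
    task = BlockingRemoteTask(value)
    task.start()
    # Run the blocking wait in a worker thread so the event loop stays free.
    await asyncio.to_thread(task.wait_until_done)
    return task.get_result()
```

Note that cancelling the awaiting asyncio task does not interrupt the worker thread; the blocking call keeps running until it returns on its own.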

Failing that, and assuming that none of those functions block, then polling will still work. If you don't want to handle cancellation then it looks almost identical to the synchronous version and would be very similar across the different frameworks. It looks like this:

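import trio

async def run_remote_task(*args, poll_interval=0.1):
    task = RemoteTask(*args)
    task.start()  # assuming the task must be started explicitly
    while not task.is_done():
        await trio.sleep(poll_interval)  # lets other tasks run between polls
    return task.get_result()  # get_result(), as listed in the question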

The most basic way to handle cancellation is to cancel the remote task if the async task is cancelled, but then don't wait for remote cancellation to complete. For that, things are still fairly simple:

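import trio

async def run_remote_task(*args, poll_interval=0.1):
    task = RemoteTask(*args)
    task.start()  # assuming the task must be started explicitly
    try:
        while not task.is_done():
            await trio.sleep(poll_interval)
        return task.get_result()  # get_result(), as listed in the question
    finally:
        # Runs on normal return too, hence the is_done() check.
        if not task.is_done():
            task.cancel()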

Again, that would look almost identical on the other frameworks too. Just replace trio.sleep() with asyncio.sleep() or anyio.sleep().
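
For instance, an asyncio version might look like the sketch below. FakeRemoteTask is a hypothetical in-memory stand-in so the example runs on its own, and the coroutine takes the task as a parameter instead of constructing RemoteTask(*args) for the same reason.

```python
import asyncio
import time


class FakeRemoteTask:
    """Hypothetical in-memory stand-in for the library's RemoteTask."""

    def __init__(self, duration, result):
        self._duration = duration
        self._result = result
        self._deadline = None
        self.cancelled = False

    def start(self):
        self._deadline = time.monotonic() + self._duration

    def is_done(self):
        # Done once the deadline has passed, or as soon as it is cancelled.
        return self.cancelled or time.monotonic() >= self._deadline

    def cancel(self):
        self.cancelled = True

    def get_result(self):
        return self._result


async def run_remote_task(task, poll_interval=0.01):
    task.start()
    try:
        while not task.is_done():
            await asyncio.sleep(poll_interval)
        return task.get_result()
    finally:
        # Reached on normal return and on asyncio.CancelledError alike.
        if not task.is_done():
            task.cancel()
```

Cancelling the asyncio task that awaits run_remote_task() raises asyncio.CancelledError inside the sleep, and the finally block then forwards the cancellation to the remote task.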

If you want to wait for the remote cancellation to finish, things are slightly more fiddly. This is what it would look like in Trio (anyio is the same but I'm not sure about asyncio):

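import trio

async def run_remote_task(*args, poll_interval=0.1):
    task = RemoteTask(*args)
    task.start()  # assuming the task must be started explicitly
    try:
        while not task.is_done():
            await trio.sleep(poll_interval)
        return task.get_result()  # get_result(), as listed in the question
    finally:
        if not task.is_done():
            # Shield the cleanup so the sleeps below are not cancelled too.
            with trio.CancelScope(shield=True):
                task.cancel()
                while not task.is_done():
                    await trio.sleep(poll_interval)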
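
For asyncio, one possible sketch follows; treat it as an assumption-laden illustration rather than an established recipe. asyncio has no cancel scopes, but each Task.cancel() request is delivered as a single CancelledError, so awaiting inside the finally block works as long as nobody calls cancel() a second time. SlowCancelTask is a hypothetical stand-in whose cancellation takes a moment to complete.

```python
import asyncio
import time


class SlowCancelTask:
    """Hypothetical stand-in whose cancel() takes a while to take effect."""

    def __init__(self, duration, cancel_delay):
        self._duration = duration
        self._cancel_delay = cancel_delay
        self._deadline = None
        self.cancel_requested = False

    def start(self):
        self._deadline = time.monotonic() + self._duration

    def cancel(self):
        # The remote side only reports "done" once the delay has elapsed.
        self.cancel_requested = True
        self._deadline = time.monotonic() + self._cancel_delay

    def is_done(self):
        return time.monotonic() >= self._deadline

    def get_result(self):
        return "finished"


async def run_remote_task(task, poll_interval=0.01):
    task.start()
    try:
        while not task.is_done():
            await asyncio.sleep(poll_interval)
        return task.get_result()
    finally:
        if not task.is_done():
            task.cancel()
            # No shield here: these sleeps run normally unless the asyncio
            # task is cancelled again, which would cut the wait short.
            while not task.is_done():
                await asyncio.sleep(poll_interval)
```

Once the cleanup loop finishes, the original CancelledError continues to propagate, so the caller still sees the asyncio task as cancelled.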