Home > database >  Wrapping python async for synchronous execution
Wrapping python async for synchronous execution

Time:12-07

I'm trying to load data from a local Postgres database as quickly as possible, and it appears that the most performant python package is [asyncpg][1]. My code is synchronous, and I repeatedly need to load chunks of data. I'm not interested in having the async keyword propagate to every function I've written, so I'm trying to wrap the async code in a synchronous function.

The code below works, but is incredibly ugly:

def connect_to_postgres(user, password, database, host):
    async def wrapped():
        return await asyncpg.connect(user=keys['user'], password=keys['password'],
                                    database='markets', host='127.0.0.1')
    loop = asyncio.get_event_loop()    
    db_connection = loop.run_until_complete(wrapped())
    return db_connection
    
db_connection = connect_to_postgres(keys['user'], keys['password'],
                                    'db', '127.0.0.1')

def fetch_from_postgres(query, db_connection):
    async def wrapped():
        return await db_connection.fetch(query)
    loop = asyncio.get_event_loop()    
    values = loop.run_until_complete(wrapped())
    return values

fetch_from_postgres("SELECT * from db LIMIT 5", db_connection)

In Julia I would do something like

f() = @async 5
g() = fetch(f())
g()

But in Python it seems I have to do the rather clunky,

async def f():
  return 5
def g():
  loop = asyncio.get_event_loop()    
  return loop.run_until_complete(f())

Just wondering if there's a better way?

Edit: the latter python example can of course be written using

def fetch(x):
    loop = asyncio.get_event_loop()    
    return loop.run_until_complete(x)

Edit 2: I do care about performance, but wish to use a synchronous programing approach. asyncpg is 3x faster than psycopg2 as its core implementation is in Cython rather than Python, this is explained in more detail at https://magic.io/blog/asyncpg-1m-rows-from-postgres-to-python/. Hence my desire to wrap this asynchronous code.

Although, still need to create an async wrapped function unless I'm missing something. [1]: https://github.com/MagicStack/asyncpg

CodePudding user response:

This is not difficult to do if you set up your program structure at the beginning. You create a second thread in which your async code will run, and start its event loop. When your main thread, which remains entirely synchronous, wants the result of async call (coroutine), you use the method asyncio.run_coroutine_threadsafe. That method returns a concurrent.futures.Future object. You obtain the returned value by calling its method result(), which blocks until the result is available.

It's almost as if you called the async method like a subroutine. There is minimal overhead because you created only one secondary thread. Here is a simple example:

import asyncio
import threading
from datetime import datetime

async def demo(t):
    await asyncio.sleep(t)
    print(f"Demo function {t} {datetime.now()}")
    return t

def main():
    def thr(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()
        
    loop = asyncio.new_event_loop()
    t = threading.Thread(target=thr, args=(loop, ), daemon=True)
    t.start()

    print("Main", datetime.now())
    t1 = asyncio.run_coroutine_threadsafe(demo(1.0), loop).result()
    t2 = asyncio.run_coroutine_threadsafe(demo(2.0), loop).result()
    print(t1, t2)

if __name__ == "__main__":
    main()

# >>> Main 2021-12-06 19:14:14.135206
# >>> Demo function 1.0 2021-12-06 19:14:15.146803
# >>> Demo function 2.0 2021-12-06 19:14:17.155898
# >>> 1.0 2.0

Your main program experiences a 1-second delay on the first invocation of demo(), and a 2-second delay on the second invocation. That's because your main thread does not have an event loop and therefore cannot execute the two delays in parallel. But that's exactly what you implied that you wanted, when you said that you wanted a synchronous program that uses a third-party async package.

This is a similar answer but the question is slightly different:

How can I have a synchronous facade over asyncpg APIs with Python asyncio?

  • Related