Home > Software design >  Use asyncio tasks in a separate thread
Use asyncio tasks in a separate thread

Time:03-17

I have a requirement for a project to use several hundred threads and each thread should run an asynchronous function. At the end, I need to collect results from all the threads. I have following example but it works synchronously.

import asyncio
import concurrent.futures

async def count():
    print("One")
    # here I need to use an asynchronous function
    await asyncio.sleep(2)
    print("Two")
    return "some result"

async def main():
    futures = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for _ in range(5):
            future = executor.submit(count)
            futures.append(future)

        results = [await f.result() for f in concurrent.futures.as_completed(futures)]
        print(results)


if __name__ == "__main__":
    asyncio.run(main())

Code above outputs:

One
Two
One
Two
One
Two
One
Two
One
Two
['some result', 'some result', 'some result', 'some result', 'some result']

Expected output:

One
One
One
One
One
Two
Two
Two
Two
Two
['some result', 'some result', 'some result', 'some result', 'some result']

CodePudding user response:

Use could use event loop's run_in_executor() to schedule the tasks and asyncio.as_completed() to loop over the results:

async def main():
    futures = []
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for _ in range(5):
            futures.append(await loop.run_in_executor(executor, count))

        results = [await f for f in asyncio.as_completed(futures)]
        print(results)

CodePudding user response:

Your code does not use eventloop correctly, note that asyncio.run runs the passed coroutine, taking care of managing the asyncio event loop, finalizing asynchronous generators, and closing the threadpool. see more details in https://docs.python.org/3/library/asyncio-task.html#asyncio.run

do it below will properly reuse the same eventloop.

import asyncio
import concurrent.futures

async def count():
    print("One")
    # here I need to use an asynchronous function
    await asyncio.sleep(2)
    print("Two")
    return "some result"

async def main():
    loop = asyncio.get_running_loop()
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # this in fact return coroutine object
        coros = [await loop.run_in_executor(executor, count) for _ in range(5)]
        results = [await f for f in asyncio.as_completed(coros)]
        print(results)


if __name__ == "__main__":
    asyncio.run(main())

output

One
One
One
One
One
Two
Two
Two
Two
Two
['some result', 'some result', 'some result', 'some result', 'some result']
  • Related