I have a requirement for a project to use several hundred threads and each thread should run an asynchronous function. At the end, I need to collect results from all the threads. I have following example but it works synchronously.
import asyncio
import concurrent.futures
async def count():
print("One")
# here I need to use an asynchronous function
await asyncio.sleep(2)
print("Two")
return "some result"
async def main():
futures = []
with concurrent.futures.ThreadPoolExecutor() as executor:
for _ in range(5):
future = executor.submit(count)
futures.append(future)
results = [await f.result() for f in concurrent.futures.as_completed(futures)]
print(results)
if __name__ == "__main__":
asyncio.run(main())
Code above outputs:
One
Two
One
Two
One
Two
One
Two
One
Two
['some result', 'some result', 'some result', 'some result', 'some result']
Expected output:
One
One
One
One
One
Two
Two
Two
Two
Two
['some result', 'some result', 'some result', 'some result', 'some result']
CodePudding user response:
Use could use event loop's run_in_executor()
to schedule the tasks and asyncio.as_completed()
to loop over the results:
async def main():
futures = []
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
for _ in range(5):
futures.append(await loop.run_in_executor(executor, count))
results = [await f for f in asyncio.as_completed(futures)]
print(results)
CodePudding user response:
Your code does not use eventloop correctly, note that asyncio.run
runs the passed coroutine, taking care of managing the asyncio event loop, finalizing asynchronous generators, and closing the threadpool. see more details in https://docs.python.org/3/library/asyncio-task.html#asyncio.run
do it below will properly reuse the same eventloop.
import asyncio
import concurrent.futures
async def count():
print("One")
# here I need to use an asynchronous function
await asyncio.sleep(2)
print("Two")
return "some result"
async def main():
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# this in fact return coroutine object
coros = [await loop.run_in_executor(executor, count) for _ in range(5)]
results = [await f for f in asyncio.as_completed(coros)]
print(results)
if __name__ == "__main__":
asyncio.run(main())
output
One
One
One
One
One
Two
Two
Two
Two
Two
['some result', 'some result', 'some result', 'some result', 'some result']