The problem is that I need to create an async method/library as follow (so that it does not block asyncio event loop):
- Create future from the being defined async method (let's say it methodA)
- Put the future into some Queue/List/Dict and one service will fill result into the future when available (the result take long time to be available)
- await the future in the methodA
The problem I have is awaiting for the created future is blocking forever as simplified example below.
import asyncio
from asyncio import Future
from queue import Queue
from threading import Thread
futures_queue: Queue[Future] = Queue()
def fill_result_service():
counter = 0
while True:
fut = futures_queue.get()
print(f"Processing fut={id(fut)}")
fut.set_result(f"OK: {counter}")
counter = 1
filler_thread = Thread(target=fill_result_service)
filler_thread.start()
async def main_not_ok():
fut: Future[str] = asyncio.get_running_loop().create_future()
print(f"Putting fut={id(fut)} into queue")
futures_queue.put(fut)
result = await fut
assert result.startswith("OK")
print("main_not_ok() completed")
async def main_ok():
fut: Future[str] = asyncio.get_running_loop().create_future()
tmp_thread = Thread(target=lambda: fut.set_result("OK: Local thread"))
tmp_thread.start()
result = await fut
assert result.startswith("OK")
print("main_ok() completed")
if __name__ == "__main__":
print("Running main_ok: ")
asyncio.run(main_ok()) # work as expected
print("\n\n\nRunning main_not_ok: ")
asyncio.run(main_not_ok()) #blocking forever
I have struggled to debug it for half-day and can't figure it out. Please help me.
CodePudding user response:
You can do it without Queue
the following way:
import asyncio
import time
from random import randint
from threading import Thread
def set_future_result(future: asyncio.Future, event: asyncio.Event, loop: asyncio.AbstractEventLoop):
"""Thread target function. It gives some result to futures."""
async def _event_status_change(_event: asyncio.Event):
"""Wrap event in coroutine to run it threadsafe"""
_event.set()
time.sleep(3)
res = randint(1, 10)
if res > 8:
future.set_exception(Exception(f"Result: {res} " "Error !!! " * 2))
else:
future.set_result(res)
asyncio.run_coroutine_threadsafe(_event_status_change(event), loop)
async def asyncio_loop_killer(task: asyncio.Task):
"""Just task to finish our app in several seconds."""
n = 20
while n:
await asyncio.sleep(1)
n -= 1
if task.done():
break
else:
task.cancel()
async def tasks_producer():
"""Main function of our app. It produces futures for children threads."""
loop = asyncio.get_event_loop()
while True:
future, event = asyncio.Future(), asyncio.Event()
event.clear()
worker = Thread(target=set_future_result, args=(future, event, loop,), daemon=True)
worker.start()
await event.wait()
if res := future.exception():
print(f"Error: {res}")
break
print(f"Result: {future.result()}")
async def async_main():
"""Wrapper around all async activity."""
producer_task = asyncio.create_task(tasks_producer())
await asyncio_loop_killer(producer_task)
if __name__ == '__main__':
asyncio.run(async_main())
Also check add_done_callback
of Future