Home > Net >  Create asyncio Future and fill result in other thread via Queue not working
Create asyncio Future and fill result in other thread via Queue not working

Time:10-12

The problem is that I need to create an async method/library as follow (so that it does not block asyncio event loop):

  1. Create future from the being defined async method (let's say it methodA)
  2. Put the future into some Queue/List/Dict and one service will fill result into the future when available (the result take long time to be available)
  3. await the future in the methodA

The problem I have is awaiting for the created future is blocking forever as simplified example below.

import asyncio
from asyncio import Future
from queue import Queue
from threading import Thread

futures_queue: Queue[Future] = Queue()


def fill_result_service():
    counter = 0
    while True:
        fut = futures_queue.get()
        print(f"Processing fut={id(fut)}")
        fut.set_result(f"OK: {counter}")
        counter  = 1


filler_thread = Thread(target=fill_result_service)
filler_thread.start()


async def main_not_ok():
    fut: Future[str] = asyncio.get_running_loop().create_future()
    print(f"Putting fut={id(fut)} into queue")
    futures_queue.put(fut)
    result = await fut
    assert result.startswith("OK")
    print("main_not_ok() completed")


async def main_ok():
    fut: Future[str] = asyncio.get_running_loop().create_future()
    tmp_thread = Thread(target=lambda: fut.set_result("OK: Local thread"))
    tmp_thread.start()
    result = await fut
    assert result.startswith("OK")
    print("main_ok() completed")
    


if __name__ == "__main__":
    print("Running main_ok: ")
    asyncio.run(main_ok()) # work as expected
    print("\n\n\nRunning main_not_ok: ")
    asyncio.run(main_not_ok()) #blocking forever

I have struggled to debug it for half-day and can't figure it out. Please help me.

CodePudding user response:

You can do it without Queue the following way:

import asyncio
import time
from random import randint
from threading import Thread


def set_future_result(future: asyncio.Future, event: asyncio.Event, loop: asyncio.AbstractEventLoop):
    """Thread target function. It gives some result to futures."""
    async def _event_status_change(_event: asyncio.Event):
        """Wrap event in coroutine to run it threadsafe"""
        _event.set()

    time.sleep(3)
    res = randint(1, 10)
    if res > 8:
        future.set_exception(Exception(f"Result: {res} "   "Error  !!! " * 2))
    else:
        future.set_result(res)
    asyncio.run_coroutine_threadsafe(_event_status_change(event), loop)


async def asyncio_loop_killer(task: asyncio.Task):
    """Just task to finish our app in several seconds."""
    n = 20
    while n:
        await asyncio.sleep(1)
        n -= 1
        if task.done():
            break
    else:
        task.cancel()


async def tasks_producer():
    """Main function of our app. It produces futures for children threads."""
    loop = asyncio.get_event_loop()
    while True:
        future, event = asyncio.Future(), asyncio.Event()
        event.clear()
        worker = Thread(target=set_future_result, args=(future, event, loop,), daemon=True)
        worker.start()
        await event.wait()
        if res := future.exception():
            print(f"Error: {res}")
            break

        print(f"Result: {future.result()}")


async def async_main():
    """Wrapper around all async activity."""
    producer_task = asyncio.create_task(tasks_producer())
    await asyncio_loop_killer(producer_task)

if __name__ == '__main__':
    asyncio.run(async_main())

Also check add_done_callback of Future

  • Related