Cancel run_in_executor coroutine from the main thread not working

Time:06-11

In my use case I ping a service while a sync operation is running. Once the sync operation finishes I need to stop the ping operation, but it seems the run_in_executor task cannot be cancelled:

import asyncio
import threading

async def running_bg(loop, event):
    await loop.run_in_executor(None, running, loop, event)

def running(loop, event):
    while True:
        if event.is_set():
            print("cancelling")
            break
        print("We are in running")
        future = asyncio.run_coroutine_threadsafe(asyncio.sleep(5), loop)
        future.result()
    return

async def run_them(steps, loop):
    step = steps
    event = threading.Event()
    task = loop.create_task(running_bg(loop, event))
    while steps:
        await asyncio.sleep(2)
        steps -= 1
    event.set()
    task.cancel()  # if I comment this out it works well, but if I don't, it hangs
    try:
        await task
    except asyncio.CancelledError:
        print("task cancelled")
    return


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run_them(3, loop))

When I call cancel() it hangs in the terminal with:

We are in running
We are in running
task cancelled
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 40, in _python_exit
    t.join()
  File "/usr/lib/python3.8/threading.py", line 1011, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

But when I don't call cancel(), it works fine with the threading.Event flag:

We are in running
We are in running
cancelling

I know we don't need to cancel the task if we have an event flag, but I saw this example in a different answer.

So why does the program hang? Is there any possible reason?

CodePudding user response:

The problem is that your running function, which executes in asyncio's default thread pool, calls back into the event loop thread and blocks waiting for a result. Cancelling the Task only cancels the await on the asyncio side; it cannot interrupt the thread that is already executing running. In your example code, run_them returns immediately after you cancel the Task, so run_until_complete returns and the event loop stops running. Once the loop has stopped, any outstanding coroutines never complete.

This means your event loop stops before your running function receives the result from its future.result() call, which is waiting on an asyncio.sleep(5) call that will never complete. So future.result() never returns, the running function hangs, and the ThreadPoolExecutor it is running in can't shut down. This is what prevents your application from exiting. Note how the stack trace you get when you press Ctrl+C starts in the concurrent.futures library: that's where the interpreter waits for the ThreadPoolExecutor's threads to finish.
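The core of the hang can be reproduced in isolation. The following is a minimal sketch, not the original program: it schedules a coroutine on a loop that is not running and uses a timeout (an addition for illustration, so the demo does not block forever) to show that the result never arrives until the loop runs again. The name completed_while_stopped is hypothetical.

```python
import asyncio
import concurrent.futures

# A loop that exists but is not running, like the loop in the question
# after run_until_complete() has returned.
loop = asyncio.new_event_loop()

# Scheduling works, but nothing will execute the coroutine yet.
future = asyncio.run_coroutine_threadsafe(asyncio.sleep(0), loop)

try:
    # Without the timeout this call would block forever, which is
    # what happens inside running() in the question.
    future.result(timeout=0.5)
    completed_while_stopped = True
except concurrent.futures.TimeoutError:
    completed_while_stopped = False

print("completed while loop stopped:", completed_while_stopped)

# Once the loop runs again, the pending coroutine finishes normally.
loop.run_until_complete(asyncio.wrap_future(future, loop=loop))
print("completed after running the loop:", future.done())
loop.close()
```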

If you're using Python 3.9 or later, you should be able to fix this by adding a call to await loop.shutdown_default_executor() at the end of your run_them function. If you're using an earlier version, you basically have to implement that method yourself:

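# Reuses the imports, running() and running_bg() from the question.

def shutdown(fut, loop):
    # Runs in a helper thread: block until the pool's worker threads finish,
    # then wake up the coroutine that is awaiting fut on the event loop.
    try:
        # Note: _default_executor is a private attribute of the event loop.
        loop._default_executor.shutdown(wait=True)
    finally:
        loop.call_soon_threadsafe(fut.set_result, None)


async def run_them(steps, loop):
    event = threading.Event()
    task = loop.create_task(running_bg(loop, event))
    while steps:
        await asyncio.sleep(2)
        steps -= 1
    event.set()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("task cancelled")
    # Wait for the default thread pool to shut down before exiting.
    # The loop keeps running meanwhile, so the pending sleep can complete.
    fut = loop.create_future()
    t = threading.Thread(target=shutdown, args=(fut, loop))
    t.start()
    await fut
    t.join()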
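
On Python 3.9 or later the same idea needs no helper thread. Here is a minimal sketch of that variant, with a few assumptions that are not in the original: the sleeps are shortened (0.2 s and 0.5 s instead of 2 s and 5 s), a hypothetical log list replaces print so the result can be inspected, and the loop is created with asyncio.new_event_loop().

```python
import asyncio
import threading

def running(loop, event, log):
    # Worker thread: sleeps by calling back into the event loop,
    # as in the question.
    while not event.is_set():
        log.append("running")
        future = asyncio.run_coroutine_threadsafe(asyncio.sleep(0.5), loop)
        future.result()
    log.append("cancelling")

async def running_bg(loop, event, log):
    await loop.run_in_executor(None, running, loop, event, log)

async def run_them(steps, loop):
    event = threading.Event()
    log = []  # hypothetical stand-in for the print() calls
    task = loop.create_task(running_bg(loop, event, log))
    while steps:
        await asyncio.sleep(0.2)
        steps -= 1
    event.set()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("task cancelled")
    # Python 3.9+: keep the loop running until the worker thread has
    # returned, so its pending sleep can still complete.
    await loop.shutdown_default_executor()
    return log

def main():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(run_them(3, loop))
    finally:
        loop.close()

if __name__ == "__main__":
    print(main())
```

With this change the program should exit on its own, and the last log entry should be "cancelling", written by the worker thread after its final sleep returns.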

Alternatively, you could just not call task.cancel() and rely on the Event() to break out of the running method.
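
A rough sketch of that Event-only approach follows. Be aware that it goes one step further than the sentence above: it also swaps the sleep-via-event-loop for threading.Event.wait(timeout), a variation that is not in the original code, so the worker no longer depends on the loop at all and wakes up as soon as the event is set. The shortened intervals, the log list, and the parameter names tick and interval are assumptions for illustration.

```python
import asyncio
import threading

def running(event, log, interval):
    # Event.wait() returns True as soon as the event is set,
    # or False when the timeout expires first.
    while not event.wait(timeout=interval):
        log.append("running")
    log.append("cancelling")

async def run_them(steps, tick=0.2, interval=0.5):
    loop = asyncio.get_running_loop()
    event = threading.Event()
    log = []  # hypothetical stand-in for the print() calls
    worker = loop.run_in_executor(None, running, event, log, interval)
    for _ in range(steps):
        await asyncio.sleep(tick)
    event.set()
    # No cancel(): simply wait for the thread to notice the event and return.
    await worker
    return log

def main():
    return asyncio.run(run_them(3))

if __name__ == "__main__":
    print(main())
```

Because the worker is awaited rather than cancelled, run_them does not return until the thread has finished, so nothing is left running when the loop stops.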
