In my use case I ping to an service while a sync operation is occurring. Once the sync operation is finished I need to to stop the ping operation, but it seems run_in_executor
is not able to cancel
import asyncio
import threading
async def running_bg(loop, event):
await loop.run_in_executor(None, running, loop, event)
def running(loop, event):
while True:
if event.is_set():
print("cancelling")
break
print("We are in running")
future = asyncio.run_coroutine_threadsafe(asyncio.sleep(5), loop)
future.result()
return
async def run_them(steps, loop):
step = steps
event = threading.Event()
task = loop.create_task(running_bg(loop, event))
while steps:
await asyncio.sleep(2)
steps -= 1
event.set()
task.cancel() # if I comment this, it works well, but if I dont, then it hangs
try:
await task
except asyncio.CancelledError:
print("task cancelled")
return
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(run_them(3, loop))
When I call cancel()
it hangs in the terminal with:
We are in running
We are in running
task cancelled
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/usr/lib/python3.8/concurrent/futures/thread.py", line 40, in _python_exit
t.join()
File "/usr/lib/python3.8/threading.py", line 1011, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
But When I dont call the cancel()
it works fine with the threading.Event
flag.
We are in running
We are in running
cancelling
I know we dont need to cancel the if we have an event flag, but I saw this example from a different answer.
So why does the program hangs, any possible reason?
CodePudding user response:
The problem is related to the fact that your running
method, which executes in the asyncio
default thread pool, is calling back into the asyncio event loop thread and waiting for a result. In your example code, you're letting the run_them
function exit immediately after you cancel the Task, which immediately shuts down your event loop. When the event loop shuts down, it means any outstanding coroutines do not complete.
This means your event loop shuts down before your running
method receives the result from its future.result()
call, which is waiting on an asyncio.sleep(5)
call that will never complete. That means future.result()
never returns, which leaves the running
method hanging, which means the ThreadPoolExecutor
it is running in can't shutdown. This is what prevents your application from exiting. Note how the stack trace you get when you Ctrl C starts in the concurrent.futures
library - that's where it waits for the ThreadPoolExecutor
to shut down.
If you're using Python 3.9 , you should be able to fix this by adding a call to await loop.shutdown_default_executor()
at the end of your run_them
method. If you're using an earlier version, you have to basically implement that method yourself:
def shutdown(fut, loop):
try:
loop._default_executor.shutdown(wait=True)
finally:
loop.call_soon_threadsafe(fut.set_result, None)
async def run_them(steps, loop):
step = steps
event = threading.Event()
task = loop.create_task(running_bg(loop, event))
while steps:
await asyncio.sleep(2)
steps -= 1
event.set()
task.cancel()
try:
await task
except asyncio.CancelledError:
print("task cancelled")
# Wait for the default thread pool to shut down before exiting
fut = loop.create_future()
t = threading.Thread(target=shutdown, args=(fut, loop))
t.start()
await fut
t.join()
Alternatively, you could just not call task.cancel() and rely on the Event()
to break out of the running
method.