Coming from a .NET background, I am trying to understand Python multithreading using concurrent.futures.ThreadPoolExecutor and submit. I was trying to add a timeout to some code for a test, but I have realised I don't fully understand some elements of what I'm trying to do. I have put some simplified code below.

I would expect the method to return after around 5 seconds, when the call to concurrent.futures.wait(futures, return_when=FIRST_COMPLETED) completes. In fact it takes the full 10 seconds. I suspect it has to do with my understanding of the with statement, as changing the code to thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=2) results in the behaviour I would expect. Adding a call to the shutdown method doesn't do anything, as all the futures are already running. Is there a way to exit the with statement immediately after the call to wait? I have tried using break and return, but they have no effect. I am using Python 3.10.8.
from concurrent.futures import FIRST_COMPLETED
from datetime import datetime
import concurrent.futures
import time


def test_multiple_threads():
    set_timeout_on_method()
    print("Current Time =", datetime.now())  # Prints time N + 10


def set_timeout_on_method():
    futures = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as thread_pool:
        print("Current Time =", datetime.now())  # Prints time N
        futures.append(thread_pool.submit(time.sleep, 5))
        futures.append(thread_pool.submit(time.sleep, 10))
        concurrent.futures.wait(futures, return_when=FIRST_COMPLETED)
        print("Current Time =", datetime.now())  # Prints time N + 5
    print("Current Time =", datetime.now())  # Prints time N + 10
Answer:
AFAIK, there is no native way to terminate threads from ThreadPoolExecutor, and it's supposedly not even a good idea, as described in existing answers (exhibit A, exhibit B).

It is possible to do this with processes in ProcessPoolExecutor, but even then the main process would apparently wait for all the processes that have already started. From the documentation of shutdown:

"If wait is False then this method will return immediately and the resources associated with the executor will be freed when all pending futures are done executing. Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing."

This means that even though "End @" would be printed after about 5 seconds, the script would terminate only after about 20 seconds.
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait
from datetime import datetime
from time import sleep


def multiple_processes():
    print("Start @", datetime.now())
    set_timeout_on_method()
    print("End @", datetime.now())


def set_timeout_on_method():
    futures = []
    with ProcessPoolExecutor() as executor:
        futures.append(executor.submit(sleep, 5))
        futures.append(executor.submit(sleep, 10))
        futures.append(executor.submit(sleep, 20))
        print("Futures created @", datetime.now())
        if wait(futures, return_when=FIRST_COMPLETED):
            print("Shortest future completed @", datetime.now())
            executor.shutdown(wait=False, cancel_futures=True)


if __name__ == "__main__":
    multiple_processes()
With max_workers set to 1, the entire script would take about 35 seconds because (to my surprise) the last future doesn't get cancelled, despite cancel_futures=True.
You could kill the workers, though. This would make the main process finish without delay. Note that executor._processes is a private attribute, and that the snippet also needs import os and import signal:

...
    with ProcessPoolExecutor(max_workers=1) as executor:
        futures.append(executor.submit(sleep, 5))
        futures.append(executor.submit(sleep, 10))
        futures.append(executor.submit(sleep, 20))
        print("Futures created @", datetime.now())
        if wait(futures, return_when=FIRST_COMPLETED):
            print("Shortest future completed @", datetime.now())
            # Collect the worker PIDs before shutting down the executor
            subprocesses = [p.pid for p in executor._processes.values()]
            executor.shutdown(wait=False, cancel_futures=True)
            for pid in subprocesses:
                os.kill(pid, signal.SIGTERM)
...
Disclaimer: Please don't take this answer as advice for whatever you are trying to achieve. It's just brainstorming based on your code.
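For the thread-based case in the question, the behaviour the asker observed without the with statement can be sketched as follows. Leaving a with block calls shutdown(wait=True), which blocks until running tasks finish, so this sketch creates the pool without with and calls shutdown(wait=False, cancel_futures=True) itself (cancel_futures needs Python 3.9 or later). The function name wait_for_first and the short delays are assumptions made for illustration only. The still-running task is not stopped; it keeps running in the background, and the interpreter will not exit until it ends.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def wait_for_first(delays, max_workers=2):
    """Return (done, not_done) as soon as the first task finishes.

    Hypothetical helper: the tasks are plain time.sleep calls, as in the question.
    """
    # No `with` block here: exiting a `with` block calls shutdown(wait=True),
    # which blocks until every running task has finished.
    pool = ThreadPoolExecutor(max_workers=max_workers)
    try:
        futures = [pool.submit(time.sleep, d) for d in delays]
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    finally:
        # Return immediately and drop tasks that are still queued.
        # Tasks that are already running are NOT interrupted.
        pool.shutdown(wait=False, cancel_futures=True)
    return done, not_done


if __name__ == "__main__":
    start = time.monotonic()
    done, not_done = wait_for_first([0.2, 1.0])
    print(f"returned after {time.monotonic() - start:.2f}s")
    print(len(done), "done,", len(not_done), "not done")
```

The call returns shortly after the shortest task completes, but the script as a whole still ends only once the longer sleep has finished.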
Answer:
The problem is that you cannot cancel a Future once it has started running. To demonstrate this, I made the following changes:
from concurrent.futures import (
    FIRST_COMPLETED,
    ThreadPoolExecutor,
    wait as futures_wait,
)
from time import sleep
from datetime import datetime


def test_multiple_threads():
    set_timeout_on_method()
    print("Current Time =", datetime.now())  # Prints time N + 10


def set_timeout_on_method():
    with ThreadPoolExecutor(max_workers=2) as thread_pool:
        print("Current Time =", datetime.now())  # Prints time N
        futures = [thread_pool.submit(sleep, t) for t in (2, 10, 2, 100, 100, 100, 100, 100)]
        futures_wait(futures, return_when=FIRST_COMPLETED)
        print("Current Time =", datetime.now())  # Prints time N + 2
        # cancel() returns True for queued futures and False for running ones
        print([i.cancel() if not i.done() else "DONE" for i in futures])
    print("Current Time =", datetime.now())  # Prints time N + 10


if __name__ == '__main__':
    test_multiple_threads()
As you can see, only three of the tasks are done. ThreadPoolExecutor is built on the threading module, and a Thread in Python can't be stopped in any conventional way. Check this answer.
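Since a running thread cannot be stopped from outside, a common workaround is cooperative cancellation: the task itself checks a flag and returns early. Below is a minimal sketch, assuming the task can be rewritten to poll a threading.Event instead of calling time.sleep directly. The names interruptible_sleep and run_until_first_done are hypothetical, not part of any library.

```python
import threading
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def interruptible_sleep(seconds, stop_event):
    """Sleep for up to `seconds`, returning early if stop_event is set.

    Returns True if the full time elapsed, False if interrupted.
    """
    # Event.wait returns True when the event is set, False on timeout.
    return not stop_event.wait(timeout=seconds)


def run_until_first_done(delays):
    """Start one task per delay and ask the rest to stop once one finishes."""
    stop_event = threading.Event()
    with ThreadPoolExecutor(max_workers=len(delays)) as pool:
        futures = [pool.submit(interruptible_sleep, d, stop_event) for d in delays]
        wait(futures, return_when=FIRST_COMPLETED)
        stop_event.set()  # ask the remaining tasks to return early
    # Leaving the `with` block still waits for the tasks,
    # but they now finish promptly because they saw the flag.
    return [f.result() for f in futures]


if __name__ == "__main__":
    start = time.monotonic()
    results = run_until_first_done([0.2, 5.0])
    print(results, f"after {time.monotonic() - start:.2f}s")
```

This only helps when the task has points at which it can check the flag. A call that blocks inside a library, such as time.sleep itself, will not notice the event.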