How to exit ThreadPoolExecutor with statement immediately when a future is running

Time:02-02

Coming from a .NET background, I am trying to understand Python multithreading using concurrent.futures.ThreadPoolExecutor and submit. I was trying to add a timeout to some code for a test, but have realised I don't fully understand some elements of what I'm trying to do. I have put some simplified code below.

I would expect the method to return after around 5 seconds, when the call to concurrent.futures.wait(futures, return_when=FIRST_COMPLETED) completes. In fact it takes the full 10 seconds. I suspect this has to do with my understanding of the with statement, as changing the code to thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=2) results in the behaviour I would expect. Adding a call to the shutdown method doesn't do anything, as all the futures are already running.

Is there a way to exit the with statement immediately after the call to wait? I have tried using break and return, but they have no effect. I am using Python 3.10.8.

from concurrent.futures import FIRST_COMPLETED
from datetime import datetime
import concurrent.futures
import time

def test_multiple_threads():
    set_timeout_on_method()
    print("Current Time =", datetime.now()) # Prints time N + 10


def set_timeout_on_method():
    futures = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as thread_pool:
        print("Current Time =", datetime.now()) # Prints time N
        futures.append(thread_pool.submit(time.sleep, 5))
        futures.append(thread_pool.submit(time.sleep, 10))
        concurrent.futures.wait(futures, return_when=FIRST_COMPLETED)
        print("Current Time =", datetime.now()) # Prints time N + 5
    # Leaving the with block waits for the 10-second sleep to finish
    print("Current Time =", datetime.now()) # Prints time N + 10

if __name__ == "__main__":
    test_multiple_threads()
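The suspicion about the with statement is right: the Python documentation states that leaving a with block shuts the executor down as if Executor.shutdown() had been called with wait set to True, so the block cannot end before every running future has finished. A minimal sketch comparing the two forms (the function names and the shortened sleep durations are illustrative, not from the question):

```python
import concurrent.futures
import time


def with_block(short: float, long: float) -> float:
    """Run two sleeps inside a with block; return the elapsed seconds."""
    start = time.monotonic()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(time.sleep, short), pool.submit(time.sleep, long)]
        concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
    # Leaving the with block ran pool.shutdown(wait=True), joining both workers
    return time.monotonic() - start


def explicit_shutdown(short: float, long: float) -> float:
    """Same work without a with block; return the elapsed seconds."""
    start = time.monotonic()
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)
    futures = [pool.submit(time.sleep, short), pool.submit(time.sleep, long)]
    concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
    # Returns immediately; the long sleep keeps running in its worker thread
    pool.shutdown(wait=False, cancel_futures=True)
    return time.monotonic() - start


if __name__ == "__main__":
    print(f"with block:        {with_block(1, 3):.1f}s")
    print(f"explicit shutdown: {explicit_shutdown(1, 3):.1f}s")
```

Note that explicit_shutdown only returns early: the long sleep is not stopped, and the interpreter will still not exit until it has finished.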

Answer:

AFAIK, there is no native way to terminate threads from ThreadPoolExecutor and it's supposedly not even a good idea, as described in existing answers (exhibit A, exhibit B).

It is possible to do this with processes in ProcessPoolExecutor, but even then the main process would apparently wait for all the processes that have already started, as the documentation of Executor.shutdown() notes:

If wait is False then this method will return immediately and the resources associated with the executor will be freed when all pending futures are done executing. Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.

This means that even though "End @" would be printed after roughly 5 seconds, the script would only terminate after roughly 20 seconds.

from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait
from datetime import datetime
from time import sleep

def multiple_processes():
    print("Start @", datetime.now())
    set_timeout_on_method()
    print("End @", datetime.now())

def set_timeout_on_method():
    futures = []

    with ProcessPoolExecutor() as executor:
        futures.append(executor.submit(sleep, 5))
        futures.append(executor.submit(sleep, 10))
        futures.append(executor.submit(sleep, 20))
        print("Futures created @", datetime.now())

        if wait(futures, return_when=FIRST_COMPLETED):
            print("Shortest future completed @", datetime.now())
            executor.shutdown(wait=False, cancel_futures=True)

if __name__ == "__main__":
    multiple_processes()

With max_workers set to 1, the entire script would take roughly 35 seconds because (to my surprise) the last future doesn't get cancelled, despite cancel_futures=True.
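One plausible explanation is an implementation detail: ProcessPoolExecutor moves a few pending calls into an internal call queue ahead of time and marks those futures as running, so cancel_futures=True no longer reaches them. ThreadPoolExecutor, by contrast, leaves submitted work in its queue until a worker thread is free, so shutdown(cancel_futures=True) cancels everything still waiting and only the call already running survives. A minimal sketch with a single worker (the blocker helper and the two Event objects are illustrative):

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def shutdown_cancels_queued() -> list[str]:
    """Return the state of three futures right after shutdown(cancel_futures=True)."""
    started = threading.Event()
    release = threading.Event()

    def blocker():
        started.set()   # signal that the only worker is now busy
        release.wait()  # block until the main thread lets this call finish

    pool = ThreadPoolExecutor(max_workers=1)
    futures = [pool.submit(blocker), pool.submit(print, "second"), pool.submit(print, "third")]
    started.wait()  # make sure the first call is really running
    pool.shutdown(wait=False, cancel_futures=True)  # drains and cancels the queue
    states = ["running" if f.running() else "cancelled" if f.cancelled() else "other"
              for f in futures]
    release.set()             # let the running call finish
    pool.shutdown(wait=True)  # join the worker thread
    return states


if __name__ == "__main__":
    print(shutdown_cancels_queued())
```

Neither print call ever runs, because both futures were still queued when the executor was shut down.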

You could kill the workers, though. This would make the main process finish without delay:

import os      # in addition to the imports above
import signal
...
    subprocesses = []
    with ProcessPoolExecutor(max_workers=1) as executor:
        futures.append(executor.submit(sleep, 5))
        futures.append(executor.submit(sleep, 10))
        futures.append(executor.submit(sleep, 20))
        print("Futures created @", datetime.now())

        if wait(futures, return_when=FIRST_COMPLETED):
            print("Shortest future completed @", datetime.now())
            # Collect the worker PIDs first: shutdown() clears the
            # private executor._processes attribute
            subprocesses = [p.pid for p in executor._processes.values()]
            executor.shutdown(wait=False, cancel_futures=True)

    # Terminate the workers so the interpreter doesn't wait for their calls
    for pid in subprocesses:
        os.kill(pid, signal.SIGTERM)
...

Disclaimer: Please don't take this answer as advice for whatever you are trying to achieve. It's just brainstorming based on your code.

Answer:

The problem is that you cannot cancel a Future once it has started running. From the documentation of Future.cancel():

Attempt to cancel the call. If the call is currently being executed or finished running and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

To prove it I made the following changes:

from concurrent.futures import (
    FIRST_COMPLETED,
    ThreadPoolExecutor,
    wait as futures_wait,
)
from time import sleep
from datetime import datetime


def test_multiple_threads():
    set_timeout_on_method()
    print("Current Time =", datetime.now())  # Prints time N + 10


def set_timeout_on_method():
    with ThreadPoolExecutor(max_workers=2) as thread_pool:
        print("Current Time =", datetime.now())  # Prints time N
        futures = [thread_pool.submit(sleep, t) for t in (2, 10, 2, 100, 100, 100, 100, 100)]
        futures_wait(futures, return_when=FIRST_COMPLETED)
        print("Current Time =", datetime.now())  # Prints time N + 2
        print([i.cancel() if not i.done() else "DONE" for i in futures])
    print("Current Time =", datetime.now())  # Prints time N + 10


if __name__ == '__main__':
    test_multiple_threads()

As you can see, only three of the tasks actually run. ThreadPoolExecutor is built on the threading module, and a Thread in Python can't be stopped from the outside in any conventional way. Check this answer.
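Since a thread can't be killed from outside, the usual workaround (a swapped-in technique, not something this answer shows) is cooperative cancellation: the task itself watches a threading.Event and returns early once it is set. A minimal sketch, assuming the work can be split into steps that check the flag; time.sleep is replaced here by a hypothetical interruptible_sleep helper built on Event.wait(timeout):

```python
import threading
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def interruptible_sleep(seconds: float, stop: threading.Event) -> bool:
    """Sleep for `seconds`, waking early if `stop` is set.
    Returns True if the full duration elapsed, False if interrupted."""
    # Event.wait returns True as soon as the event is set, False on timeout
    return not stop.wait(seconds)


def run_until_first_completed(durations: list[float]) -> tuple[list[bool], float]:
    stop = threading.Event()
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=len(durations)) as pool:
        futures = [pool.submit(interruptible_sleep, d, stop) for d in durations]
        wait(futures, return_when=FIRST_COMPLETED)
        stop.set()  # ask every still-running task to return early
    # The with block still joins the workers, but they now exit almost at once
    elapsed = time.monotonic() - start
    return [f.result() for f in futures], elapsed


if __name__ == "__main__":
    results, elapsed = run_until_first_completed([2, 10])
    print(results, f"{elapsed:.1f}s")
```

Real work would call stop.is_set() between units of work instead of sleeping; code blocked inside a single long call that never checks the flag still cannot be interrupted this way.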
