Futures generated by ThreadPoolExecutor do not behave asynchronously

Time:01-31

I want to create a list of futures running on a ThreadPoolExecutor, then display each result as soon as its future finishes evaluating.

The expected result is that 0, 2, 6 and 12 are printed one at a time, every 3 seconds.

However, I only get a result after 12 seconds, and all the numbers are displayed simultaneously.

from concurrent.futures import ThreadPoolExecutor
import time

def fnc(x, y):
    time.sleep(3)
    return x*y

futures = []
with ThreadPoolExecutor(max_workers=1) as executor:
    for i in range(0, 4):
        print(f"Submitting {i}")
        futures += [executor.submit(fnc, i, i + 1)]

for f in futures:
    print(f.result())

CodePudding user response:

Build your list of submitted futures, then iterate over as_completed(), which yields each future as soon as its thread has finished and its result is available.

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fnc(x, y):
    time.sleep(3)
    return x*y

futures = []

with ThreadPoolExecutor() as executor:
    for i in range(0, 4):
        print(f"Submitting {i}")
        futures += [executor.submit(fnc, i, i + 1)]
    for future in as_completed(futures):
        print(future.result())

CodePudding user response:

You are calling the result method on each future outside the ThreadPoolExecutor context manager. When you exit the with block, it calls the __exit__ method:

def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=True)
    return False

The shutdown method signature is:

shutdown(self, wait=True, *, cancel_futures=False)

And the docstring says:

Args:
     wait: If True then shutdown will not return until all running
           futures have finished executing and the resources used by the
           executor have been reclaimed.
     cancel_futures: If True then shutdown will cancel all pending
           futures. Futures that are completed or running will not be
           cancelled.

We can see that, by default, shutdown waits until all running futures have finished and the executor's resources have been reclaimed, and that cancel_futures defaults to False, so pending futures are not cancelled. Every submitted task therefore runs to completion before the with block exits.

We can demonstrate this by changing fnc to print the values instead of returning them, and doing nothing after the ThreadPoolExecutor context manager block:

from concurrent.futures import ThreadPoolExecutor
import time

def fnc(x, y):
    time.sleep(3)
    print(x * y)


futures = []
with ThreadPoolExecutor(max_workers=1) as executor:
    for i in range(0, 4):
        print(f"Submitting {i}")
        futures += [executor.submit(fnc, i, i + 1)]

print("Blah!")

This still prints the values 0, 2, 6 and 12 before "Blah!", even though we only submitted the functions to the executor and never called result() on any future.
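
The blocking can also be made visible by timing the with block. This is a rough sketch under assumptions: the delay parameter and the helper time_with_block are additions for illustration, and the sleep is shortened so the demo runs quickly.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fnc(x, y, delay=0.1):
    # Same shape as the question's function; the shortened, configurable
    # delay is an assumption made for this sketch.
    time.sleep(delay)
    return x * y

def time_with_block(n_tasks=4, delay=0.1):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=1) as executor:
        futures = [executor.submit(fnc, i, i + 1, delay) for i in range(n_tasks)]
        # submit() does not block, so this point is reached almost at once.
        submitted_after = time.perf_counter() - start
    # Leaving the with block calls shutdown(wait=True), so by now
    # every submitted future has already finished.
    exited_after = time.perf_counter() - start
    all_done = all(f.done() for f in futures)
    return submitted_after, exited_after, all_done

if __name__ == "__main__":
    submitted, exited, done = time_with_block()
    print(f"submitted after {submitted:.2f}s, exited after {exited:.2f}s, all done: {done}")
```

Submission returns almost immediately, while leaving the block takes roughly n_tasks times the delay with a single worker. That is the same 12-second wait seen in the question.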


Fix it by moving the for loop block inside the context manager:

from concurrent.futures import ThreadPoolExecutor
import time

def fnc(x, y):
    time.sleep(3)
    return x*y

futures = []
with ThreadPoolExecutor(max_workers=1) as executor:
    for i in range(0, 4):
        print(f"Submitting {i}")
        futures += [executor.submit(fnc, i, i + 1)]
    for f in futures:
        print(f.result())

Note that setting max_workers=1 forces the tasks to run one after another rather than concurrently. In this program, setting max_workers=X lets X calls to fnc run at the same time, so their results become available X at a time.

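If you want to wait three seconds between each result, set max_workers to 1. Removing the argument does not have the same effect: the default pool has more than one worker, so all four tasks run at once and finish together after about three seconds. If you want two results every three seconds, set max_workers=2, and so on.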
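
The effect of max_workers on when results arrive can be checked with a small timing sketch. The helper completion_times and the shortened delay are assumptions made for illustration; the result loop sits inside the with block, as in the fix above.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fnc(x, y, delay):
    # Same computation as in the question, with a configurable delay.
    time.sleep(delay)
    return x * y

def completion_times(max_workers, n_tasks=4, delay=0.2):
    # Returns (result, seconds since start) pairs in submission order.
    start = time.perf_counter()
    timeline = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fnc, i, i + 1, delay) for i in range(n_tasks)]
        for f in futures:
            value = f.result()  # blocks until this particular future is done
            timeline.append((value, round(time.perf_counter() - start, 1)))
    return timeline

if __name__ == "__main__":
    for workers in (1, 2, 4):
        print(workers, completion_times(workers))
```

With one worker the timestamps should be spaced roughly one delay apart. With two workers they should arrive in pairs, and with four workers all results should arrive after about a single delay.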
