Why do (daemon) threads in Python behave that way when using process pools?


Given the following code, I would expect it to run for around 6 seconds: the calls with argument 0 should terminate almost instantly, and the calls with argument 100 should time out after 3 seconds.

import multiprocessing
import threading
import time


def timeout_function_wrapper(function, args, timeout):
    # Run the call in a separate thread so we can give up waiting after `timeout`.
    def auxiliary(n, out_q):
        res = function(n)
        out_q.put(res)  # to get result back from thread target

    result_q = multiprocessing.Queue()

    thread = threading.Thread(target=auxiliary, args=(args, result_q))
    thread.start()
    thread.join(timeout=timeout)  # if it times out, thread.is_alive() will be True, no exception is raised

    if thread.is_alive():
        raise multiprocessing.TimeoutError("Timed out after waiting for {}s".format(timeout))
    return result_q.get()

def foo(t):
    time.sleep(t)
    return t*t

if __name__ == '__main__':
    t1 = time.time()
    with multiprocessing.Pool(processes=1, maxtasksperchild=1) as pool:
        futures = [pool.apply_async(timeout_function_wrapper, [foo, i, 3]) for i in [100, 0, 100, 0]]

        for i, x in enumerate(futures):
            try:
                res = x.get()  # timeout not needed here, it's in the wrapper
                print("Res", i, res)
            except multiprocessing.TimeoutError as e:
                print("Timeout")

    print("Took {}".format(time.time() - t1))

It turns out that this code runs for around 103 seconds. Why is that the case? If I change the line

thread = threading.Thread(target=auxiliary, args=(args, result_q))

to

thread = threading.Thread(target=auxiliary, args=(args, result_q), daemon=True)

it finishes in 6 seconds.

Questions

Why do the threads need to be daemon threads for the timeout to work "in the intended way"?
Why, when they are not daemon threads, does one call time out "correctly" while the other does not time out at all?

UPDATE: In addition, I found that if foo is replaced by:

def foo(t):
    while True:
        pass
    return 42  # unreachable: the loop above never exits

the whole program does not terminate (even after the sum of the timeouts), regardless of whether the threads are daemon threads or not.
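One thing worth noting here: a Python thread cannot be killed from the outside, so thread.join(timeout) simply returns while the busy loop keeps running forever. The usual workaround is cooperative cancellation via a flag that the target checks itself; a minimal sketch (foo_cooperative and stop are hypothetical names, not from the original code):

import threading

stop = threading.Event()

def foo_cooperative():
    while not stop.is_set():  # check a flag instead of looping unconditionally
        pass
    return 42

t = threading.Thread(target=foo_cooperative)
t.start()
t.join(timeout=3)
if t.is_alive():
    stop.set()  # ask the thread to stop; it exits on its next flag check
    t.join()    # now this join returns almost immediately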

CodePudding user response:

I need to elaborate on Aaron's comment:

First of all, without daemon=True, your code as posted actually takes a little more than 200 seconds: each of the two foo(100) tasks keeps its worker process alive for the full 100 seconds of the sleep, and with processes=1 and maxtasksperchild=1 those two waits happen one after the other.

The issue is that for each submitted task the pool manager must start a new process, since you specified maxtasksperchild=1 in addition to processes=1 when initializing your multiprocessing pool, but it first waits for the old process to complete. The old process cannot complete until the thread it started to run the auxiliary function completes, because that thread is not a daemon thread. If you make the thread a daemon thread, it is terminated as soon as the main thread of the pool process terminates. That is what you observed.
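The underlying rule can be seen in isolation: a Python process does not exit while a non-daemon thread is still running, whereas daemon threads are abandoned at interpreter exit. A minimal standalone sketch (not from the original post):

import threading
import time

def worker():
    time.sleep(100)  # stands in for foo(100)

# With the default daemon=False, interpreter shutdown waits ~100s for this
# thread to finish; with daemon=True the process exits immediately instead.
t = threading.Thread(target=worker, daemon=False)
t.start()
print("main thread done")  # printed right away; it is the exit that blocks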

You can instead remove the maxtasksperchild=1 argument, so that it is no longer necessary to wait for the pool process to finish just to start another process for the next submitted task; the same pool process can now start multiple non-daemon threads without blocking. When you exit the with multiprocessing.Pool(processes=1) as pool: block, there is an implicit call to pool.terminate that immediately terminates all the processes in your pool (only 1 in your case), regardless of whether they have any non-daemon threads running in addition to the main thread. Consequently the running time is the 6 seconds you expect. If, instead of using a context manager for the pool (which calls terminate on exit), you use the following code:

if __name__ == '__main__':
    t1 = time.time()
    pool = multiprocessing.Pool(processes=1)
    futures = [pool.apply_async(timeout_function_wrapper, [foo, i, 3]) for i in [100, 0, 100, 0]]

    for i, x in enumerate(futures):
        try:
            res = x.get()  # timeout not needed here, it's in the wrapper
            print("Res", i, res)
        except multiprocessing.TimeoutError as e:
            print("Timeout")
    pool.close()  # no new tasks will be submitted
    pool.join()   # wait for the worker process (and its threads) to finish

    print("Took {}".format(time.time() - t1))

You will then get your 4 results back right away, unlike in the situation where you specified maxtasksperchild=1, but the program will not reach the final print statement until the threads created by the pool process terminate, allowing the pool process itself to terminate. Since these threads run in parallel (the second foo(100) sleep starts roughly 3 seconds after the first), the total time is about 103 seconds rather than 203 seconds.
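The contrast is easiest to see side by side. A hedged sketch reusing timeout_function_wrapper and foo from above, with a single sleeping task:

pool = multiprocessing.Pool(processes=1)
future = pool.apply_async(timeout_function_wrapper, [foo, 100, 3])
try:
    future.get()
except multiprocessing.TimeoutError:
    pass  # raised after ~3s; the auxiliary thread keeps sleeping in the worker

pool.terminate()  # kills the worker process now: total time ~3s
# pool.close(); pool.join()  # would instead block ~100s until the sleep ends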
