Home > Software design >  Python multiprocessing Queue Process hanging
Python multiprocessing Queue Process hanging

Time:08-05

I am trying to use multiprocessing.Queue to manage some tasks that are sent by the main process and picked up by "worker" processes (multiprocessing.Process). The workers then run the task and put the results into a result queue.

Here is my main script:

from multiprocessing import Process, Queue, freeze_support
import auxiliaries as aux
import functions

if __name__ == '__main__':
    freeze_support()
    
    start = time.perf_counter()
    # number of processess
    nprocs = 3

    # define the tasks
    tasks = [(functions.get_stats_from_uniform_dist, (2**23, i)) for i in range(600)]

    # start the queues
    task_queue = Queue()
    result_queue = Queue()

    # populate task queue
    for task in tasks:
        task_queue.put(task)

    # after all tasks are in the queue, send a message to stop picking...
    for _ in range(nprocs):
        task_queue.put('STOP')

    # start workers 
    procs = []
    for _ in range(nprocs):
        p = Process(target=aux.worker, args=(task_queue, result_queue))
        p.start()
        procs.append(p)

    for p in procs:
        p.join()

    # print what's in the result queue
    while not result_queue.empty():
        print(result_queue.get())

The imported modules are

auxiliaries.py

from multiprocessing import current_process

def calculate(func, args):
    """
    Calculates a certain function for a list of arguments. Returns a string with the result.

    Arguments:
        - func (string): function name
        - args (list): list of arguments
    """
    result = func(*args)
    string = current_process().name
    string = string   " says "   func.__name__   str(args)
    string = string   " = "   str(result)
    return string


def worker(inputQueue, outputQueue):
    """
    Picks up work from the inputQueue and outputs result to outputQueue.

    Inputs:
        - inputQueue (multiprocessing.Queue)
        - outputQueue (multiprocessing.Queue)
    """
    for func, args in iter(inputQueue.get, 'STOP'):
        result = calculate(func, args)
        outputQueue.put(result)

and functions.py

import numpy as np

def get_stats_from_uniform_dist(nDraws, seed):
    """
    Calculates average and standard deviation of nDraws from NumPy's random.rand().

    Arguments:
        - nDraws (int): number of elements to draw
        - seed (int): random number generator's seed

    Returns:
        - results (list): [average, std]
    """
    np.random.seed(seed)
    x = np.random.rand(nDraws)
    return [x.mean(), x.std()]

This is entirely based on https://docs.python.org/3/library/multiprocessing.html#multiprocessing-examples

Everything runs okay with up to ~500 tasks. After that, the code hangs. It's looking like one of the processes never finishes so it gets stuck when I join them. It does not look like the queues are getting full. I suspect that one of the processes is not finding the "STOP" entry in the task_queue, so it keeps trying to .get() forever, but I can't understand how and why that would happen. Any ideas on what could be causing the lock? Thanks!

CodePudding user response:

You'll probably have a better time just using the higher-level Pool.imap_unordered() method, which does all of that for you.

from multiprocessing import Pool

def get_stats_from_uniform_dist(task):
    nDraws, seed = task
    # ...

if __name__ == '__main__':
    with multiprocessing.Pool(nprocs) as p:
        tasks = [(2**23, i) for i in range(600)]
        results = list(p.imap_unordered(functions.get_stats_from_uniform_dist, tasks, chunksize=10))

CodePudding user response:

The problem is here:

if __name__ == '__main__':
    .
    .
    .

    # start workers 
    procs = []
    for _ in range(nprocs):
        p = Process(target=aux.worker, args=(task_queue, result_queue))
        p.start()
        procs.append(p)

    for p in procs:
        p.join()

    # print what's in the result queue
    while not result_queue.empty():
        print(result_queue.get())

or, more specifically, the fact that I am joining the processes before the result_queue is drained. As @Charchit pointed out, this is actually mentioned in the documentation:

Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.

The solution I'm using is, at the expense of spawning yet another process, employing a Manager.Queue() instead, as suggested here. The main script is then

from multiprocessing import Process, freeze_support, Manager
import auxiliaries as aux
import functions

if __name__ == '__main__':
    freeze_support()
    
    # number of processess
    nprocs = 3

    # define the tasks
    tasks = [(functions.get_stats_from_uniform_dist, (2**18, i)) for i in range(1000)]

    # use a manager context to share queues between processes
    manager = Manager()
    task_queue = manager.Queue()
    result_queue = manager.Queue()

    # populate task queue
    for task in tasks:
        task_queue.put(task)

    # after all tasks are in the queue, send a message to stop picking...
    for _ in range(nprocs):
        task_queue.put('STOP')

    # start processes (workers)
    procs = []
    for _ in range(nprocs):
        p = Process(target=aux.worker, args=(task_queue, result_queue))
        p.start()
        procs.append(p)

    # wait until workers are done
    for p in procs:
        p.join()

    # print what's in the result queue
    while not result_queue.empty():
        print(result_queue.get())
  • Related