I am trying to use multiprocessing.Queue to manage some tasks that are sent by the main process and picked up by "worker" processes (multiprocessing.Process). The workers then run the task and put the results into a result queue.
Here is my main script:
from multiprocessing import Process, Queue, freeze_support
import time

import auxiliaries as aux
import functions

if __name__ == '__main__':
    freeze_support()
    start = time.perf_counter()

    # number of processes
    nprocs = 3

    # define the tasks
    tasks = [(functions.get_stats_from_uniform_dist, (2**23, i)) for i in range(600)]

    # start the queues
    task_queue = Queue()
    result_queue = Queue()

    # populate task queue
    for task in tasks:
        task_queue.put(task)

    # after all tasks are in the queue, send a message to stop picking...
    for _ in range(nprocs):
        task_queue.put('STOP')

    # start workers
    procs = []
    for _ in range(nprocs):
        p = Process(target=aux.worker, args=(task_queue, result_queue))
        p.start()
        procs.append(p)

    for p in procs:
        p.join()

    # print what's in the result queue
    while not result_queue.empty():
        print(result_queue.get())
The imported modules are
auxiliaries.py
from multiprocessing import current_process

def calculate(func, args):
    """
    Calculates a certain function for a list of arguments. Returns a string with the result.

    Arguments:
        - func (callable): the function to run
        - args (tuple): arguments to unpack into func
    """
    result = func(*args)
    string = current_process().name
    string = string + " says " + func.__name__ + str(args)
    string = string + " = " + str(result)
    return string

def worker(inputQueue, outputQueue):
    """
    Picks up work from the inputQueue and outputs result to outputQueue.

    Inputs:
        - inputQueue (multiprocessing.Queue)
        - outputQueue (multiprocessing.Queue)
    """
    for func, args in iter(inputQueue.get, 'STOP'):
        result = calculate(func, args)
        outputQueue.put(result)
and functions.py
import numpy as np

def get_stats_from_uniform_dist(nDraws, seed):
    """
    Calculates average and standard deviation of nDraws from NumPy's random.rand().

    Arguments:
        - nDraws (int): number of elements to draw
        - seed (int): random number generator's seed

    Returns:
        - results (list): [average, std]
    """
    np.random.seed(seed)
    x = np.random.rand(nDraws)
    return [x.mean(), x.std()]
This is entirely based on https://docs.python.org/3/library/multiprocessing.html#multiprocessing-examples
Everything runs okay with up to ~500 tasks. After that, the code hangs. It looks like one of the processes never finishes, so the script gets stuck when I join them.
It does not look like the queues are getting full. I suspect that one of the processes is not finding the "STOP" entry in task_queue, so it keeps trying to .get() forever, but I can't understand how or why that would happen. Any ideas on what could be causing the deadlock? Thanks!
CodePudding user response:
You'll probably have a better time just using the higher-level Pool.imap_unordered() method, which does all of that (worker management, task distribution, result collection) for you.
from multiprocessing import Pool
import functions

def get_stats_from_uniform_dist(task):
    # imap_unordered passes a single argument, so unpack the (nDraws, seed) tuple here
    nDraws, seed = task
    return functions.get_stats_from_uniform_dist(nDraws, seed)

if __name__ == '__main__':
    nprocs = 3
    tasks = [(2**23, i) for i in range(600)]
    with Pool(nprocs) as p:
        results = list(p.imap_unordered(get_stats_from_uniform_dist, tasks, chunksize=10))
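If you'd rather keep the two-argument signature in functions.py as-is, Pool.starmap() unpacks each task tuple into positional arguments for you (results come back in task order rather than completion order). A minimal sketch along those lines, assuming the same functions.py from the question:
from multiprocessing import Pool

import functions

if __name__ == '__main__':
    nprocs = 3
    tasks = [(2**23, i) for i in range(600)]

    with Pool(nprocs) as p:
        # starmap unpacks each (nDraws, seed) tuple into the two positional arguments
        results = p.starmap(functions.get_stats_from_uniform_dist, tasks, chunksize=10)

    for r in results:
        print(r)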
CodePudding user response:
The problem is here:
if __name__ == '__main__':
    .
    .
    .
    # start workers
    procs = []
    for _ in range(nprocs):
        p = Process(target=aux.worker, args=(task_queue, result_queue))
        p.start()
        procs.append(p)

    for p in procs:
        p.join()

    # print what's in the result queue
    while not result_queue.empty():
        print(result_queue.get())
or, more specifically, the fact that I am joining the processes before the result_queue is drained.
As @Charchit pointed out, this is actually mentioned in the documentation:
Joining processes that use queues
Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)
This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
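In other words, each worker has left its loop but blocks at shutdown, waiting for its queue's feeder thread to flush the buffered results into the pipe; the pipe is full because nothing is reading it, since the main process is itself blocked in join(). That would explain why the hang only appears past a few hundred tasks. One direct fix that keeps the plain multiprocessing.Queue is to drain the result queue before joining the workers; a minimal sketch, reusing auxiliaries.py and functions.py from the question:
from multiprocessing import Process, Queue, freeze_support

import auxiliaries as aux
import functions

if __name__ == '__main__':
    freeze_support()
    nprocs = 3
    tasks = [(functions.get_stats_from_uniform_dist, (2**23, i)) for i in range(600)]

    task_queue = Queue()
    result_queue = Queue()
    for task in tasks:
        task_queue.put(task)
    for _ in range(nprocs):
        task_queue.put('STOP')

    procs = []
    for _ in range(nprocs):
        p = Process(target=aux.worker, args=(task_queue, result_queue))
        p.start()
        procs.append(p)

    # drain the result queue BEFORE joining: the number of results is known,
    # so get() exactly that many items; this lets each worker's feeder thread
    # flush its buffer, after which the worker can exit
    for _ in range(len(tasks)):
        print(result_queue.get())

    # only now is it safe to join the workers
    for p in procs:
        p.join()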
The solution I'm using is, at the expense of spawning yet another process, employing a Manager.Queue() instead, as suggested here. A manager queue is just a proxy: the actual queue lives in the manager's process and items are sent to it over a connection, so a worker has no feeder thread with buffered items to flush when it exits, and joining the workers can no longer deadlock. The main script is then
from multiprocessing import Process, freeze_support, Manager

import auxiliaries as aux
import functions

if __name__ == '__main__':
    freeze_support()

    # number of processes
    nprocs = 3

    # define the tasks
    tasks = [(functions.get_stats_from_uniform_dist, (2**18, i)) for i in range(1000)]

    # use a manager context to share queues between processes
    manager = Manager()
    task_queue = manager.Queue()
    result_queue = manager.Queue()

    # populate task queue
    for task in tasks:
        task_queue.put(task)

    # after all tasks are in the queue, send a message to stop picking...
    for _ in range(nprocs):
        task_queue.put('STOP')

    # start processes (workers)
    procs = []
    for _ in range(nprocs):
        p = Process(target=aux.worker, args=(task_queue, result_queue))
        p.start()
        procs.append(p)

    # wait until workers are done
    for p in procs:
        p.join()

    # print what's in the result queue
    while not result_queue.empty():
        print(result_queue.get())
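One optional tidy-up: Manager() starts a separate server process, and the manager object also works as a context manager, which shuts that process down cleanly when you're done. A variant of the script above along those lines, under the same assumptions:
from multiprocessing import Process, freeze_support, Manager

import auxiliaries as aux
import functions

if __name__ == '__main__':
    freeze_support()
    nprocs = 3
    tasks = [(functions.get_stats_from_uniform_dist, (2**18, i)) for i in range(1000)]

    # the manager process (and hence its queues) is shut down on leaving the block,
    # so everything that touches the queues has to stay inside it
    with Manager() as manager:
        task_queue = manager.Queue()
        result_queue = manager.Queue()

        for task in tasks:
            task_queue.put(task)
        for _ in range(nprocs):
            task_queue.put('STOP')

        procs = []
        for _ in range(nprocs):
            p = Process(target=aux.worker, args=(task_queue, result_queue))
            p.start()
            procs.append(p)

        for p in procs:
            p.join()

        while not result_queue.empty():
            print(result_queue.get())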