process.join() hangs when using Queue based multi process


The following code is intended to create worker processes that each hold the image data until they finish. These processes are expected to terminate once the input queue q becomes empty.

import multiprocessing
import numpy as np

def run_mp(images, f_work, n_worker):
    q = multiprocessing.Queue()
    for img_idx in range(len(images)):
        q.put(img_idx)

    result_q = multiprocessing.Queue()

    def f_worker(worker_index, data, q, result_q):
        print("worker {} started".format(worker_index))
        while not q.empty():
            image_idx = q.get(timeout=1)
            print('processing image idx {}'.format(image_idx))
            image_out = f_work(data, image_idx)
            result_q.put((image_out, image_idx))
        print("worker {} finished".format(worker_index))
        return

    processes = list()
    for i in range(n_worker):
        process = multiprocessing.Process(target=f_worker, args=(i, images, q, result_q))
        process.daemon = True
        process.start()
        processes.append(process)

    for process in processes:
        process.join()


images = [np.random.randn(100, 100) for _ in range(20)]

f = lambda image_list, idx: image_list[idx] + np.random.randn()
run_mp(images, f, 2)

After running the code, the embedded print statements confirm that all 20 images are processed and both worker processes finish. However, the program hangs. When I hit Ctrl-C I get the following traceback, which shows that the hang occurs in os.waitpid.

How can I fix this problem?

worker 0 started
processing image idx 0
processing image idx 1
worker 1 started
processing image idx 2
processing image idx 3
processing image idx 4
processing image idx 5
processing image idx 6
processing image idx 7
processing image idx 8
processing image idx 9
processing image idx 10
processing image idx 11
processing image idx 12
processing image idx 13
processing image idx 14
processing image idx 15
processing image idx 16
processing image idx 17
processing image idx 18
processing image idx 19
worker 1 finished
^CTraceback (most recent call last):
  File "example_queue.py", line 35, in <module>
    run_mp(images, f, 2)
  File "example_queue.py", line 29, in run_mp
    process.join()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt

CodePudding user response:

First, be aware that your code will only work on platforms that use OS fork to create new processes (a spawn-compatible layout is sketched just after this list), because:

  1. The code that creates new processes is not contained within an if __name__ == '__main__': block.
  2. Your worker function, f_worker, is not at global scope.
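
To make the code portable to platforms that use the spawn start method (the default on Windows and macOS), both points can be addressed by moving the worker function to module scope and guarding the process-creating code. A minimal structural sketch with a single worker and illustrative placeholder work:

import multiprocessing

def f_worker(worker_index, q, result_q):
    # At module scope so it can be pickled under the 'spawn' and
    # 'forkserver' start methods.
    while True:
        item = q.get()
        if item is None:                          # stop signal, explained below
            break
        result_q.put((worker_index, item * 2))    # placeholder work

if __name__ == '__main__':
    # The guard keeps child processes (which re-import this module
    # under 'spawn') from creating processes of their own.
    q = multiprocessing.Queue()
    result_q = multiprocessing.Queue()
    for i in range(5):
        q.put(i)
    q.put(None)

    worker = multiprocessing.Process(target=f_worker, args=(0, q, result_q))
    worker.start()
    results = [result_q.get() for _ in range(5)]  # drain before joining
    worker.join()
    print(results)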

If you read the documentation on multiprocessing.Queue, you will find that the call to q.empty() is unreliable and, fortunately, unnecessary once a better termination signal is used. Also, if you are interested in the actual results being put on result_q, you must get those results in the main process before joining the subprocesses that did the putting: a process that has put items on a queue will not terminate until all buffered items have been flushed to the underlying pipe, so joining it before the queue has been drained can deadlock. That is why your program hangs in process.join(); the documentation explains this under "Joining processes that use queues".

The solution that avoids the undependable q.empty() call is to write to the "task queue" (q in this case), after the actual items to be processed, N sentinel items that cannot be mistaken for real work items and instead signal to the subprocesses that there is no more work. N is simply the number of subprocesses reading from the task queue, so that each subprocess reads exactly one sentinel and then terminates. In this case None serves perfectly as the sentinel:

import multiprocessing
import numpy as np

def run_mp(images, f_work, n_worker):

    def f_worker(worker_index, data, q, result_q):
        print("worker {} started".format(worker_index))
        while True:
            image_idx = q.get() # Blocking get
            if image_idx is None: # Sentinel?
                break # We are done!
            print('processing image idx {}'.format(image_idx))
            image_out = f_work(data, image_idx)
            result_q.put((image_out, image_idx))
        print("worker {} finished".format(worker_index))
        return

    q = multiprocessing.Queue()
    for img_idx in range(len(images)):
        q.put(img_idx)

    # Add sentinels:
    for _ in range(n_worker):
        q.put(None)

    result_q = multiprocessing.Queue()

    processes = list()
    for i in range(n_worker):
        process = multiprocessing.Process(target=f_worker, args=(i, images, q, result_q))
        # We do not need daemon processes now:
        #process.daemon = True
        process.start()
        processes.append(process)

    # If we are interested in the results, we must process the result queue
    # before joining the processes. We are expecting one result per image, so:
    results = [result_q.get() for _ in range(len(images))]
    print(results)

    for process in processes:
        process.join()


images = [np.random.randn(100, 100) for _ in range(20)]

f = lambda image_list, idx: image_list[idx] + np.random.randn()
run_mp(images, f, 2)
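
If you do not need to manage the worker processes yourself, the same work can also be expressed with a multiprocessing.Pool, which handles task distribution and shutdown without sentinels or manual joins. A minimal sketch (process_image here is an illustrative module-level stand-in for the question's lambda, and the image list is re-sent to the workers with the tasks):

import multiprocessing
from functools import partial

import numpy as np

def process_image(images, idx):
    # Module-level function so it can be pickled and sent to the workers.
    return images[idx] + np.random.randn(), idx

if __name__ == '__main__':
    images = [np.random.randn(100, 100) for _ in range(20)]
    with multiprocessing.Pool(processes=2) as pool:
        # The pool distributes the indices to the workers and shuts
        # them down cleanly when the work is done.
        results = pool.map(partial(process_image, images), range(len(images)))
    print(len(results))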