Kill all "workers" on "listener" error (multiprocessing, manager and queue set-u


I'm using multiprocessing to run workers on different files in parallel. The workers' results are put into a queue, and a listener gets them from the queue and writes them to a file.

Sometimes the listener runs into errors (of various origins). In this case the listener silently dies, but all other processes keep running (rather surprisingly, worker errors cause all processes to terminate).

I would like to stop all processes (workers, listener, etc.) when the listener catches an error. How can this be done?

The scheme of my code is as follows:

import sys
import multiprocessing as mp

def worker(file_path, q):
    ## do something
    q.put(1.)
    return True

def listener(q):
    while True:
        m = q.get()
        if m == 'kill':
            break
        else:
            try:
                pass  # do something and write to file
            except Exception as err:
                tb = sys.exc_info()[2]
                raise err.with_traceback(tb)

def main():
    manager = mp.Manager()
    q = manager.Queue(maxsize=3)
    with mp.Pool(5) as pool:
        watcher = pool.apply_async(listener, (q,))
        files = ['path_1','path_2','path_3'] 
        jobs = [ pool.apply_async(worker, (p,q,)) for p in files ]
        
        # wait for the workers to finish
        for job in jobs: 
            job.get()
        # kill the listener when done
        q.put('kill')

# run
if __name__ == "__main__":
    main()

I tried introducing event = manager.Event() and using it as a flag in main():

## inside the pool, after starting workers
while True:
    if event.is_set():
        for job in jobs:
            job.terminate()

No success. Calling os._exit(1) in the listener's exception block raises a broken-pipe error, but the processes are not killed.

I also tried setting daemon = True,

for job in jobs:
    job.daemon = True

Did not help.

In fact, to handle listener exceptions I pass an error callback to apply_async (so that they are not silently swallowed). This complicates the situation, but not by much.
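Roughly like this (a simplified sketch; handle_listener_error is just an illustrative name):

def handle_listener_error(err):
    # runs in the main process if the listener raises
    print(f'listener failed: {err}')

watcher = pool.apply_async(listener, (q,), error_callback=handle_listener_error)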

Thank you in advance.

CodePudding user response:

As always there are many ways to accomplish what you're after, but I would probably suggest using an Event to signal that the processes should quit. I also would not use a Pool in this instance, as it only really simplifies things in simple cases where you need something like map. More complicated use cases quickly make it easier to just build your own "pool" with the functionality you need.

from multiprocessing import Process, Queue, Event
from random import random


def might_fail(a):
    # fails randomly (roughly 0.1% of the time) to simulate an error in the listener
    assert a > .001

def worker(args_q: Queue, result_q: Queue, do_quit: Event):
    try:
        while not do_quit.is_set():
            args = args_q.get()
            if args is None:
                break
            else:
                # do something
                result_q.put(random())
    finally: #signal that worker is exiting even if exception is raised
        result_q.put(None) #signal listener that worker is exiting

def listener(result_q: Queue, do_quit: Event, n_workers: int):
    n_completed = 0
    while n_workers > 0:
        res = result_q.get()
        if res is None:
            n_workers -= 1
        else:
            n_completed += 1
            try:
                might_fail(res)
            except:
                do_quit.set() #let main continue
                print(n_completed)
                raise #reraise error after we signal others to stop
    do_quit.set() #let main continue
    print(n_completed)

if __name__ == "__main__":
    args_q = Queue()
    result_q = Queue()
    do_quit = Event()
    n_workers = 4

    listener_p = Process(target=listener, args=(result_q, do_quit, n_workers))
    listener_p.start()

    for _ in range(n_workers):
        worker_p = Process(target=worker, args=(args_q, result_q, do_quit))
        worker_p.start()

    for _ in range(1000):
        args_q.put("some/file.txt")

    for _ in range(n_workers):
        args_q.put(None)

    do_quit.wait()
    print('done')
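One caveat, which is an addition to the sketch above rather than part of it: if the listener fails early, the workers can still be blocked on args_q.get(), so the main process reaching 'done' does not guarantee they have exited. If a hard stop is wanted, the worker Process objects can be collected into a list (call it workers, not shown above) and cleaned up after do_quit.wait() returns, along these lines:

    # hypothetical cleanup, assuming the worker Process objects were kept in `workers`
    for p in workers:
        p.terminate()  # force-stop any worker still blocked on args_q.get()
        p.join()
    listener_p.join()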