Return result of process after an exception is triggered


I have a multiprocessing setup that handles a long-running task by appending all calculated values to lst. It looks roughly like this:

from multiprocessing import Pool
from time import sleep


def fun(_):
    lst = []  # list that will be returned
    for i in range(200):
        lst.append(i)
        if not i % 10:
            sleep(0.1)  # 'long task'; press Ctrl+C during this sleep to raise a KeyboardInterrupt
    return lst


if __name__ == '__main__':
    master = []
    processes = 2
    for result in Pool(processes).imap_unordered(fun, range(processes)):
        master.append(result)
    print(master)

I want to be able to trigger a KeyboardInterrupt and have each process return the list it has built so far, even if it is not done yet, since each iteration just adds a new sublist. (My actual data looks roughly like lst = ([], [[], ...], [[], ...]), where every inner list contains only ints; the actual function would return lst1, lst2, lst3.)

I have tried wrapping the whole main part in a try/except block, like so:

try:
    for result in Pool(processes).imap_unordered(fun, range(processes)):
        master.append(result)
except KeyboardInterrupt:
    # somehow retrieve the values here
    pass

However, this has not led me to a working solution. How can I tell the processes that it's time to exit early and return their current results?

Edit to show the actual structure:

main.py

from multiprocessing import Pool

from other import Other


class Something:
    def __init__(self):
        pass  # stuff here

    def spawner(self):
        for result in Pool(processes=self.processes).imap_unordered(self.loop, range(self.processes)):
            pass  # do stuff with the data

    def loop(self, _):
        # setup stuff
        return Other(setup_stuff).start()

other.py


class Other:
    def __init__(self):
        pass  # more stuff

    def start(self):
        lst1, lst2, lst3 = [], [], []
        for _ in range(self.episodes):
            pass  # do the actual computation
        return lst1, lst2, lst3

Answer:

Maybe you can use a multiprocessing.Queue instead of a list to return the values. Set up one queue at the beginning, and all processes will write to it.

At the end, read all values from the queue.

from time import sleep
from multiprocessing import Pool, Queue

q = None  # each worker process gets its own copy, set by the pool initializer


def set_global_data(queue):
    # pool initializer: runs once in every worker and stores the shared queue
    global q
    q = queue


def fun(_):
    for i in range(200):
        q.put_nowait(i)
        if not i % 10:
            sleep(0.1)  # 'long task'; press Ctrl+C during this sleep to raise a KeyboardInterrupt
    # nothing is returned


if __name__ == "__main__":
    master = Queue()
    processes = 2

    try:
        # the initializer hands the queue to every worker before any task runs
        with Pool(processes, set_global_data, (master,)) as p:
            for result in p.imap_unordered(fun, range(processes)):
                pass
    except KeyboardInterrupt:
        pass  # leaving the with-block terminates the pool

    # drain whatever the workers managed to enqueue before the interrupt
    while not master.empty():
        v = master.get_nowait()
        print(v)
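
One caveat: the docs note that Queue.empty() is not reliable, so the drain loop above can miss items that a terminated worker's feeder thread was still flushing. A slightly more defensive drain (just a sketch; the drain helper name and the 0.5-second timeout are arbitrary choices, not part of the original answer) blocks briefly on each read and stops once nothing more arrives:

from queue import Empty  # multiprocessing queues raise the plain queue.Empty


def drain(q, timeout=0.5):
    # keep reading until no new item shows up within `timeout` seconds
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except Empty:
            return items

Calling drain(master) instead of the while-loop then returns everything the workers managed to put before the pool was torn down.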

EDIT: With multiple files:

main.py

from other import Other
from multiprocessing import Pool, Queue


class Something:
    def __init__(self):
        pass  # stuff here

    def spawner(self):
        master = Queue()

        try:
            with Pool(2, Something.set_global_data, (master,)) as p:
                for _ in p.imap_unordered(self.loop, range(2)):
                    pass
        except KeyboardInterrupt:
            pass

        while not master.empty():
            v = master.get_nowait()
            print(v)

    def loop(self, _):
        # setup stuff
        Other().start()

    @staticmethod
    def set_global_data(queue):
        # pool initializer: store the queue on the Other class inside each worker
        Other.q = queue


if __name__ == "__main__":
    s = Something()
    s.spawner()

other.py

from time import sleep


class Other:
    q = None  # filled in by Something.set_global_data in each worker process

    def __init__(self):
        pass  # more stuff

    def start(self):
        for i in range(200):
            # push each value to the parent as soon as it is computed
            Other.q.put_nowait(i)
            if not i % 10:
                sleep(0.1)
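
Since the real start() builds three lists per worker, funneling everything through one queue loses track of which value belongs to which list. One way to keep that information (a sketch; the (worker_id, list_index, value) tagging and the regroup helper are illustrative assumptions, not part of the original code) is to tag every item before putting it on the queue and regroup in the parent:

from collections import defaultdict


def regroup(items):
    # rebuild a (lst1, lst2, lst3) triple per worker from tagged queue items,
    # where each item is a (worker_id, list_index, value) tuple
    per_worker = defaultdict(lambda: ([], [], []))
    for worker_id, list_index, value in items:
        per_worker[worker_id][list_index].append(value)
    return dict(per_worker)

Inside Other.start(), each put_nowait(i) would then become something like Other.q.put_nowait((worker_id, 0, i)); combined with a drain like the one sketched above, the parent recovers partial lst1, lst2, lst3 results for every worker even after a KeyboardInterrupt.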