I have a multiprocessing setup that handles a long-running task by appending all calculated values to `lst`. It looks roughly like this:
from multiprocessing import Pool
from time import sleep
def fun(_, count=200, pause=0.1):
    """Simulate a long-running task and return every value it produced.

    The first argument exists only because ``Pool.imap_unordered`` passes one
    item of its iterable to each call; it is ignored.

    Args:
        _: Unused work item supplied by the pool.
        count: How many consecutive integers, starting at 0, to collect.
            Defaults to 200, the value that used to be hard-coded.
        pause: Seconds to sleep whenever the current value is a multiple
            of 10. Defaults to 0.1, the value that used to be hard-coded.

    Returns:
        The list ``[0, 1, ..., count - 1]``.
    """
    lst = []  # list that will be returned
    for i in range(count):
        lst.append(i)
        if i % 10 == 0:
            sleep(pause)  # 'long task', cause a KeyboardInterrupt in this time
    return lst
if __name__ == '__main__':
    master = []
    processes = 2
    # Use the pool as a context manager so its worker processes are always
    # cleaned up, even when the loop is cut short by an exception such as
    # KeyboardInterrupt.
    with Pool(processes) as pool:
        for result in pool.imap_unordered(fun, range(processes)):
            master.append(result)
    print(master)
I want to be able to trigger a `KeyboardInterrupt` and have the processes return the lists they were working on, even if they are not finished yet, since each iteration just adds a new sublist.
(My actual data looks roughly like `lst = ([], [[], ...], [[], ...])`; every empty list contains only ints, and the actual function would return `lst1, lst2, lst3`.)
I have tried wrapping the whole main part in a `try`/`except` block, like so:
try:
    # The context manager tears the pool down on every exit path, including
    # an interrupt, so no worker processes are left behind.
    with Pool(processes) as pool:
        for result in pool.imap_unordered(fun, range(processes)):
            master.append(result)
except KeyboardInterrupt:
    # somehow retrieve the values here
    pass
However, I have not found a working solution this way. How can I tell the processes that it is time to exit early and return their current results to me?
Edit to show the actual structure: main.py:
from other import Other
class Something:
    """Driver that fans ``loop`` out over a pool of worker processes."""

    def __init__(self):
        pass  # stuff here

    def spawner(self):
        """Run ``loop`` once per worker and consume each result as it arrives.

        NOTE(review): ``self.processes`` is read here but is not assigned in
        the visible ``__init__`` -- presumably part of the elided setup;
        confirm.
        """
        # The context manager guarantees the pool is torn down on every exit
        # path instead of leaving worker processes behind.
        with Pool(processes=self.processes) as pool:
            for result in pool.imap_unordered(self.loop, range(self.processes)):
                pass  # do stuff with the data

    def loop(self, _):
        """Run one ``Other`` computation and hand its result back to the pool.

        Args:
            _: Unused work item supplied by ``imap_unordered``.

        Returns:
            Whatever ``Other.start`` returns.
        """
        # setup stuff
        # NOTE(review): ``setup_stuff`` is a placeholder for the elided setup.
        # The result must be returned; otherwise every item that ``spawner``
        # receives from the pool is None.
        return Other(setup_stuff).start()
other.py
class Other:
    """Holds the computation that each worker process runs via ``start``."""

    def __init__(self):
        pass  # more stuff

    def start(self):
        """Iterate ``self.episodes`` times and return the three result lists.

        Returns:
            A tuple ``(lst1, lst2, lst3)`` of three separate lists.
        """
        lst1 = []
        lst2 = []
        lst3 = []
        for _ in range(self.episodes):
            pass  # do the actual computation
        return lst1, lst2, lst3
CodePudding user response:
Maybe you can use a `multiprocessing.Queue` instead of a list to return the values. Set up one queue at the beginning, and have all processes write to it.
At the end, read all values from the queue.
from time import sleep
from multiprocessing import Pool, Queue
q = None  # module-level handle to the shared queue; set by set_global_data


def set_global_data(queue):
    """Store ``queue`` in the module-level name ``q``.

    Args:
        queue: The object to publish as ``q`` for the other functions in
            this module.
    """
    globals()["q"] = queue
def fun(_):
    """Push the integers 0..199 onto the module-level queue ``q``.

    Args:
        _: Unused work item supplied by the pool.

    Returns:
        None. Every value is delivered through ``q`` instead.
    """
    for value in range(200):
        q.put_nowait(value)
        if value % 10 == 0:
            sleep(0.1)  # 'long task', cause a KeyboardInterrupt in this time
    # nothing is returned
if __name__ == "__main__":
    master = Queue()
    processes = 2
    try:
        # Each worker receives the shared queue through the pool initializer.
        with Pool(processes, set_global_data, (master,)) as pool:
            for _ in pool.imap_unordered(fun, range(processes)):
                pass
    except KeyboardInterrupt:
        pass
    # Print everything the workers managed to enqueue, finished or not.
    while not master.empty():
        print(master.get_nowait())
EDIT: With multiple files:
main.py
from other import Other
from multiprocessing import Pool, Queue
class Something:
    """Runs ``loop`` in a two-process pool and prints what the workers queued."""

    def __init__(self):
        pass  # stuff here

    @staticmethod
    def set_global_data(queue):
        """Pool initializer: publish ``queue`` to the worker as ``Other.q``."""
        Other.q = queue

    def spawner(self):
        """Start the pool, wait for the workers, then print the queued values.

        A ``KeyboardInterrupt`` raised while waiting is swallowed so that
        whatever has been queued so far is still printed.
        """
        shared = Queue()
        try:
            with Pool(2, Something.set_global_data, (shared,)) as pool:
                for _ in pool.imap_unordered(self.loop, range(2)):
                    pass
        except KeyboardInterrupt:
            pass
        while not shared.empty():
            print(shared.get_nowait())

    def loop(self, _):
        """Worker entry point; the argument from the pool is unused."""
        # setup stuff
        Other().start()
# Guard the entry point: with the "spawn" start method every worker process
# re-imports this module, and unguarded code would try to launch the whole
# job again from inside each worker.
if __name__ == "__main__":
    s = Something()
    s.spawner()
other.py
from time import sleep
class Other:
    """Producer that reports its values through the class-level queue ``q``."""

    q = None  # shared queue; must be assigned before start() is called

    def __init__(self):
        pass  # more stuff

    def start(self):
        """Put the integers 0..199 on ``Other.q``, pausing on multiples of 10.

        Returns:
            None. The values are delivered through ``Other.q``.
        """
        for value in range(200):
            Other.q.put_nowait(value)
            if value % 10 == 0:
                sleep(0.1)