Using Python multiprocessing, I want to catch processes that time out, discard them, and continue with the next process.
In the example below I have a list of 1's and 0's as inputs. The 0's make the worker function sleep, which triggers the timeout error. Tasks that trigger the timeout are executed again, and therefore the script runs forever.
How can I catch a TimeoutError, kill the process that caused it, and prevent that task from being executed again? It's important that I can do this using imap.
import time
import multiprocessing as mp

def a_func(x):
    print(x)
    if x:
        return x
    # Function sleeps before returning
    # to trigger timeout error
    else:
        time.sleep(2.0)
        return x

if __name__ == "__main__":
    solutions = []
    # Inputs sum to 4
    inputs = [1, 1, 0, 1, 1, 0]
    with mp.get_context("spawn").Pool(1) as pool:
        futures_res = pool.imap(a_func, inputs)
        for s in inputs:
            try:
                res = futures_res.next(timeout=0.1)
                # If successful (no timeout), append the result
                solutions.append(res)
            except mp.context.TimeoutError:
                # Catch the timeout error
                # I want this to also prevent the task from being executed again
                print(s, "err")
                # solutions.append(0.0)
    # Should print 4
    print(len(solutions))
    print(solutions)
CodePudding user response:
You may be somewhat confused about how imap works with timeouts, or you haven't expressed your question clearly, or I am confused. So let's take it from the top:
For the purpose of determining whether a multiprocessing.TimeoutError exception will be thrown when you call next(timeout=some_value) on the iterator returned by imap, timing begins when the task is taken off the queue by a pool process for execution. So if you have only one process in the pool and 6 tasks submitted, there is no parallel processing being performed: the third task, for example, will not start until the second task has completed, and that is when timing for the third task begins, not when all of the tasks were submitted.
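Here is a minimal sketch of that timing behavior (the slow function, its 0.3 second sleep and the 0.5 second timeout are illustrative, not taken from the question). With one process in the pool and three tasks that each take 0.3 seconds, none of the next(timeout=0.5) calls times out, even though the third result only arrives about 0.9 seconds after imap was called:

import time
import multiprocessing as mp

def slow(x):
    time.sleep(0.3)
    return x

if __name__ == "__main__":
    with mp.get_context("spawn").Pool(1) as pool:
        it = pool.imap(slow, [1, 2, 3])
        start = time.time()
        for _ in range(3):
            # The timeout window applies per result, not from the
            # moment all tasks were submitted, so none of these
            # calls raises a TimeoutError.
            print(it.next(timeout=0.5), round(time.time() - start, 1))

Each result arrives roughly 0.3 seconds after the previous one, so every call returns well within its own 0.5 second window.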
But when you get a timeout exception, nothing actually happens to the task being executed -- it continues to run. You are only iterating the return value of imap 6 times. If you instead iterated indefinitely until you got a StopIteration exception, you would see that all of the tasks eventually complete and return a value, possibly throwing multiple timeout errors along the way.
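You can verify this by catching the timeout and simply calling next again on the same iterator. A minimal sketch, using an illustrative 0.5 second sleep and 0.2 second timeout so the repeated timeout messages stay manageable:

import time
import multiprocessing as mp

def maybe_slow(x):
    if not x:
        time.sleep(0.5)
    return x

if __name__ == "__main__":
    results = []
    with mp.get_context("spawn").Pool(1) as pool:
        it = pool.imap(maybe_slow, [1, 0, 1])
        while True:
            try:
                results.append(it.next(timeout=0.2))
            except mp.TimeoutError:
                # The slow task keeps running in the worker; the
                # next call waits for that same task's result again.
                print("timed out, task still running")
            except StopIteration:
                break
    print(results)  # eventually prints [1, 0, 1]

Every value, including the slow one, ends up in results; the timeouts only affected the iteration, never the tasks themselves.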
One solution is to keep removing from the inputs list the input value corresponding to the task whose result you are iterating. As soon as you get a timeout exception, terminate the remaining tasks in the pool, and if there are any values still left in the inputs list, rerun imap with the new, shorter inputs list.
Three points:

1. When you terminate the pool, the process in the pool may have already started to execute the next task on the input queue. So this needs to be a task that is re-startable.
2. You need to pass imap a copy of the inputs list, since imap "lazily" evaluates the passed iterable: you will be modifying the inputs list while you are iterating the return value of imap, and imap would otherwise still be evaluating inputs if you did not pass a copy.
3. You should pass a somewhat larger timeout value than .1, since on my desktop I still got a timeout exception from time to time even when passing values of 1 to the worker function.
import time
import multiprocessing as mp

def a_func(x):
    print(x)
    if x:
        return x
    # Function sleeps before returning
    # to trigger timeout error
    else:
        time.sleep(2.0)
        return x

if __name__ == "__main__":
    solutions = []
    # Inputs sum to 4
    inputs = [1, 1, 0, 1, 1, 0]
    while inputs:
        with mp.get_context("spawn").Pool(1) as pool:
            # Pass a copy of inputs, since we mutate the list
            # while imap is still lazily consuming it
            futures_res = pool.imap(a_func, inputs.copy())
            while inputs:
                s = inputs.pop(0)
                try:
                    res = futures_res.next(timeout=.5)
                    # If successful (no timeout), append the result
                    solutions.append(res)
                except mp.context.TimeoutError:
                    # Leaving the with block terminates the pool;
                    # the outer loop reruns imap with what is left
                    # of the inputs list
                    print(s, "err")
                    break
    # Should print 4
    print(len(solutions))
    print(solutions)
Prints:
1
1
0
0 err
1
1
0
0 err
4
[1, 1, 1, 1]