The following simple function causes a TimeoutError at some point. How can I determine which conditions (e.g. the arguments or something else) caused the timeout?
In [82]: def f(i):
    ...:     print(i)
    ...:     if i == 2: time.sleep(1)
    ...:     return "done"
    ...:
In [83]: with ThreadPoolExecutor(max_workers=10) as ex:
    ...:     res = ex.map(f, range(10), timeout=0.001)
    ...:     try:
    ...:         for r in res:
    ...:             print(f"{r=}")
    ...:     except Exception as e:
    ...:         print(f"{e=}")
    ...:
0
1
2
3
4
5
6
8
7
r='done'
9
r='done'
e=TimeoutError()
Handling the TimeoutError inside f does not work, because the TimeoutError is raised while iterating over the result set, not inside f.
In [84]: with ThreadPoolExecutor(max_workers=10) as ex:
    ...:     res = ex.map(f, range(10), timeout=0.001)
    ...:     for r in res:
    ...:         print(f"{r=}")
    ...:
0
1
2
3
4
5
6
7
r='done'
r='done'
8
9
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
Cell In [84], line 3
      1 with ThreadPoolExecutor(max_workers=10) as ex:
      2     res = ex.map(f, range(10), timeout=0.001)
----> 3     for r in res:
      4         print(f"{r=}")

File /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py:621, in Executor.map.<locals>.result_iterator()
    619             yield fs.pop().result()
    620         else:
--> 621             yield fs.pop().result(end_time - time.monotonic())
    622 finally:
    623     for future in fs:

File /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444         return self.__get_result()
    445     else:
--> 446         raise TimeoutError()
    447 finally:
    448     # Break a reference cycle with the exception in self._exception
    449     self = None

TimeoutError:
CodePudding user response:
The exception itself does not carry any relevant information. However, as the documentation says:
"The returned iterator raises a concurrent.futures.TimeoutError if __next__() is called and the result isn't available after timeout seconds from the original call to Executor.map()."
So the exception is always associated with the next element you tried to retrieve. Because map() yields results in the order of its inputs, keeping track of the arguments tells you which one was still pending when the timeout expired.
import time
from concurrent import futures


def f(i):
    time.sleep(i)
    return i


def run_checked(fun, args):
    args = list(args)
    idx = -1  # index of the last result that was retrieved successfully
    with futures.ThreadPoolExecutor(max_workers=4) as tp:
        try:
            for idx, result in enumerate(tp.map(fun, args, timeout=4)):
                print(result)
        except futures.TimeoutError:
            # The result after the last successful one is the one that timed out.
            print("Timeout on arguments '%s'" % args[idx + 1])


if __name__ == '__main__':
    run_checked(f, range(10))
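If you need every argument that was still unfinished at the deadline, not just the first one in input order, one possible alternative is to use submit() together with concurrent.futures.wait() and keep a mapping from each future back to its argument. This is only a sketch: the names slow_identity and find_timed_out are made up for illustration, and it assumes fun does not raise (otherwise result() would re-raise that exception). As with map(), the timeout applies to the whole batch, not to each call.

```python
import time
from concurrent import futures


def slow_identity(delay):
    # Hypothetical workload: sleep for `delay` seconds, then return it.
    time.sleep(delay)
    return delay


def find_timed_out(fun, args, timeout):
    """Return (finished, timed_out) after waiting at most `timeout` seconds.

    finished is a list of (argument, result) pairs, timed_out is a list of
    the arguments whose calls had not completed; both keep the input order.
    """
    with futures.ThreadPoolExecutor(max_workers=4) as tp:
        # Remember which argument each future was submitted with.
        arg_of = {tp.submit(fun, a): a for a in args}
        done, not_done = futures.wait(list(arg_of), timeout=timeout)
        for fut in not_done:
            # Only prevents calls that have not started yet; running calls
            # cannot be interrupted and are awaited when the pool shuts down.
            fut.cancel()
        finished = [(a, fut.result()) for fut, a in arg_of.items() if fut in done]
        timed_out = [a for fut, a in arg_of.items() if fut in not_done]
    return finished, timed_out


if __name__ == '__main__':
    finished, timed_out = find_timed_out(slow_identity, [0, 0.01, 1.0], timeout=0.3)
    print("finished:", finished)
    print("timed out:", timed_out)
```

Note that leaving the with block still waits for calls that are already running, so the function returns only after the slowest started call has finished.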