I have n threads running simultaneously. These threads are processing a list m of test cases. For example, thread n-1 is working on item m[i-1] while thread n is working on item m[i]. I want to stop all threads if, for example, thread n-1 fails or returns a signal. How can I achieve this?
Here is an MWE. This is my processing function:
import subprocess

def process(input_addr):
    i = 1
    print('Total number of executed unit tests: {}'.format(i))
    print("executed {}. thread".format(input_addr))
    try:
        # Build the shell command that runs the test file.
        command = 'python3 ' + input_addr
        result = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        msg, err = result.communicate()
        if msg.decode('utf-8') != '':
            stat = parse_shell(msg.decode('utf-8'))
            if stat:
                print('Test Failed')
                return True
        else:
            # Nothing on stdout, so inspect stderr instead.
            stat = parse_shell(err.decode('utf-8'))
            if stat:
                print('Test Failed')
                return True
    except Exception as e:
        print("thread.\nMessage:{}".format(e))
Here is my pool:
def pre_run_test_files(self):
    with Pool(10) as p:
        p.map(process, self.test_files)
I am using:
from multiprocessing import Pool
Thanks.
CodePudding user response:
I found the solution:
import multiprocessing
import subprocess

def process(i, input_addr, event):
    kill_flag = False
    # Skip the work entirely once another worker has reported a failure.
    if not event.is_set():
        print('Total number of executed unit tests: {}'.format(i))
        print("executed {}. thread".format(input_addr))
        try:
            command = 'python3 ' + input_addr
            result = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
            msg, err = result.communicate()
            if msg.decode('utf-8') != '':
                stat = parse_shell(msg.decode('utf-8'))
                if stat:
                    print('Test Failed')
                    kill_flag = True
                    # all_run.append(input_addr)
                    # write_list_to_txt(input_addr, valid_tests)
                else:
                    kill_flag = False
            else:
                stat = parse_shell(err.decode('utf-8'))
                if stat:
                    print('Test Failed')
                    kill_flag = True
                    # all_run.append(input_addr)
                    # write_list_to_txt(input_addr, valid_tests)
                else:
                    kill_flag = False
        except Exception as e:
            print("thread.\nMessage:{}".format(e))
        # Signal the parent process that the pool should be stopped.
        if kill_flag:
            event.set()
def manager(self):
    p = multiprocessing.Pool(10)
    m = multiprocessing.Manager()
    event = m.Event()
    for i, f in enumerate(self.test_files):
        p.apply_async(process, (i, f, event))
    p.close()
    # Blocks until a worker sets the event; if no test fails, this never returns.
    event.wait()
    p.terminate()
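One caveat with this approach: event.wait() returns only once some worker sets the event, so a run in which every test passes would block there. Below is a minimal sketch of one way around that, assuming it is acceptable to poll the event with a timeout and to stop as soon as every AsyncResult is ready. The names wait_for_stop, worker and run_all are hypothetical, and worker is only a stand-in for the real process function.

```python
import multiprocessing
import time


def wait_for_stop(event, results, poll_interval=0.1):
    """Return True if the event was set (a failure), False if all tasks finished first."""
    while True:
        # Event.wait(timeout) returns True as soon as the event is set.
        if event.wait(poll_interval):
            return True
        # Every task has completed; the last one may still have set the event.
        if all(r.ready() for r in results):
            return event.is_set()


def worker(i, event):
    # Hypothetical stand-in for process(): item 3 plays the failing test.
    if event.is_set():
        return
    time.sleep(0.1)
    if i == 3:
        event.set()


def run_all(items, workers=4):
    """Run worker over items; return True if the run was stopped by a failure."""
    with multiprocessing.Manager() as manager:
        event = manager.Event()
        pool = multiprocessing.Pool(workers)
        try:
            results = [pool.apply_async(worker, (i, event)) for i in items]
            pool.close()
            return wait_for_stop(event, results)
        finally:
            # Kill any workers that are still running, then reap them.
            pool.terminate()
            pool.join()


if __name__ == '__main__':
    print('stopped by failure:', run_all(range(10)))
```

Polling costs at most poll_interval seconds of extra latency before the pool is terminated, which seems a reasonable trade for not hanging when nothing fails.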
CodePudding user response:
You can have your worker function, process, simply raise an exception and use an error_callback function with apply_async that calls terminate on the pool, as in the following demo:
from multiprocessing import Pool
import time

def process(i):
    time.sleep(1)
    if i == 6:
        raise ValueError(f'Bad value: {i}')
    print(i, flush=True)

def my_error_callback(e):
    pool.terminate()
    print(e)

if __name__ == '__main__':
    pool = Pool(4)
    for i in range(20):
        pool.apply_async(process, args=(i,), error_callback=my_error_callback)
    # wait for all tasks to complete
    pool.close()
    pool.join()
Prints:
0
1
3
2
4
5
7
Bad value: 6
You should be able to adapt the above code to your particular problem.
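As one possible adaptation, here is a sketch of a worker that runs a single test file and raises when it fails, so that an error_callback like the one above would fire. It assumes a failing test is signalled by a non-zero exit status, which is a different check from the parse_shell function in the question, and the name run_test_file is hypothetical.

```python
import os
import subprocess
import sys
import tempfile


def run_test_file(path):
    """Run one test script; raise RuntimeError if it exits with a non-zero status."""
    # Passing an argument list avoids shell=True and string concatenation.
    completed = subprocess.run(
        [sys.executable, path], capture_output=True, text=True
    )
    # Assumption: a non-zero exit code means the test failed.
    if completed.returncode != 0:
        raise RuntimeError(f'{path} failed:\n{completed.stderr}')
    return completed.stdout


if __name__ == '__main__':
    # Small self-contained demo with a throwaway failing script.
    with tempfile.TemporaryDirectory() as tmp:
        bad = os.path.join(tmp, 'bad_test.py')
        with open(bad, 'w') as fh:
            fh.write('raise SystemExit(1)\n')
        try:
            run_test_file(bad)
        except RuntimeError as exc:
            print('caught:', exc)
```

A function like this could be submitted with pool.apply_async(run_test_file, args=(path,), error_callback=my_error_callback) in place of process in the demo.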