How to kill all threads conditioned on the status of one thread?


I have n threads running simultaneously. These threads are processing a list containing m test cases. For example, thread n-1 is working on item m[i-1] while thread n is working on item m[i]. I want to stop all threads if, for example, thread n-1 fails or returns a failure signal. How can I achieve this?

Here is a MWE:

This is my processing function:

import subprocess

def process(input_addr):
    i = 1
    print('Total number of executed unit tests: {}'.format(i))
    print("executed {}. thread".format(input_addr))
    try:
        command = 'python3 ' + input_addr
        result = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        msg, err = result.communicate()
        if msg.decode('utf-8') != '':
            stat = parse_shell(msg.decode('utf-8'))
            if stat:
                print('Test Failed')
                return True
        else:
            stat = parse_shell(err)
            if stat:
                print('Test Failed')
                return True
    except Exception as e:
        print("Exception in thread.\nMessage: {}".format(e))
 

Here is my pool:

def pre_run_test_files(self):
    with Pool(10) as p:
       p.map(process, self.test_files)

I am using:

from multiprocessing import Pool

Thanks.

CodePudding user response:

I found the solution:


def process(i, input_addr, event):
    kill_flag = False
    # Skip the work entirely if another task has already signalled failure.
    if not event.is_set():
        print('Total number of executed unit tests: {}'.format(i))
        print("executed {}. thread".format(input_addr))
        try:
            command = 'python3 ' + input_addr
            result = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
            msg, err = result.communicate()
            if msg.decode('utf-8') != '':
                stat = parse_shell(msg.decode('utf-8'))
                if stat:
                    print('Test Failed')
                    kill_flag = True
            else:
                stat = parse_shell(err)
                if stat:
                    print('Test Failed')
                    kill_flag = True
        except Exception as e:
            print("Exception in thread.\nMessage: {}".format(e))
    if kill_flag:
        event.set()

def manager(self):
    p = multiprocessing.Pool(10)
    m = multiprocessing.Manager()
    event = m.Event()
    for i, f in enumerate(self.test_files):
        p.apply_async(process, (i, f, event))
    p.close()
    event.wait()   # note: blocks forever if no test ever sets the event
    p.terminate()
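The Event pattern above can be condensed into a self-contained sketch. The failing index (3 here) and the cooperative skip-on-failure are illustrative assumptions, not part of the original test runner:

```python
import multiprocessing

def worker(i, event):
    # Cooperatively skip the work once any task has signalled failure.
    if event.is_set():
        return None
    if i == 3:  # simulated failing test case
        event.set()
    return i

def run_all(items):
    with multiprocessing.Pool(4) as pool:
        manager = multiprocessing.Manager()
        event = manager.Event()
        results = [pool.apply_async(worker, (i, event)) for i in items]
        pool.close()
        pool.join()
        return [r.get() for r in results]

if __name__ == '__main__':
    out = run_all(range(8))
    print(out)  # items scheduled after the failure may come back as None
```

Unlike event.wait() plus terminate(), this variant lets already-started tasks finish and only skips tasks that have not begun, which is gentler when the subprocesses hold resources.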

CodePudding user response:

You can have your worker function, process, simply raise an exception, and pass an error_callback function to apply_async that calls terminate on the pool, as in the following demo:

from multiprocessing import Pool
import time

def process(i):
    time.sleep(1)
    if i == 6:
        raise ValueError(f'Bad value: {i}')
    print(i, flush=True)

def my_error_callback(e):
    pool.terminate()
    print(e)

if __name__ == '__main__':
    pool = Pool(4)
    for i in range(20):
        pool.apply_async(process, args=(i,), error_callback=my_error_callback)
    # wait for all tasks to complete
    pool.close()
    pool.join()

Prints:

0
1
3
2
4
5
7
Bad value: 6

You should be able to adapt the above code to your particular problem.
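For instance, one possible adaptation to the subprocess-based runner from the question; here the inline code snippets are stand-ins for real test files, and a non-zero return code replaces the parse_shell check:

```python
import subprocess
import sys
from multiprocessing import Pool

def run_case(code):
    # Run one "test" in a child interpreter; raise on failure so the
    # pool's error_callback fires and can terminate the whole pool.
    result = subprocess.run([sys.executable, '-c', code],
                            capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError('Test failed: {}'.format(result.stderr.strip()))
    return result.stdout.strip()

if __name__ == '__main__':
    cases = ['print("ok 1")', 'raise ValueError("boom")', 'print("ok 2")']
    pool = Pool(2)
    for c in cases:
        pool.apply_async(run_case, args=(c,),
                         error_callback=lambda e: (print(e), pool.terminate()))
    pool.close()
    pool.join()
```

The error_callback runs in the main process, so calling pool.terminate() from it is safe.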
