I want to end my script when one of the child processes reaches a specific point. Let's suppose I have the following code:
import multiprocessing
import time
import sys

def another_child_process(my_queue):
    time.sleep(3)
    my_queue.put("finish")

def my_process(my_queue):
    while True:
        if my_queue.empty() is False:
            my_queue.get()
            print("Killing the program...")
            ### THIS IS WHERE I WANT TO KILL MAIN PROCESS AND EXIT
            sys.exit(0)

def main():
    ## PARENT PROCESS WHICH I WANT TO KILL FROM THE CHILD
    my_queue = multiprocessing.Queue()
    child_process = multiprocessing.Process(target=my_process, args=(my_queue,))
    another_process = multiprocessing.Process(target=another_child_process, args=(my_queue,))
    child_process.start()
    another_process.start()
    while True:
        pass  ## I want to end the program in the child process

if __name__ == "__main__":
    main()
I have read something about using signals, but they are mainly for Linux and I don't know how to use them on Windows. I'm a beginner in Python. How can I end the script completely? Thank you so much in advance for your time.
CodePudding user response:
First of all, if you read the documentation carefully, you will see that the empty method on a multiprocessing.Queue is not reliable and should not be used. Moreover, you have a race condition: if my_process runs before another_child_process (and assuming empty were reliable), it would find the queue empty and could terminate prematurely, because another_child_process has not yet had a chance to put any items on the queue. So what you should do instead is have another_child_process put whatever messages it wants on the queue, followed by one additional sentinel item whose purpose is to signal that no more items will be put on the queue. The sentinel thus serves as a quasi end-of-file indicator. You can use any distinct object as the sentinel, as long as it cannot be mistaken for a "real" data item. In this case we will use None as the sentinel.
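The sentinel idea on its own can be sketched in a minimal single-process form using the standard queue and threading modules (the names producer, consumer, and SENTINEL here are illustrative, not from the original code):

```python
import queue
import threading

q = queue.Queue()
SENTINEL = None  # any distinct object the producer will never send as real data

def producer():
    for item in ("a", "b", "c"):
        q.put(item)
    q.put(SENTINEL)  # quasi end-of-file marker: no more items are coming

def consumer():
    results = []
    while True:
        item = q.get()          # blocks until an item is available
        if item is SENTINEL:    # reliable end signal, unlike polling q.empty()
            break
        results.append(item)
    return results

t = threading.Thread(target=producer)
t.start()
print(consumer())  # ['a', 'b', 'c']
t.join()
```

The consumer never needs to poll empty(); it simply blocks on get() until either real data or the sentinel arrives.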
But the actual example you have coded is not a realistic demonstration of why you would need some special mechanism to terminate the main process and exit: once another_process has put its items on the queue, it returns and its process terminates, and once my_process detects that it has retrieved all the items from the queue and that there will be no more, it returns and its process terminates. So all the main process has to do is call join on the two subprocesses, wait for them to complete, and then exit:
import multiprocessing
import time

def another_child_process(my_queue):
    time.sleep(3)
    my_queue.put("finish")
    my_queue.put(None)

def my_process(my_queue):
    while True:
        item = my_queue.get()
        if item is None:
            break
        print('Item:', item)

def main():
    my_queue = multiprocessing.Queue()
    child_process = multiprocessing.Process(target=my_process, args=(my_queue,))
    another_process = multiprocessing.Process(target=another_child_process, args=(my_queue,))
    child_process.start()
    another_process.start()
    child_process.join()
    another_process.join()

if __name__ == "__main__":
    main()
Prints:
Item: finish
Here is perhaps a better example. another_child_process gets data in some fashion (for demo purposes we use a generator function, get_data). If no abnormal situation occurs, it puts all the data on the queue for my_process to get, followed by the None sentinel item, so my_process knows that no more data is forthcoming and it can terminate. But let's suppose there is the possibility of get_data producing a special, abnormal data item, the string 'finish' for demo purposes. In that case another_child_process will terminate immediately. At that point, however, there may be many items on the queue that my_process has not yet retrieved and processed. We would like to force the termination of my_process immediately so that the main process can immediately join the subprocesses and terminate.
To accomplish this we pass an event to a daemon thread started by the main process, which waits for the event to be set. If the event is set by another_child_process, to which we have also passed the event, the thread will immediately terminate the my_process process:
import multiprocessing

def get_data():
    for item in ['a', 'b', 'c', 'finish', 'd', 'e', 'f', 'g']:
        yield item

def another_child_process(my_queue, exit_event):
    for item in get_data():
        if item == 'finish':
            # Abnormal condition where we must exit immediately.
            # Immediately signal the main process to terminate:
            exit_event.set()
            # And we terminate:
            return
        my_queue.put(item)
    # Normal situation where we just continue.
    # Put in sentinel signifying no more data:
    my_queue.put(None)

def my_process(my_queue):
    while True:
        item = my_queue.get()
        if item is None:  # Sentinel?
            # No more data:
            break
        print("Got: ", repr(item))
    print('my_process terminating normally.')

def main():
    import threading

    def wait_for_quit(exit_event):
        nonlocal child_process
        exit_event.wait()
        child_process.terminate()
        print("Exiting because event was set.")

    exit_event = multiprocessing.Event()
    # Start daemon thread that will wait for the quit event:
    threading.Thread(target=wait_for_quit, args=(exit_event,), daemon=True).start()
    my_queue = multiprocessing.Queue()
    child_process = multiprocessing.Process(target=my_process, args=(my_queue,))
    another_process = multiprocessing.Process(target=another_child_process, args=(my_queue, exit_event))
    child_process.start()
    another_process.start()
    # Wait for processes to end:
    child_process.join()
    another_process.join()

if __name__ == "__main__":
    main()
Prints:
Got: 'a'
Exiting because event was set.
If you remove the 'finish' item from the data returned by get_data, then all processes will complete normally and what gets printed will be:
Got: 'a'
Got: 'b'
Got: 'c'
Got: 'd'
Got: 'e'
Got: 'f'
Got: 'g'
my_process terminating normally.
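A final note on the Windows concern raised in the question: Process.terminate() as used above works on Windows as well (where it is implemented with TerminateProcess), so no POSIX signals are needed. If you would rather have a cooperative shutdown in which no process is killed from outside, the child can poll the event itself and return cleanly. This is a sketch with illustrative names (worker, stop_event), not part of the answer's code:

```python
import multiprocessing
import time

def worker(stop_event):
    # Cooperative shutdown: the child checks the event on each iteration
    # instead of being terminated from outside.
    while not stop_event.is_set():
        time.sleep(0.1)  # placeholder for a unit of real work

def main():
    stop_event = multiprocessing.Event()
    p = multiprocessing.Process(target=worker, args=(stop_event,))
    p.start()
    time.sleep(0.5)
    stop_event.set()   # ask the child to stop
    p.join()
    return p.exitcode  # 0 means the child returned normally

if __name__ == "__main__":
    print("exitcode:", main())
```

The trade-off is that the child only notices the request at its next check of the event, whereas terminate() stops it immediately but gives it no chance to clean up.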