I want to implement producer consumer pattern with multiprocessing with the tkinter GUI. I would like to start execution of prcessess by pushing button and end execution of processes by pushing another button.
What I made so far is:
import multiprocessing as mp
import random
import tkinter as tk
import logging
class MainApp(tk.Frame):
def __init__(self, parent, *args, **kwargs):
super().__init__(parent, *args, **kwargs)
self.parent = parent
self.startbtn = tk.Button(self, text="Start MP", command=self.onstart)
self.stopbtn = tk.Button(self, text="Stop MP", command=self.onstop)
self.columnconfigure(index=0,weight=1)
self.startbtn.grid(row=0, column=0, sticky="ew", pady=(25,0))
self.stopbtn.grid(row=1, column=0, sticky="ew", pady=(25,25))
def onstart(self):
print("Start button clicked!")
self.tasks = mp.Queue()
self.producer = Producer(self.tasks)
self.consumer = Consumer(self.tasks)
self.producer.start()
self.consumer.start()
self.startbtn.config(state="disabled")
self.stopbtn.config(state="active")
def onstop(self):
self.producer.event.set()
self.consumer.event.set()
self.producer.join()
self.consumer.join()
self.producer.close()
self.consumer.close()
self.startbtn.config(state="active")
self.stopbtn.config(state="disabled")
class Consumer(mp.Process):
def __init__(self, taskqueue):
super().__init__()
self.taskqueue = taskqueue
self.event = mp.Event()
def run(self):
procname = self.name
print('{}'.format("Consumer started!"))
while True:
nexttask = self.taskqueue.get()
print('Task:{}'.format(nexttask))
if self.event.is_set():
print('{}:Exiting'.format(procname))
break
return
class Producer(mp.Process):
def __init__(self, taskqueue):
super(Producer, self).__init__()
self.taskqueue = taskqueue
self.event = mp.Event()
def run(self):
procname = self.name
print('{}'.format("Producer started!"))
while True:
task = random.random()
self.taskqueue.put(task)
if self.event.is_set():
print('{}:Exiting'.format(procname))
break
return
if __name__ == '__main__':
mlogger = mp.log_to_stderr()
#mlogger.setLevel(mp.SPDEBUG)
logging.basicConfig(level=logging.DEBUG)
root = tk.Tk()
root.geometry("600x400")
root.columnconfigure(index=0, weight=1)
root.rowconfigure(index=0, weight=1)
mapp = MainApp(root)
mapp.grid(row=0, column=0, sticky="nsew")
root.mainloop()
But I'm not able to finish execution of process by pushing "stopbtn" and calling onstop function. The onstop function hangs (there is deadlock on the queue thread) with messages:
Consumer-2:Exiting
Producer-1:Exiting
[INFO/Consumer-2] process shutting down
[DEBUG/Consumer-2] running all "atexit" finalizers with priority >= 0
[INFO/Producer-1] process shutting down
[DEBUG/Producer-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Consumer-2] running the remaining "atexit" finalizers
[INFO/Consumer-2] process exiting with exitcode 0
[DEBUG/Producer-1] telling queue thread to quit
[DEBUG/Producer-1] running the remaining "atexit" finalizers
[DEBUG/Producer-1] joining queue thread
It possible to end these procesess (producer, consumer and queue) in this way by pushing button in tkinter? I tried everything what I have found over Internet in this topic without any success.
CodePudding user response:
If there's a Queue
waiting to send or receive an item, it can block the process from joining. It is good practice to attempt to clear the queue, which can be done in many ways. I would set it up such that the producer gets a signal to quit, and sends that signal to the consumer as the last item in the queue:
from multiprocessing import Process, Event, Queue
from queue import Empty, Full
PUT_TIMEOUT = 1
GET_TIMEOUT = 1
class STOPFLAG: pass #unique value as a flag for exiting
def producer(queue, event):
while not event.is_set():
try:
queue.put("item", True, PUT_TIMEOUT)
except Full:
pass #handle queue is full
queue.put(STOPFLAG())
def consumer(queue):
while True:
try:
item = queue.get(True, GET_TIMEOUT)
except Empty:
continue #maybe do some housekeeping if no item is available from the producer right now
if isinstance(item, STOPFLAG):
return
You can also technically force the queue to use a daemon thread in the background for putting and getting items, so that any remaining work when the process ends it simply dropped. You have to be absolutely sure however that you are fine just dropping everything in the middle, and you don't mind potentially partial / invalid objects showing up on the other end of the queue:
def producer(queue, event):
queue.cancel_join_thread()
while not event.is_set():
try:
queue.put("item", True, PUT_TIMEOUT)
except Full:
pass #handle queue is full
def consumer(queue, event):
queue.cancel_join_thread()
while not event.is_set():
try:
item = queue.get(True, GET_TIMEOUT)
except Empty:
continue #maybe do some housekeeping if no item is available from the producer