Home > Blockchain >  Broken pipe error when trying to send anything over pipe between processes with sending process runn
Broken pipe error when trying to send anything over pipe between processes with sending process runn

Time:11-11

I am playing around with Pipe and Process from the multiprocessing module (Python 3.8). My initial program looks like this:

from multiprocessing import Process, Pipe


class Process1(object):
    def __init__(self, pipe_out):
        self.pipe_out = pipe_out

        self.run()

    def run(self):
        try:
            while True:
                print("Sending message to process 2")
                self.pipe_out.send(["hello"])
        except KeyboardInterrupt:
            pass


class Process2(object):
    def __init__(self, pipe_in):
        self.pipe_in = pipe_in

        self.run()

    def run(self):
        try:
            while self.pipe_in.poll():
                request = self.pipe_in.recv()
                method = request[0]
                args = request[1:]

                try:
                    getattr(self, method   "_callback")(*args)
                except AttributeError as ae:
                    print("Unknown callback received from pipe", str(ae))

            print("Process 2 done with receiving")
        except KeyboardInterrupt:
            pass

    def hello_callback(self):
        print("Process 1 said hello")


class Controller(object):
    def __init__(self):
        pipe_proc1_out, pipe_proc2_in = Pipe()

        self.proc1 = Process(
            target=Process1,
            args=(pipe_proc1_out, )
        )

        self.proc2 = Process(
            target=Process2,
            args=(pipe_proc2_in, )
        )

    def run(self):
        try:
            self.proc1.start()
            self.proc2.start()

            while True:
                continue
        except KeyboardInterrupt:
            print("Quitting processes...")
            self.proc1.join(1)
            if self.proc1.is_alive():
                self.proc1.terminate()

            self.proc2.join(1)
            if self.proc2.is_alive():
                self.proc2.terminate()

            print("Finished")


def pipes():
    c = Controller()
    c.run()


if __name__ == "__main__":
    pipes()

I have a Controller instance that runs until a keyboard interruption is received. It also handles two processes Process1 and Process2 with the former constantly sending and the latter constantly receiving.

The code above is a skeleton for a larger undertaking that involves a complex GUI (PySide), image processing (OpenCV) and a game engine (Panda3D). So I tried to add Tkinter as a GUI example:

from multiprocessing import Process, Pipe
import tkinter as tk


class Process1(tk.Frame):
    def __init__(self, pipe_out):
        self.pipe_out = pipe_out

        self.setup_gui()
        self.run()

    def setup_gui(self):
        self.app = tk.Tk()
        lb1 = tk.Label(self.app, text="Message:")
        lb1.pack()
        self.ent1 = tk.Entry(self.app)
        self.ent1.pack()
        btn1 = tk.Button(self.app, text="Say hello to other process",
                         command=self.btn1_clicked)
        btn1.pack()

    def btn1_clicked(self):
        msg = self.ent1.get()
        self.pipe_out.send(["hello", msg])

    def run(self):
        try:
            self.app.mainloop()
        except KeyboardInterrupt:
            pass


class Process2(object):
    def __init__(self, pipe_in):
        self.pipe_in = pipe_in

        self.run()

    def run(self):
        try:
            while self.pipe_in.poll():
                request = self.pipe_in.recv()
                method = request[0]
                args = request[1:]

                try:
                    getattr(self, method   "_callback")(*args)
                except AttributeError as ae:
                    print("Unknown callback received from pipe", str(ae))

            print("Process 2 done with receiving")
        except KeyboardInterrupt:
            pass

    def hello_callback(self, msg):
        print("Process 1 say\""   msg   "\"")


class Controller(object):
    def __init__(self):
        pipe_proc1_out, pipe_proc2_in = Pipe()

        self.proc1 = Process(
            target=Process1,
            args=(pipe_proc1_out, )
        )

        self.proc2 = Process(
            target=Process2,
            args=(pipe_proc2_in, )
        )

    def run(self):
        try:
            self.proc1.start()
            self.proc2.start()

            while True:
                continue
        except KeyboardInterrupt:
            print("Quitting processes...")
            self.proc1.join(1)
            if self.proc1.is_alive():
                self.proc1.terminate()

            self.proc2.join(1)
            if self.proc2.is_alive():
                self.proc2.terminate()

            print("Finished")


def pipes():
    c = Controller()
    c.run()


if __name__ == "__main__":
    pipes()

Notice that currently the Tkinter window can only be closed if the "parent" process is interrupted via keyboard.

Whenever I click the button and invoke the button's command, my program goes into an error state with the following message:

Exception in Tkinter callback
Traceback (most recent call last):
  File "C:\Users\USER\Anaconda3\envs\THS\lib\tkinter\__init__.py", line 1705, in __call__
    return self.func(*args)
  File "C:\Users\USER\PycharmProjects\PythonPlayground\pipes_advanced.py", line 26, in btn1_clicked
    self.pipe_out.send(["hello", 1, 2])
  File "C:\Users\USER\Anaconda3\envs\THS\lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Users\USER\Anaconda3\envs\THS\lib\multiprocessing\connection.py", line 280, in _send_bytes
    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closed

At first I thought that the problem is with the value I'm receiving from the Entry.get() call (my Tkinter skills are rusty). I printed msg and got the text from the widget.

Next thing I tried was to put a constant string as the value of the argument that I sent over the pipe:

def btn1_clicked(self):
    self.pipe_out.send(["hello", "world"])

The same error appeared. Catching the exception BrokenPipeError doesn't really do me any good (except if I want to handle the case when the pipe is broken I guess).

If I do the same for the first version of the program (without Tkinter), it works. This leads me to believe that my problem comes from the way I have integrated Tkinter.

CodePudding user response:

The issue you have is that you poll the the pipe, but the documentation says:

poll([timeout])

Return whether there is any data available to be read.
If timeout is not specified then it will return immediately.

In the first example it works because when starting Process1 you send data to the pipe immediately:

    def run(self):
        try:
            while True:
                print("Sending message to process 2")
                self.pipe_out.send(["hello"])
        except KeyboardInterrupt:
            pass

And you do this continuously so the .poll will return True and the loop in Process2 will continue.

As with tkinter nothing gets sent to the pipe immediately it waits for user to click a button, by the time any of that can happen the Process2 already has called poll and it immediately returned False and it didn't even start that loop. If you notice then it also almost immediately prints in the terminal that

"Process 2 done with receiving"

To solve this issue the easiest seems to use

while self.pipe_in.poll(None):

which per the docs means

"If timeout is None then an infinite timeout is used."

and for something like user interface this seems to be the best fit (from user's perspective at least (or so I think)) so basically your run method in Process2 should look like this:

    def run(self):
        try:
            while self.pipe_in.poll(None):
                request = self.pipe_in.recv()
                method = request[0]
                args = request[1:]

                try:
                    getattr(self, method   "_callback")(*args)
                except AttributeError as ae:
                    print("Unknown callback received from pipe", str(ae))

            print("Process 2 done with receiving")
        except (KeyboardInterrupt, EOFError):
            pass

Also not related to the problem but there seems to be no need to inherit from tk.Frame in Process1 (or object in Process2 (unless you really need to make it compatible with Python2)), you almost could inherit from tk.Tk, that should make it easier to actually use it as the main window since self would be the Tk instance

  • Related