Home > other >  Python multiprocessing.Queue.get raises EOFError on first call
Python multiprocessing.Queue.get raises EOFError on first call

Time:02-19

I'm using python 3.7 on ubuntu 20.04 OS. My problem statement is similar to that of producer and consumer problem, where there is a pair of reader and writer processes. My reader process calls Queue.get in an infinite loop, (As per documentation, the Queue.get is blocking call until any data is put into the queue by another process).

Making this call raises EOFError.

reader.py

import multiprocessing as mp

def reader(queue):
    while True:
        data = queue.get()
        # Do something

queue = mp.Manager().Queue()
p = mp.Process(target=reader, args=(queue,))
p.start()
# Prepare some data to send
queue.put(some_data)
# Do my own tasks

Running This results in

    Traceback (most recent call last):
  File "/usr/src/app/src/processor.py", line 775, in classification_manager
    slot_data = classification_queue.get()
  File "<string>", line 2, in get
  File "/usr/lib/python3.7/multiprocessing/managers.py", line 819, in _callmethod
    kind, result = conn.recv()
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError
2022-02-18 20:45:01,528 classification_1 INFO     Waiting for Data!
2022-02-18 20:45:01,528 classification_1 ERROR    BrokenPipeError
Traceback (most recent call last):
  File "/usr/src/app/src/processor.py", line 775, in classification_manager
    slot_data = classification_queue.get()
  File "<string>", line 2, in get
  File "/usr/lib/python3.7/multiprocessing/managers.py", line 818, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header   buf)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

CodePudding user response:

The threading equivalent to your code works fine:

import threading as th
from queue import Queue
from time import sleep

def reader(queue):
    while True:
        data = queue.get()
        print ("Reader saw",data)
        # Do something

queue = Queue()
p = th.Thread(target=reader, args=(queue,))
p.start()

queue.put("Expect More Data")

call_count = 0
while True:
    sleep(2)
    call_count  = 1
    queue.put(call_count)

CodePudding user response:

I we just elaborate your code (which isn't runnable as it stands) then there's no problem at all.

import multiprocessing as mp

def reader(queue):
    while (data := queue.get()) != 'stop':
        print(data)

def main():
    queue = mp.Manager().Queue()
    p = mp.Process(target=reader, args=(queue,))
    p.start()
    for some_data in ['Hello', 'world', 'stop']:
        queue.put(some_data)
    p.join()


if __name__ == '__main__':
    main()

Output:

Hello
world
  • Related