I'm using python 3.7 on ubuntu 20.04 OS. My problem statement is similar to that of producer and consumer problem, where there is a pair of reader and writer processes. My reader process calls Queue.get in an infinite loop, (As per documentation, the Queue.get is blocking call until any data is put into the queue by another process).
Making this call raises EOFError.
reader.py
import multiprocessing as mp
def reader(queue):
while True:
data = queue.get()
# Do something
queue = mp.Manager().Queue()
p = mp.Process(target=reader, args=(queue,))
p.start()
# Prepare some data to send
queue.put(some_data)
# Do my own tasks
Running This results in
Traceback (most recent call last):
File "/usr/src/app/src/processor.py", line 775, in classification_manager
slot_data = classification_queue.get()
File "<string>", line 2, in get
File "/usr/lib/python3.7/multiprocessing/managers.py", line 819, in _callmethod
kind, result = conn.recv()
File "/usr/lib/python3.7/multiprocessing/connection.py", line 250, in recv
buf = self._recv_bytes()
File "/usr/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.7/multiprocessing/connection.py", line 383, in _recv
raise EOFError
EOFError
2022-02-18 20:45:01,528 classification_1 INFO Waiting for Data!
2022-02-18 20:45:01,528 classification_1 ERROR BrokenPipeError
Traceback (most recent call last):
File "/usr/src/app/src/processor.py", line 775, in classification_manager
slot_data = classification_queue.get()
File "<string>", line 2, in get
File "/usr/lib/python3.7/multiprocessing/managers.py", line 818, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/lib/python3.7/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header buf)
File "/usr/lib/python3.7/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
CodePudding user response:
The threading equivalent to your code works fine:
import threading as th
from queue import Queue
from time import sleep
def reader(queue):
while True:
data = queue.get()
print ("Reader saw",data)
# Do something
queue = Queue()
p = th.Thread(target=reader, args=(queue,))
p.start()
queue.put("Expect More Data")
call_count = 0
while True:
sleep(2)
call_count = 1
queue.put(call_count)
CodePudding user response:
I we just elaborate your code (which isn't runnable as it stands) then there's no problem at all.
import multiprocessing as mp
def reader(queue):
while (data := queue.get()) != 'stop':
print(data)
def main():
queue = mp.Manager().Queue()
p = mp.Process(target=reader, args=(queue,))
p.start()
for some_data in ['Hello', 'world', 'stop']:
queue.put(some_data)
p.join()
if __name__ == '__main__':
main()
Output:
Hello
world