I'm starting three processes (a2, a3, a4) from a process (a1). These three subprocesses need to send data to another process (b1). I can't create process b1 inside the function that a2, a3 and a4 run (it is one function called three times), because that would spawn three b1 processes instead of just one. Because of this, handling the Queue gets difficult. All the functions loop indefinitely. How would I solve this? A simplified form of the code is here:
import multiprocessing
from multiprocessing import Queue, Process
def Process_B(q4):
    """Consume result tuples from ``q4`` forever and print each one.

    Args:
        q4: The single ``multiprocessing.Queue`` that the a2/a3/a4 workers
            put their results on. It has to be created once by the code
            that starts the processes and handed to this function through
            ``Process(target=Process_B, args=(q4,))``; a queue created
            separately inside each process is a different queue.

    This function never returns.
    """
    while True:
        # get() blocks until an item is available, so there is no need to
        # poll qsize() first (polling spins the CPU, and qsize() raises
        # NotImplementedError on platforms such as macOS).
        var1, var2, var3 = q4.get()
        print(var1, var2, var3)
def Process_A2_3_4(q123: multiprocessing.Queue, name=None):
    """Worker loop run by each of the a2/a3/a4 processes.

    Args:
        q123: Input queue for this worker. Every item taken from it must
            unpack into exactly three values.
        name: Label for this worker, forwarded to ``SubProcesses123``.
            The callers in ``Process_A.startProcess234`` pass it as the
            second positional argument; it defaults to ``None`` so that a
            one-argument call keeps working.

    This function never returns.
    """
    # SubProcesses123 requires a name, so forward the one we were given.
    supProcesses = SubProcesses123(name)
    while True:
        # Blocking get(): waits for the next item instead of busy-polling
        # qsize(), which burns CPU and is unavailable on some platforms.
        var1, var2, var3 = q123.get()
        supProcesses.doSomething(var1, var2, var3)
class SubProcesses123:
    """Per-worker helper that handles one item and forwards it downstream.

    Args:
        name: Label identifying the worker that owns this helper.
        out_queue: Optional queue shared by all workers. When given, every
            call to ``doSomething`` puts a ``(var1, var2, var3)`` tuple on
            it, which is the shape ``Process_B`` unpacks. When omitted the
            helper only prints, exactly as before.
    """

    def __init__(self, name, out_queue=None):
        self.name = name
        self.out_queue = out_queue

    def doSomething(self, var1, var2, var3):
        """Handle one item and, if an output queue is set, forward it."""
        print('doing something')
        if self.out_queue is not None:
            # Same queue object for all three workers (a2, a3, a4); the
            # data is received by Process_B.
            self.out_queue.put((var1, var2, var3))
class Process_A:
    """Parent stage: owns one input queue per worker and launches a2/a3/a4."""

    def __init__(self):
        # One dedicated queue per worker, stored as self.a2 / self.a3 / self.a4.
        for label in ("a2", "a3", "a4"):
            setattr(self, label, Queue())
        self.startProcess234()
        self.doSomething()

    def startProcess234(self):
        """Create the three worker processes, then start them in order."""
        specs = ((self.a2, "a2"), (self.a3, "a3"), (self.a4, "a4"))
        workers = [Process(target=Process_A2_3_4, args=spec) for spec in specs]
        for worker in workers:
            worker.start()

    def doSomething(self):
        """Placeholder for this stage's own work."""
        print('doing something')
# Entry point: only runs when the file is executed as a script, not when it
# is imported.
if __name__ == '__main__':
    # The child process calls Process_A(), so Process_A.__init__ (which
    # starts the a2/a3/a4 workers) executes inside that child.
    retrieve = Process(target=Process_A)
    retrieve.start()
So a single Queue needs to be created, which processes a2, a3 and a4 will use to send data to Process_B. The question is how? When I create a Queue in main or in Process_A and pass it to Process_A2_3_4, it does put something in the Queue (at least there are no errors), but I'm unable to retrieve the data from the queue in the other process. Any help would be much appreciated.
CodePudding user response:
This is the kind of thing you want to do. Create one queue to feed the input dudes (the "A" processes), and one queue for those dudes to feed their output to the consolidator dude (the "B" process).
import multiprocessing
from multiprocessing import Queue, Process
import time
def Process_B(q4):
    """Consolidator loop: print every 3-value item that arrives on ``q4``.

    Blocks on ``q4.get()`` between items and never returns.
    """
    while True:
        first, second, third = q4.get()
        print("in B", first, second, third)
def Process_A2_3_4(q123, q4, name):
    """Worker loop shared by the a2/a3/a4 processes.

    Announces itself, then repeatedly takes a 3-value item from ``q123`` and
    hands it to a ``SubProcesses123`` helper that was built with the output
    queue ``q4`` and this worker's ``name``. Never returns.
    """
    print(name, "starting")
    handler = SubProcesses123(q4, name)
    while True:
        job = q123.get()
        # Unpack explicitly so a malformed item fails here, as before.
        first, second, third = job
        handler.doSomething(first, second, third)
class SubProcesses123:
    """Helper held by each worker; forwards every item to the output queue."""

    def __init__(self, q, name):
        self.q = q
        self.name = name

    def doSomething(self, var1, var2, var3):
        """Log the three values, then put them on the queue as one tuple."""
        payload = (var1, var2, var3)
        print('in SP123', *payload)
        self.q.put(payload)
class Process_A:
    """Producer stage: starts the workers and the consolidator, then feeds
    the shared input queue.

    ``q1`` is the single input queue read by all three workers; ``q2`` is the
    single output queue they write to and that ``Process_B`` reads.
    """

    def __init__(self):
        self.q1 = Queue()
        self.q2 = Queue()
        self.startProcess234()
        # Feed twenty items, one per second.
        remaining = 20
        while remaining > 0:
            self.q1.put((1, 2, 3))
            time.sleep(1)
            remaining -= 1

    def startProcess234(self):
        """Create the three workers and the consolidator, then start them."""
        procs = [
            Process(target=Process_A2_3_4, args=(self.q1, self.q2, label))
            for label in ("a2", "a3", "a4")
        ]
        procs.append(Process(target=Process_B, args=(self.q2,)))
        for proc in procs:
            proc.start()

    def doSomething(self):
        """Placeholder for this stage's own work."""
        print('doing something')
# Entry point: only runs when the file is executed as a script, not when it
# is imported.
if __name__ == '__main__':
    # The child process calls Process_A(), so Process_A.__init__ (which
    # creates both queues, starts the workers and Process_B, and feeds the
    # input queue) executes inside that child.
    retrieve = Process(target=Process_A)
    retrieve.start()