Home > other >  Share queue between processes
Share queue between processes

Time:06-21

I am pretty new to multiprocessing in python and trying to achieve something which should be a rather common thing to do. But I cannot find an easy way when searching the web.

I want to put data in a queue and then make this queue available to different consumer functions. Of course when getting an element from the queue, all consumer functions should get the same element. The following example should make clear what I want to achieve:

from multiprocessing import Process, Queue

def producer(q):

    for i in range(10):
        
        q.put(i)
    
    q.put(None)

def consumer1(q):
    while True:
        data = q.get()
        
        if data is None:   
            break
   
        print(data)

def consumer2(q):

    while True:
        data = q.get()
        
        if data is None:   
            break
   
        print(data)

def main():
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer1, args=(q,))
    p3 = Process(target=consumer2, args=(q,))
    p1.start()
    p2.start()
    p3.start()
    p1.join()
    p2.join()
    p3.join()


if __name__ == '__main__':
    main()

Since the script is not terminating and I only get the print output of one function I guess this is not the way to do it. I think sharing a queue implies some things to consider? It works fine when using only one consumer function. Appreciate the help!

CodePudding user response:

Your question exemplifies the misunderstanding

"all consumer functions should get the same element"

That's just not how queues work. Queues are automatically managed (there's quite a lot under the hood) such if one item is put in, only one item can be taken out. That item is not duplicated to all consumers. It seems like you actually need two separate queues to guarantee that each consumer gets each input without competing against the other consumer:

from multiprocessing import Process, Queue

def producer(q1, q2):

    for i in range(10):
        
        q1.put(i)
        q2.put(i)
    
    q1.put(None)
    q2.put(None)

def consumer1(q):
    while True:
        data = q.get()
        
        if data is None:   
            break
   
        print(data)

def consumer2(q):

    while True:
        data = q.get()
        
        if data is None:   
            break
   
        print(data)

def main():
    q1 = Queue()
    q2 = Queue()
    p1 = Process(target=producer,  args=(q1, q2))
    p2 = Process(target=consumer1, args=(q1,))
    p3 = Process(target=consumer2, args=(q2,))
    p1.start()
    p2.start()
    p3.start()
    p1.join()
    p2.join()
    p3.join()


if __name__ == '__main__':
    main()
  • Related