How to gather results from Python's Thread class with an input and output queue?

Time:09-21

I'm trying to learn a bit about threading in Python. I'm aware there are other tools, such as pools, processes, or Pipes, that might be easier, but I'm interested in using the threading module.

from threading import Thread
from queue import Queue

class SimulationThread(Thread):

    def __init__(self, input_queue: Queue, results_queue: Queue):
        Thread.__init__(self)
        self.input_queue = input_queue
        self.results_queue = results_queue

    def run(self) -> None:
        try:
            data = self.input_queue.get() # will be replaced with simulation data
            self.results_queue.put(data)
        finally:
            self.input_queue.task_done()

N = 10 # number of simulations to run
NP = 8 # number of threads to use
input_queue = Queue()
results_queue = Queue()

for x in range(NP):
    worker = SimulationThread(input_queue, results_queue)
    worker.daemon = True
    worker.start()

for i in range(N):
    input_queue.put(i)

Now I've tried a few different things to gather up the results:

# always returns 0
print(results_queue.get())
# hangs
results_queue.join()
# does nothing; I'm guessing the queue is not yet populated
while not results_queue.empty():
    print(results_queue.get())
# prints nothing
ret = results_queue.get()
while ret is None:
    ret = results_queue.get()
    print(ret)
# finally prints the results, but only 1 through 7; no 8 or 9.
ret = results_queue.get()
while ret is not None:
    ret = results_queue.get()
    print(ret)
    if results_queue.empty():
        ret = None

This is where I'm stuck and need some help. How can I get all NP threads working through all N numbers concurrently?

Answer:

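By default, Queue.get() blocks until an item is available. You add 10 items to input_queue but create only 8 threads, and each thread's run() processes a single item and then exits. So only 8 results ever reach results_queue, the two remaining inputs are never taken, and any results_queue.get() beyond the eighth blocks forever.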
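To see the effect, here is a minimal sketch that mirrors the question's one-shot worker. OneShotWorker and run_one_shot are hypothetical names, and the sketch joins the workers and then drains results_queue with get_nowait() so it cannot hang:

```python
import queue
import threading

N = 10  # number of items to process
NP = 8  # number of worker threads


class OneShotWorker(threading.Thread):
    """Hypothetical stand-in for the question's SimulationThread:
    run() handles exactly one item, then the thread exits."""

    def __init__(self, input_queue, results_queue):
        super().__init__(daemon=True)
        self.input_queue = input_queue
        self.results_queue = results_queue

    def run(self):
        data = self.input_queue.get()
        self.results_queue.put(data)
        self.input_queue.task_done()


def run_one_shot():
    input_queue = queue.Queue()
    results_queue = queue.Queue()
    workers = [OneShotWorker(input_queue, results_queue) for _ in range(NP)]
    for w in workers:
        w.start()
    for i in range(N):
        input_queue.put(i)
    for w in workers:
        w.join()  # each worker ends after its single item
    collected = []
    while True:
        try:
            # A plain get() here would block forever once the 8 results are used up.
            collected.append(results_queue.get_nowait())
        except queue.Empty:
            break
    return collected, input_queue.qsize()


if __name__ == "__main__":
    results, leftover = run_one_shot()
    # Threads may finish in any order, so sort before printing.
    print(sorted(results), leftover)  # prints [0, 1, 2, 3, 4, 5, 6, 7] 2
```

The 8 one-shot workers consume inputs 0 through 7 from the FIFO queue, and 8 and 9 are left behind, which matches the "no 8 or 9" observation in the question.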

Instead, each thread should keep taking items from the queue and processing them until it is told to stop (here, by a "STOP" sentinel). You can try something like this:

test.py:

from queue import Queue
from threading import Thread

N = 10  # number of simulations to run
NP = 8  # number of threads to use


class SimulationThread(Thread):
    def __init__(self, input_queue, results_queue):
        super().__init__()

        self.input_queue = input_queue
        self.results_queue = results_queue

    def run(self):
        # iter(callable, sentinel) keeps calling get() until it returns "STOP"
        for data in iter(self.input_queue.get, "STOP"):
            self.results_queue.put(data * 2)


def main():
    input_queue = Queue()
    results_queue = Queue()

    for i in range(N):
        input_queue.put(i)

    for _ in range(NP):
        SimulationThread(input_queue, results_queue).start()

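    # Collect exactly N results; i is only a counter, not the input that produced the result
    for i in range(N):
        print(i, results_queue.get())

    # One "STOP" per thread, so every worker leaves its loop and exits
    for _ in range(NP):
        input_queue.put("STOP")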


if __name__ == "__main__":
    main()

Test:

$ python test.py
0 0
1 2
2 4
3 6
4 8
5 10
6 12
7 14
8 16
9 18
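In the test output above, the i printed on each line is just a loop counter, and with several threads the results are not guaranteed to come back in input order. One way to keep inputs and results paired is to have each worker put an (input, result) tuple. This is a sketch, not the answer's code: the worker and run_all names and the object() sentinel are my own choices, and data * 2 stands in for the real simulation.

```python
import queue
import threading

N = 10  # number of simulations to run
NP = 8  # number of threads to use
STOP = object()  # unique sentinel; cannot collide with real input data


def worker(input_queue, results_queue):
    # Keep pulling items until the sentinel arrives.
    for data in iter(input_queue.get, STOP):
        # Tag each result with the input it came from, so the main
        # thread can match them up regardless of completion order.
        results_queue.put((data, data * 2))


def run_all():
    input_queue = queue.Queue()
    results_queue = queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(input_queue, results_queue))
        for _ in range(NP)
    ]
    for t in threads:
        t.start()
    for i in range(N):
        input_queue.put(i)
    # Exactly N results will arrive, so N blocking get() calls is safe.
    results = dict(results_queue.get() for _ in range(N))
    for _ in range(NP):
        input_queue.put(STOP)  # one sentinel per thread
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    for data, result in sorted(run_all().items()):
        print(data, result)
```

Using object() as the sentinel means a real input can never be mistaken for the stop signal, which a string like "STOP" could be if the inputs were themselves strings.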

Answer:

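You probably want the joinable-queue pattern. With threads, queue.Queue already supports it (JoinableQueue is the multiprocessing equivalent). Each worker, as it finishes an item, calls .task_done() on the queue that item was fetched from.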

Your main thread then calls queue.join() on that same queue. This will not return until there have been as many calls to task_done() as there have been items added to the queue.
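A minimal sketch of that pattern with queue.Queue, assuming daemon workers that loop forever (worker and run_all are hypothetical names, and data * 2 stands in for the simulation):

```python
import queue
import threading

N = 10  # number of simulations to run
NP = 8  # number of threads to use


def worker(input_queue, results_queue):
    while True:
        data = input_queue.get()
        try:
            results_queue.put(data * 2)  # placeholder for the real simulation
        finally:
            # Mark this item finished even if processing raised.
            input_queue.task_done()


def run_all():
    input_queue = queue.Queue()
    results_queue = queue.Queue()
    for _ in range(NP):
        # Daemon threads loop forever; they are killed when the program exits.
        threading.Thread(
            target=worker, args=(input_queue, results_queue), daemon=True
        ).start()
    for i in range(N):
        input_queue.put(i)
    # Blocks until task_done() has been called once for every put().
    input_queue.join()
    # Every result was put before its task_done(), so nothing is missed here.
    results = []
    while not results_queue.empty():
        results.append(results_queue.get())
    return sorted(results)


if __name__ == "__main__":
    print(run_all())  # prints [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

Here input_queue.join() replaces the "STOP" sentinels as the signal that all work is done; the trade-off is that the daemon workers are never shut down explicitly and simply end when the interpreter exits.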
