I'm trying to learn a bit about threading in Python. I'm aware there are other tools (pools, processes, or pipes) that might be easier, but I'm interested in using the threading module.
from threading import Thread
from queue import Queue
class SimulationThread(Thread):
    """Worker thread from the question: moves one item from input_queue to results_queue.

    NOTE(review): run() handles exactly ONE item and then returns, so each
    thread ends after a single get(). With NP = 8 threads and N = 10 inputs,
    two inputs are never consumed. A worker meant to drain the queue needs a
    loop around get() plus a way to stop it (for example a sentinel value).
    """

    def __init__(self, input_queue: Queue, results_queue: Queue):
        """Store the queue to read work from and the queue to write results to."""
        Thread.__init__(self)
        self.input_queue = input_queue
        self.results_queue = results_queue

    def run(self) -> None:
        """Take one item, forward it unchanged to results_queue, and mark it done."""
        try:
            # get() blocks until an item is available.
            data = self.input_queue.get() # will be replaced with simulation data
            self.results_queue.put(data)
        finally:
            # task_done() returns None, so this simply returns None. A `return`
            # inside `finally` also discards any exception raised in the `try`
            # body, so a failing simulation would fail silently.
            return self.input_queue.task_done()
N = 10 # number of simulations to run
NP = 8 # number of threads to use
input_queue = Queue()  # work items for the workers
results_queue = Queue()  # values the workers send back
# Start the workers first; each one blocks in input_queue.get() until an item arrives.
for x in range(NP):
    worker = SimulationThread(input_queue, results_queue)
    # Daemon threads do not keep the interpreter alive at exit.
    worker.daemon = True
    worker.start()
# NOTE(review): each SimulationThread.run() consumes a single item, so only NP
# of these N items are ever processed; the remaining N - NP stay queued.
for i in range(N):
    input_queue.put(i)
Now I've tried a few different things to gather up the results:
# Each snippet below is a separate attempt at reading the results.

# always returns 0
# get() blocks until a result is available; the first result out is usually
# the first input that was processed, which is typically 0.
print(results_queue.get())
#hangs
# Queue.join() waits until task_done() has been called once for every item
# put on *that* queue. Nothing ever calls results_queue.task_done(), so this
# never returns.
results_queue.join()
# does nothing, I'm guessing queue is not yet populated
# empty() is only a snapshot: if the workers have not put anything yet, the
# loop exits immediately. Polling empty() is racy in threaded code.
while not results_queue.empty():
    print(results_queue.get())
# prints nothing
# The workers only put integers, never None, so `ret is None` is False on the
# first check and the loop body (including the print) never runs.
ret = results_queue.get()
while ret is None:
    ret = results_queue.get()
    print(ret)
# finally prints out the results, but in order of 1 - 7. No 8 or 9.
# The get() above the loop is discarded without printing (hence no 0). Only
# NP = 8 items are ever processed because each thread handles one item, so
# results for inputs 8 and 9 never exist. The loop also stops the moment the
# queue looks empty, even if workers are still running.
ret = results_queue.get()
while ret is not None:
    ret = results_queue.get()
    print(ret)
    if results_queue.empty():
        ret = None
This is where I'm stuck and need some help. How can I get all NP threads to process all N numbers concurrently?
Answer:
By default, `Queue.get` blocks until an item is available. You add 10 items to `input_queue`, but you create only 8 threads. Each thread takes a single item and then exits, so the last 2 items are never processed.
Your threads should continuously collect and process items from the queue until they are stopped. You can try something like this:
test.py:
from queue import Queue
from threading import Thread
N = 10  # number of simulations to run
NP = 8  # number of threads to use


class SimulationThread(Thread):
    """Worker thread that processes items from a queue until told to stop.

    Every item taken from ``input_queue`` is simulated, and the pair
    ``(item, result)`` is put on ``results_queue``. Workers finish in
    arbitrary order, so the consumer needs the item to know which input
    produced which result. The thread exits when it receives ``"STOP"``.
    """

    def __init__(self, input_queue, results_queue):
        super().__init__()
        self.input_queue = input_queue
        self.results_queue = results_queue

    def run(self):
        # iter(callable, sentinel) keeps calling get() until it returns "STOP".
        for data in iter(self.input_queue.get, "STOP"):
            # data * 2 is a placeholder for the real simulation.
            self.results_queue.put((data, data * 2))


def main():
    """Run N simulations on NP worker threads and print results in input order."""
    input_queue = Queue()
    results_queue = Queue()
    workers = [SimulationThread(input_queue, results_queue) for _ in range(NP)]
    for worker in workers:
        worker.start()
    for i in range(N):
        input_queue.put(i)
    # The queue is FIFO, so every data item is dequeued before any "STOP".
    # One sentinel per worker stops each of them once the work runs out.
    for _ in range(NP):
        input_queue.put("STOP")
    # Results arrive in completion order; sort on the input only, so the
    # simulation results themselves never need to be comparable.
    results = sorted(
        (results_queue.get() for _ in range(N)), key=lambda pair: pair[0]
    )
    for data, result in results:
        print(data, result)
    # Every worker has received its "STOP" by now; wait for all of them to exit.
    for worker in workers:
        worker.join()


if __name__ == "__main__":
    main()
Test:
$ python test.py
0 0
1 2
2 4
3 6
4 8
5 10
6 12
7 14
8 16
9 18
Answer:
You probably want to use a joinable queue. Each worker, as it finishes a job, calls `.task_done()` on the queue its input was fetched from.
Your main thread then calls `.join()` on that same queue. This does not return until `task_done()` has been called as many times as items were added to the queue.
(For threads, `queue.Queue` already provides `task_done()` and `join()`; `multiprocessing.JoinableQueue` is the process-based equivalent.)