I'm having trouble with Python threads and queues. I wrote a worker that does some tasks in a database. Each task comes from a queue. Whenever the worker is done, it should write its final result (rows inserted) to a second queue, which is then processed once all tasks are done.
Here's a code sample:
import queue
import threading

class Worker(threading.Thread):
    def __init__(self, queue_in, queue_out, **kwargs):
        super().__init__(**kwargs)
        self.queue_in = queue_in
        self.queue_out = queue_out

    def run(self):
        while not self.queue_in.empty():
            task = self.queue_in.get()
            table_name, rows_inserted = insert_rows(task)
            self.queue_in.task_done()
            self.queue_out.put((table_name, rows_inserted))

def do_db_stuff():
    queue_in = queue.Queue()
    queue_out = queue.Queue()

    # here I read the names of the data files, not relevant
    for file in files:
        queue_in.put(file)

    threads = [Worker(queue_in, queue_out) for _ in range(3)]
    for t in threads:
        t.start()

    queue_in.join()
    # queue_out.join()  # this ends in an infinite loop when uncommented

    # here I do some statistics with the items from queue_out
The result is that when I put 3 files into the queue to be processed by Worker, I only get 2 items in queue_out. This happens for any number of files (= tasks): the worker handling the last task cannot put its result into queue_out, and I really have no clue why.
I also tried putting a queue_out.join() right after queue_in.join(), but this leads to an infinite loop, which can only be interrupted with Ctrl-C:
^CTraceback (most recent call last):
File "testfile.py", line 91, in <module>
run()
File "testfile.py", line 72, in run
queue_out.join()
File "/usr/lib/python3.6/queue.py", line 83, in join
self.all_tasks_done.wait()
File "/usr/lib/python3.6/threading.py", line 295, in wait
waiter.acquire()
KeyboardInterrupt
Adding a loop that joins all the threads ends in the same problem (infinite loop).
Any ideas how to solve this? Maybe by adding a timeout somewhere?
Regards, Thomas
CodePudding user response:
I'm not able to reproduce this when just using a placeholder for the insert_rows function:

from time import sleep

def insert_rows(task):
    sleep(5)
    return 'test', 'tests'

...

for file in ['a', 'b', 'c']:
    queue_in.put(file)

...

queue_in.join()
print(queue_out.qsize())
# -> 3
CodePudding user response:
I found a solution - it's all about task_done().

queue_in.join() waits until every task has been marked as finished, and task_done() is what tells it so. When task_done() is called too early, the last task may still want to put its item into queue_out, but by that time all other tasks are already marked done, the program moves on, and this one last item is missed.

So I moved task_done() to the very end, right after self.queue_out.put((table_name, rows_inserted)), and - tadaaaa - the item now arrives in queue_out for further processing.
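For completeness, here is a minimal self-contained sketch of the fixed worker. The insert_rows body is a placeholder for the real database call, and I've swapped the empty()/get() pair for get_nowait() so a thread can never block forever if the queue drains between the check and the get; the key point, though, is that task_done() now comes after the put():

```python
import queue
import threading

def insert_rows(task):
    # placeholder standing in for the real DB insert
    return 'table_' + task, 1

class Worker(threading.Thread):
    def __init__(self, queue_in, queue_out, **kwargs):
        super().__init__(**kwargs)
        self.queue_in = queue_in
        self.queue_out = queue_out

    def run(self):
        while True:
            try:
                task = self.queue_in.get_nowait()
            except queue.Empty:
                break
            table_name, rows_inserted = insert_rows(task)
            self.queue_out.put((table_name, rows_inserted))
            self.queue_in.task_done()  # moved: mark the task done only after the result is enqueued

queue_in = queue.Queue()
queue_out = queue.Queue()
for file in ['a', 'b', 'c']:
    queue_in.put(file)

threads = [Worker(queue_in, queue_out) for _ in range(3)]
for t in threads:
    t.start()

queue_in.join()  # cannot return before every result is in queue_out
print(queue_out.qsize())  # -> 3
```

Because each put() happens before its task_done(), queue_in.join() returning guarantees all results are already in queue_out.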