Queue does not receive the last item from threads


I'm having trouble with Python threads and queues. I wrote a worker which does some tasks in a database. Each task comes from a queue. Whenever a worker is done, it should write its final result (rows inserted) to an output queue, which is then handled once all tasks are done.

Here's a code sample:

import queue
import threading

class Worker(threading.Thread):
    def __init__(self, queue_in, queue_out, **kwargs):
        super().__init__(**kwargs)
        self.queue_in = queue_in
        self.queue_out = queue_out

    def run(self):
        # take tasks until the input queue is empty and report each result
        while not self.queue_in.empty():
            task = self.queue_in.get()
            table_name, rows_inserted = insert_rows(task)
            self.queue_in.task_done()
            self.queue_out.put((table_name, rows_inserted))

def do_db_stuff():
    queue_in = queue.Queue()
    queue_out = queue.Queue()
    # here I read names of data files, not relevant
    for file in files:
        queue_in.put(file)
    threads = [Worker(queue_in, queue_out) for _ in range(3)]
    for t in threads:
        t.start()
    queue_in.join()
    # queue_out.join() # this ends in an infinite loop when uncommented
    # Here I do some statistics stuff with the items from queue_out

The result is that when I put 3 files into the queue to be processed by the Workers, I only have 2 items in queue_out. This happens for any amount of files (= tasks): the worker working on the last task cannot put its result into queue_out, and I really do not have any clue why.

I also tried to put a queue_out.join() right after queue_in.join(), but this leads to an infinite loop, which can only be interrupted by CTRL-C:

^CTraceback (most recent call last):
  File "testfile.py", line 91, in <module>
    run()
  File "testfile.py", line 72, in run
    queue_out.join()
  File "/usr/lib/python3.6/queue.py", line 83, in join
    self.all_tasks_done.wait()
  File "/usr/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()
KeyboardInterrupt

Adding a loop that joins all the threads ends in the same problem (an infinite loop).

Any ideas how to solve this? Maybe adding a timeout somewhere?

Regards, Thomas

CodePudding user response:

I'm not able to reproduce this when just using a placeholder for the insert_rows function; a complete sketch of what I ran follows the fragments below.


from time import sleep

def insert_rows(task):
    # placeholder standing in for the real database insert
    sleep(5)
    return 'test', 'tests'


...

for file in ['a', 'b', 'c']:
    queue_in.put(file)
...

queue_in.join()
print(queue_out.qsize()) 
# -> 3
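
For reference, here is a self-contained sketch of the reproduction attempt, assembling the fragments above with the Worker class from the question (nothing in it beyond those pieces):

import queue
import threading
from time import sleep

def insert_rows(task):
    # placeholder standing in for the real database insert
    sleep(5)
    return 'test', 'tests'

class Worker(threading.Thread):
    def __init__(self, queue_in, queue_out, **kwargs):
        super().__init__(**kwargs)
        self.queue_in = queue_in
        self.queue_out = queue_out

    def run(self):
        while not self.queue_in.empty():
            task = self.queue_in.get()
            table_name, rows_inserted = insert_rows(task)
            self.queue_in.task_done()
            self.queue_out.put((table_name, rows_inserted))

queue_in = queue.Queue()
queue_out = queue.Queue()
for file in ['a', 'b', 'c']:
    queue_in.put(file)

threads = [Worker(queue_in, queue_out) for _ in range(3)]
for t in threads:
    t.start()

queue_in.join()
print(queue_out.qsize())  # -> 3 here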

CodePudding user response:

I found a solution - it's all about task_done().

queue_in.join() waits for all threads to finish their tasks, and task_done() is what tells it a task is finished. When task_done() is called too early, the worker handling the last task may still be about to put its item into queue_out, but at that point all tasks already count as done, queue_in.join() returns, the program continues, and this one last item is missed.

So I moved task_done() to the very end, right after self.queue_out.put((table_name, rows_inserted)), and - tadaaaa - the item is now in queue_out for further processing.
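
In code, the fixed run() method looks roughly like this (identical to the version in the question, only the order of the last two calls is swapped):

    def run(self):
        while not self.queue_in.empty():
            task = self.queue_in.get()
            table_name, rows_inserted = insert_rows(task)
            # put the result into queue_out first, then report the task as done,
            # so queue_in.join() cannot return before the result is available
            self.queue_out.put((table_name, rows_inserted))
            self.queue_in.task_done()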
