Home > Software design >  How to stop threads using Queue()
How to stop threads using Queue()

Time:09-16

I have a program (Python 3.9.10) that has a read queue and a write queue. One thread reads and, once an item has been read, sends it to the write queue; another thread writes.

All works fine unless there is an error. If there is, the threads do not stop.

In the following code I am simulating an error being detected in the read thread and trying to stop the threads from reading/writing so that the program exits. However, the program/threads stay active and the program never finishes. If I remove the error simulation code, the threads stop and the program finishes.

I wish to handle the errors WITHIN the threads and, if need be, stop the threads/program without throwing an error up.

What am I doing wrong? Thanks

Here is a working example of my issue:

import pandas as pd
import datetime
import traceback
from queue import Queue
from threading import Thread
import time



# Work queues: items waiting to be read, and items handed over for writing.
dlQueue = Queue()
writeQueue = Queue()
# Flags shared between the worker functions through ``global`` statements.
dlQDone = False  # set to True when the download side finishes, or by errorBreak()
errorStop = False  # set to True by errorBreak(); both workers check it at the top of their loops

def log(text):
    """Print ``text`` prefixed with the current local date and time.

    Args:
        text: Message to print. It is concatenated onto the timestamp, so it
            must be a ``str``.
    """
    text = datetime.datetime.now().strftime("%Y/%m/%d, %H:%M:%S ") + text
    print(text)

def errorBreak():
    """Signal an error: replace both global queues and raise both flags.

    Rebinds the module-level names ``dlQueue`` and ``writeQueue`` to fresh,
    empty ``Queue`` objects and sets ``errorStop`` and ``dlQDone`` to True.
    Only the names are rebound; queue objects that other code already holds
    a reference to are not modified by this function.
    """
    global dlQueue, writeQueue, errorStop, dlQDone
    dlQueue, writeQueue = Queue(), Queue()
    errorStop = dlQDone = True
    


def downloadTable(t, q):
    """Reader worker: take items from ``q`` and hand them to ``writeQueue``.

    Args:
        t: Label for the worker; not used inside the body.
        q: Queue of item names (strings) to read.

    The loop returns when ``errorStop`` is set, when the simulated error
    fires on the item ``"Read 7"``, or when ``q.qsize()`` is 0 after an item
    has been processed.
    """
    global dlQDone
    global errorStop
    while True:
        if errorStop:
            return
        # Blocks until an item is available (no timeout is given).
        nextQ = q.get()
        log("READING: " + nextQ)
        writeQueue.put("Writing " + nextQ)
        log("DONE READING: " + nextQ)
        #### simulating an error and need to exit threads ###
        if nextQ == "Read 7":
            log("Breaking Read")
            errorBreak()
            # NOTE: this returns before q.task_done() below, so the current
            # item is never marked done on ``q`` and any items still queued
            # behind it are never fetched.
            return
        ###################################################
        q.task_done()
        if q.qsize() == 0:
            log("Download QUEUE finished")
            dlQDone = True
            return
        

def writeTable(t, q):
    """Writer worker: take items from ``q`` and log them as written.

    Args:
        t: Label for the worker; not used inside the body.
        q: Queue of item names (strings) to write.

    The loop returns when ``errorStop`` is set, or when ``dlQDone`` is set
    and ``q.qsize()`` is 0 after an item has been processed.
    """
    global errorStop
    global dlQDone
    while True:
        if errorStop:
            log("Error Stop return")
            return
        # Blocks until an item is available (no timeout is given), so the
        # errorStop flag is only re-checked after get() returns.
        nextQ = q.get()
        log("WRITING: " + nextQ)

        log("DONE WRITING: " + nextQ)

        q.task_done()
        if dlQDone:
            if q.qsize() == 0:
                log("Writing QUEUE finished")
                return


# Driver: fill the download queue, start one reader and one writer thread,
# then wait on both queues before reporting the elapsed time.
try:

    log("PROCESS STARTING!!")

    for i in range(10):
        dlQueue.put("Read " + str(i))

    startTime = time.time()
    log("Starting threaded pull....")
    dlWorker = Thread(
        target=downloadTable,
        args=(
            "DL",
            dlQueue,
        ),
    )
    dlWorker.start()
    writeWorker = Thread(
        target=writeTable,
        args=(
            "Write",
            writeQueue,
        ),
    )
    writeWorker.start()

    # Queue.join() returns only once every item put on the queue has been
    # matched by a task_done() call.
    dlQueue.join()
    writeQueue.join()

    log(f"Finished thread in {str(time.time() - startTime)} seconds") # CANNOT GET HERE
    log("Threads: " + str(dlWorker.is_alive()) + str(writeWorker.is_alive()))

except Exception as error:
    # log() concatenates its argument onto a str, so convert the exception
    # first; passing the exception object itself would raise TypeError.
    log(str(error))
    log(traceback.format_exc())
    

CodePudding user response:

If I understood you correctly, you want to stop both threads in case there's some error that warrants it; you can do that with a threading.Event and by changing your queue reads to use a timeout.

import datetime
import time
import queue
import threading

# Work queues used by the reader and writer workers.
dlQueue = queue.Queue()
writeQueue = queue.Queue()
# Set by the reader when it hits the error case; both worker loops poll it
# and exit once it is set.
stop_event = threading.Event()


def log(text):
    """Print ``text`` prefixed with the current local date and time.

    Args:
        text: Message to print. It is concatenated onto the timestamp, so it
            must be a ``str``.
    """
    stamp = datetime.datetime.now().strftime("%Y/%m/%d, %H:%M:%S ")
    print(stamp + text)


def downloadTable(t: str, q: queue.Queue):
    """Reader worker: move items from ``q`` to ``writeQueue`` until stopped.

    Args:
        t: Label for the worker; not used inside the body.
        q: Queue of item names (strings) to read.

    The loop polls ``stop_event`` between reads. Reads use a one-second
    timeout so an empty queue cannot block the stop check indefinitely.
    When the item ``"7"`` is read, the worker sets ``stop_event`` and exits.
    """
    while not stop_event.is_set():
        try:
            nextQ = q.get(timeout=1)
        except queue.Empty:
            # Nothing to read yet; loop around to re-check stop_event.
            continue
        log("READING: " + nextQ)
        writeQueue.put("Writing " + nextQ)
        log("DONE READING: " + nextQ)
        # Mark the item done before the error check so every get() is matched
        # by a task_done(), including the item that triggers the stop.
        q.task_done()
        if nextQ == "7":
            log("Breaking Read")
            stop_event.set()
            break
    log("Download thread exiting")


def writeTable(t, q):
    """Writer worker: take items from ``q`` and log them until stopped.

    Args:
        t: Label for the worker; not used inside the body.
        q: Queue of item names (strings) to write.

    The loop polls ``stop_event`` between reads and uses a one-second
    timeout on ``q.get`` so the stop check runs even when ``q`` is empty.
    """
    while not stop_event.is_set():
        try:
            nextQ = q.get(timeout=1)
        except queue.Empty:
            # Nothing to write yet; loop around to re-check stop_event.
            continue
        log("WRITING: " + nextQ)

        log("DONE WRITING: " + nextQ)

        q.task_done()
    log("Write thread exiting")


def main():
    """Queue ten items, start the reader and writer threads, and wait for both."""
    log("PROCESS STARTING!!")

    # Items are the decimal strings "0" through "9".
    for number in range(10):
        dlQueue.put(str(number))

    log("Starting threaded pull....")
    dlWorker = threading.Thread(target=downloadTable, args=("DL", dlQueue))
    dlWorker.start()
    writeWorker = threading.Thread(target=writeTable, args=("Write", writeQueue))
    writeWorker.start()

    # Wait for the reader first, then the writer.
    for worker in (dlWorker, writeWorker):
        worker.join()


if __name__ == "__main__":
    main()
  • Related