I have a program (Python 3.9.10) that has a read queue and a write queue. One thread reads items and, once an item is read, puts it on the write queue; another thread writes.
All works fine unless there is an error. If there is, the threads do not stop.
In the following code I am simulating an error being detected in the read thread and trying to stop both threads from reading/writing so that the program exits. However, the threads stay active and the program never finishes. If I remove the error-simulation code, the threads stop and the program finishes.
I wish to handle the errors WITHIN the threads and, if need be, stop the threads/program without raising the error up to the caller.
What am I doing wrong? Thanks
Here is a working example of my issue:
import pandas as pd
import datetime
import traceback
from queue import Queue
from threading import Thread
import time
# Items waiting to be read; filled by the main script and consumed by
# downloadTable.
dlQueue: Queue = Queue()
# Items handed from downloadTable to writeTable.
writeQueue: Queue = Queue()
# True once the download side has finished or aborted.
dlQDone: bool = False
# True when an error requires both worker loops to stop.
errorStop: bool = False
def log(text):
    """Print ``text`` prefixed with the current local timestamp.

    Args:
        text: Message to print. It must be a ``str`` because it is
            concatenated onto the timestamp prefix.
    """
    # The "+" joining the prefix and the message had been lost, which made
    # this statement a syntax error.
    text = datetime.datetime.now().strftime("%Y/%m/%d, %H:%M:%S ") + text
    print(text)
def errorBreak():
    """Flag an error stop and rebind both module-level queue names.

    Sets ``errorStop`` and ``dlQDone`` to ``True`` and points ``dlQueue`` and
    ``writeQueue`` at new, empty ``Queue`` objects.
    """
    global dlQueue
    global writeQueue
    global errorStop
    global dlQDone
    # NOTE: these assignments only rebind the global names. Queue objects
    # that were already passed to a thread as an argument, or that are
    # already being joined, are different objects and are not emptied here.
    dlQueue = Queue()
    writeQueue = Queue()
    errorStop = True
    dlQDone = True
def downloadTable(t, q):
    """Read items from ``q`` and forward each one to ``writeQueue``.

    Args:
        t: Label for the worker; not used inside the function.
        q: Queue of string items to read.

    The loop ends when ``errorStop`` is set, when the simulated error fires,
    or when ``q`` reports a size of zero after an item has been processed.
    """
    global dlQDone
    global errorStop
    while True:
        if errorStop:
            return
        # Blocks until an item is available.
        nextQ = q.get()
        log("READING: " + nextQ)
        writeQueue.put("Writing " + nextQ)
        log("DONE READING: " + nextQ)
        #### simulating an error and need to exit threads ###
        if nextQ == "Read 7":
            log("Breaking Read")
            errorBreak()
            # Returns without calling q.task_done() for this item.
            return
        ###################################################
        q.task_done()
        if q.qsize() == 0:
            log("Download QUEUE finished")
            dlQDone = True
            return
def writeTable(t, q):
    """Consume items from ``q`` and log each one as written.

    Args:
        t: Label for the worker; not used inside the function.
        q: Queue of string items to write.

    The loop ends when ``errorStop`` is set at the top of an iteration, or
    when ``dlQDone`` is set and ``q`` reports a size of zero after an item
    has been processed.
    """
    global errorStop
    global dlQDone
    while True:
        if errorStop:
            log("Error Stop return")
            return
        # Blocks until an item is available; the stop flag is only
        # re-checked after this call returns.
        nextQ = q.get()
        log("WRITING: " + nextQ)
        log("DONE WRITING: " + nextQ)
        q.task_done()
        if dlQDone:
            if q.qsize() == 0:
                log("Writing QUEUE finished")
                return
# Driver: queue ten items, start one reader and one writer thread, then wait
# for both queues to be fully processed.
try:
    log("PROCESS STARTING!!")
    for i in range(10):
        dlQueue.put("Read " + str(i))
    startTime = time.time()
    log("Starting threaded pull....")
    dlWorker = Thread(
        target=downloadTable,
        args=(
            "DL",
            dlQueue,
        ),
    )
    dlWorker.start()
    writeWorker = Thread(
        target=writeTable,
        args=(
            "Write",
            writeQueue,
        ),
    )
    writeWorker.start()
    # join() returns only once task_done() has been called for every item
    # that was put on the queue.
    dlQueue.join()
    writeQueue.join()
    log(f"Finished thread in {str(time.time() - startTime)} seconds")  # CANNOT GET HERE
    log("Threads: " + str(dlWorker.is_alive()) + str(writeWorker.is_alive()))
except Exception as error:
    # log() concatenates its argument onto a str, so the exception must be
    # converted first; passing the exception object would raise TypeError.
    log(str(error))
    log(traceback.format_exc())
CodePudding user response:
If I understood you correctly, you want to stop both threads in case there's some error that warrants it; you can do that with a threading.Event and by changing your queue reads to use a timeout.
import datetime
import time
import queue
import threading
# Items waiting to be read; filled by main() and consumed by downloadTable.
dlQueue: queue.Queue = queue.Queue()
# Items handed from downloadTable to writeTable.
writeQueue: queue.Queue = queue.Queue()
# Set when the worker loops should stop; both loops test it each iteration.
stop_event: threading.Event = threading.Event()
def log(text):
    """Print ``text`` prefixed with the current local timestamp.

    Args:
        text: Message to print. It must be a ``str`` because it is
            concatenated onto the timestamp prefix.
    """
    # The "+" joining the prefix and the message had been lost, which made
    # this statement a syntax error.
    text = datetime.datetime.now().strftime("%Y/%m/%d, %H:%M:%S ") + text
    print(text)
def downloadTable(t: str, q: queue.Queue):
    """Read items from ``q`` and forward them to ``writeQueue`` until stopped.

    Args:
        t: Label for the worker; not used inside the function.
        q: Queue of string items to read.

    The loop runs until ``stop_event`` is set. When the item ``"7"`` is read,
    the simulated error sets ``stop_event`` and ends the loop.
    """
    while not stop_event.is_set():
        try:
            # The timeout lets the loop re-check stop_event instead of
            # blocking forever on an empty queue.
            nextQ = q.get(timeout=1)
        except queue.Empty:
            continue
        try:
            log("READING: " + nextQ)
            writeQueue.put("Writing " + nextQ)
            log("DONE READING: " + nextQ)
            if nextQ == "7":
                log("Breaking Read")
                stop_event.set()
                break
        finally:
            # Every item taken with get() is marked done, including the one
            # that triggers the stop, so a later q.join() is not left waiting
            # on it.
            q.task_done()
    log("Download thread exiting")
def writeTable(t, q):
    """Consume items from ``q`` and log each one until ``stop_event`` is set.

    Args:
        t: Label for the worker; not used inside the function.
        q: Queue of string items to write.
    """
    while not stop_event.is_set():
        try:
            # The timeout lets the loop re-check stop_event instead of
            # blocking forever on an empty queue.
            nextQ = q.get(timeout=1)
        except queue.Empty:
            continue
        log("WRITING: " + nextQ)
        log("DONE WRITING: " + nextQ)
        q.task_done()
    log("Write thread exiting")
def main():
    """Queue ten items, start both worker threads, and wait for them to exit."""
    log("PROCESS STARTING!!")
    for i in range(10):
        dlQueue.put(f"{i}")
    log("Starting threaded pull....")
    dlWorker = threading.Thread(
        target=downloadTable,
        args=(
            "DL",
            dlQueue,
        ),
    )
    dlWorker.start()
    writeWorker = threading.Thread(
        target=writeTable,
        args=(
            "Write",
            writeQueue,
        ),
    )
    writeWorker.start()
    # Wait on the threads themselves, not on the queues.
    dlWorker.join()
    writeWorker.join()
# Run main() only when the file is executed as a script.
if __name__ == "__main__":
    main()