Using Python, I was trying to demonstrate how a producer/consumer multi-threading scenario can lead to a deadlock when a consumer thread ends up waiting on an empty queue that will stay empty for the rest of the execution, and how to solve this while avoiding starvation or a sudden "dirty interruption" of the program.
So I took the producer/consumer threading code using a queue from this nice RealPython article; here are the original code excerpts:
def consumer(queue, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info(
            "Consumer storing message: %s (size=%d)", message, queue.qsize()
        )
    logging.info("Consumer received event. Exiting")
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()
I noticed that, however unlikely it is to occur, the code as it is could lead to the situation I described: the 'main' thread sets the 'event', the 'producer' runs to its end, and the 'consumer' is still waiting to get a message from a queue that will never be filled again.
In the single-'consumer' case, a simple check that the queue is not empty before calling 'get' would suffice, for instance:

    if not q.empty():
        message = q.get()

The problem, however, would persist in a multi-consumer scenario, because a thread switch could occur immediately after the emptiness check: a second consumer could get the message, leaving the queue empty, so that when execution switches back to the first consumer it would call get on an empty queue and... that's it.
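To illustrate the point that the check and the get must be a single atomic operation, here is a minimal standalone sketch (the `drain` helper is hypothetical, not part of the code above) using `get_nowait()`, where the get itself serves as the emptiness check, so no other consumer can sneak in between a check and the get:

```python
import queue

def drain(q):
    """Hypothetical helper: consume whatever is in the queue without a
    separate emptiness check -- get_nowait() either returns an item or
    raises queue.Empty, atomically."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())  # equivalent to q.get(block=False)
        except queue.Empty:
            return items

q = queue.Queue()
for i in range(3):
    q.put(i)
print(drain(q))  # → [0, 1, 2]
```

Handling `queue.Empty` instead of pre-checking `empty()` is the same idea my timeout-based solution below relies on.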
I wanted a solution that would also work in a hypothetical multi-consumer scenario. So I modified the 'consumer' code this way, essentially adding a timeout on the queue's get call and handling the resulting exception:
def consumer(q, event, n):
    while not event.is_set() or not q.empty():
        print("Consumer " + n + ": Q-get")
        try:
            # Not really needed; it just forces a consumer-thread swap at this precise
            # point, showing that, as expected, things also work in a multi-consumer scenario
            time.sleep(0.1)
            message = q.get(True, 1)
        except queue.Empty:
            print("Consumer " + n + ": looping on empty queue")
            # Not really needed at all... just hoping (unfortunately without success)
            # for _main_ to swap on ThreadPoolExecutor
            time.sleep(0.1)
            continue
        logging.info("Consumer%s storing message: %s (size=%d)", n, message, q.qsize())
    print("Consumer " + n + ": ended")
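As a quick sanity check of the timeout mechanism (a standalone sketch, not part of the program above): `q.get(True, timeout)` blocks for at most `timeout` seconds on an empty queue and then raises `queue.Empty` instead of hanging forever:

```python
import queue
import time

q = queue.Queue()
start = time.monotonic()
try:
    q.get(True, 0.2)      # block for at most 0.2 s on the empty queue
    timed_out = False
except queue.Empty:       # raised once the timeout expires
    timed_out = True
elapsed = time.monotonic() - start
print(timed_out)  # → True
```

This is what lets each consumer periodically wake up and re-check the event instead of blocking indefinitely on get.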
I also modified the "main" part so that it puts a message in the queue and spawns a second consumer instead of a producer:
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    pipeline.put("XxXxX")

    print("Let's start (ThreadPoolExecutor)")
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        #executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event, '1')
        executor.submit(consumer, pipeline, event, '2')

    print("(_main_ won't even get this far... Why?)")  #!#
    time.sleep(2)
    logging.info("Main: about to set event")
    event.set()
(Please note that my purpose here is to thwart the consumer deadlock risk and to show that it is actually avoided; at this stage I don't really need the producer, which is why the code doesn't spawn one.)
Now, the problem is, I can't understand why everything seems to work well if the threads are spawned with threading.Thread(...).start(), for instance:

    print("Let's start (simple Thread)")
    for i in range(1, 3):
        threading.Thread(target=consumer, args=(pipeline, event, str(i))).start()

whereas using concurrent.futures.ThreadPoolExecutor the 'main' thread seems to never resume (it doesn't even seem to reach its sleep call), so the execution never ends, resulting in infinite consumer looping.

Can you help me understand the reason for this difference? Knowing it is important to me, and it would almost certainly help me figure out whether this can be solved somehow or whether I'll necessarily be forced to drop the ThreadPoolExecutor. Thank you in advance for your precious help!
CodePudding user response:
The problem is that you put event.set() outside the with block that manages the ThreadPoolExecutor. When used with with, on exiting the block, ThreadPoolExecutor performs the equivalent of .shutdown(wait=True). So you're waiting for the workers to finish, which they won't, because you haven't yet set the event.
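A minimal self-contained sketch of that blocking behavior (the names here are illustrative, not from the question's code): a worker blocks on an event, and the implicit shutdown(wait=True) at the end of the with block only returns because the event is set inside the block:

```python
import concurrent.futures
import threading

event = threading.Event()

def worker():
    event.wait()          # worker blocks until the event is set
    return "done"

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(worker)
    event.set()           # set *inside* the with: move this line below the
                          # with block and the implicit shutdown(wait=True)
                          # at the block's exit would wait forever

print(future.result())    # → done
```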
If you want to be able to tell it to shut down when it can, without blocking immediately, you could do:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    try:
        #executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event, '1')
        executor.submit(consumer, pipeline, event, '2')
        executor.shutdown(wait=False)  # No new work can be submitted after this point, and
                                       # workers will opportunistically exit if no work left
        time.sleep(2)
        logging.info("Main: about to set event")
    finally:
        event.set()  # Set the event *before* we block awaiting shutdown.
                     # Done in a finally block to ensure it's set even on
                     # exception, so the with doesn't try to block in a case
                     # where it will never finish
# Blocks for full shutdown here; after this, the pool is definitely finished and cleaned up