I am newbie to asyncio programming and trying to understand producer/consumer behavior.I have one producer and 3 consumers and below is my code snippet
import asyncio
async def producer(q:asyncio.Queue) -> None:
for i in range(3):
print("Producer sleep for 1 seconds")
await asyncio.sleep(1)
print("Producer done sleeping")
await q.put("Message" str(i))
print("Message added to queue")
return
async def consumer(n: int, q:asyncio.Queue) -> None:
while True:
print(f"Consumer{n} going to wait for message")
i = await q.get()
await asyncio.sleep(3)
print(f"Consumer{n} got the message {i}")
q.task_done()
return
async def main():
q = asyncio.Queue()
t1 = asyncio.create_task(producer(q))
t2 = [asyncio.create_task(consumer(n, q)) for n in range(3)]
await asyncio.gather(t1)
await q.join()
if __name__ == "__main__":
asyncio.run(main())
When I run this I get below output
Producer sleep for 1 seconds
Consumer0 going to wait for message
Consumer1 going to wait for message
Consumer2 going to wait for message
Producer done sleeping
Message added to queue
Producer sleep for 1 seconds
Producer done sleeping <<<<<<<<<<<<<<<<<<<<<<
Message added to queue
Producer sleep for 1 seconds
Producer done sleeping
Message added to queue
Consumer0 got the message Message0
Consumer0 going to wait for message
Consumer1 got the message Message1
Consumer1 going to wait for message
Consumer2 got the message Message2
Consumer2 going to wait for message
I expect one of the consumer will wake up and read the message in above marked(<<<) place, but that's not happening. Why is that? I thought when the producer goes to sleep it will give up the control and the consumer waiting at q.get() will get woken up (since q has one message to process) but that's not happening. What am I missing here?
CodePudding user response:
You got it right, actually.
Reason is simply because of wrong line order on consumer:
async def consumer(n: int, q:asyncio.Queue) -> None:
while True:
print(f"Consumer{n} going to wait for message")
i = await q.get()
await asyncio.sleep(3) # << here
print(f"Consumer{n} got the message {i}")
q.task_done()
return
Consumer is sleeping 3 seconds before printing out. But in producer, you sleep 1 seconds first, then wait 2 seconds more in upcoming 2 loops.
So 3 seconds sleep After consumer got first message is long enough to make all consumer message to be printed out after producer task is finished.
Therefore if we change sleep and print order, then output will be what you've expected.
Producer sleep for 1 seconds
Consumer0 going to wait for message
Consumer1 going to wait for message
Consumer2 going to wait for message
Producer done sleeping
Message added to queue
Producer sleep for 1 seconds
Consumer0 got the message Message0 <<<
Producer done sleeping
Message added to queue
Producer sleep for 1 seconds
Consumer1 got the message Message1
Producer done sleeping
Message added to queue
Consumer2 got the message Message2
Consumer0 going to wait for message
Consumer1 going to wait for message
Consumer2 going to wait for message
Apart from that, few minor things to point out just in case if you missed, you can use await t1
instead of await asyncio.gather(t1)
.