I have a function that constantly yields objects, say one per second, and a handler that takes 2 seconds to process each object. For example:
from time import sleep
import asyncio
from datetime import datetime

def generator():
    # Yields 0, 1, 2, ... with a 1-second pause after each value.
    i = 0
    while True:
        yield i
        i += 1
        sleep(1)

def handler(number):
    # Simulates 2 seconds of work per number.
    sleep(2)
    if number % 2 == 0:
        print(str(number) + ' is even')
    else:
        print(str(number) + ' is odd')

for number in generator():
    handler(number)
So, for example, '2 is even' is printed 6 seconds after the program starts. How do I reduce this time to 4 seconds (2 seconds for the generator + 2 seconds for the handler) using asyncio? I need to set up asynchronous handling of the numbers.
CodePudding user response:
You need a couple of changes here:
- Your generator() is currently a plain generator. Change it to an asynchronous generator so that you can use async for. This way it can give control back to the event loop.
- Use the async version of sleep from the asyncio library: asyncio.sleep. time.sleep doesn't cooperate with other tasks.
- Change your handler() from a sync function to a coroutine.
import asyncio

async def generator():
    # Asynchronous generator: awaiting the sleep lets other tasks run.
    i = 0
    while True:
        yield i
        i += 1
        await asyncio.sleep(1)

async def handler(number):
    await asyncio.sleep(2)
    if number % 2 == 0:
        print(str(number) + " is even")
    else:
        print(str(number) + " is odd")

async def main():
    async for number in generator():
        # Schedule the handler without waiting for it to finish.
        asyncio.create_task(handler(number))

asyncio.run(main())
Now, your first task is main(); asyncio.run() automatically wraps it in a Task. While this task is running, it iterates asynchronously through generator(). For each number it receives, you create a new Task from the handler() coroutine.
This way the sleep times overlap. While main() waits 1 second for the next number, the handler() task that is already running also gets 1 second of its own wait out of the way. So when the next number arrives, 1 second of the previous handler() task has already passed, and it only needs 1 more second.
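To check the overlap described above, here is a minimal sketch that records when each handler finishes. It is an illustration rather than part of the code above: the count, interval and work parameters, the log list, and the bounded generator are assumptions added so the script terminates and the timings can be inspected. It also keeps references to the tasks and awaits them with asyncio.gather so none is dropped before it finishes.

```python
import asyncio
import time


async def generator(count, interval):
    # Bounded variant of the generator (assumption: `count` values only),
    # pausing `interval` seconds after each yielded number.
    for i in range(count):
        yield i
        await asyncio.sleep(interval)


async def handler(number, work, start, log):
    # Simulate `work` seconds of processing, then record the elapsed time.
    await asyncio.sleep(work)
    parity = "even" if number % 2 == 0 else "odd"
    elapsed = time.monotonic() - start
    log.append((number, parity, elapsed))
    print(f"{number} is {parity} (t = {elapsed:.1f}s)")


async def main(count=3, interval=1.0, work=2.0):
    start = time.monotonic()
    log = []
    tasks = []
    async for number in generator(count, interval):
        # Keep a reference to every task so it can be awaited later.
        tasks.append(asyncio.create_task(handler(number, work, start, log)))
    # Wait for the handlers that are still running.
    await asyncio.gather(*tasks)
    return log


if __name__ == "__main__":
    asyncio.run(main())
```

With the default values, the three lines should appear roughly 2, 3 and 4 seconds after the start, so '2 is even' arrives at about 4 seconds.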
You can see the number of tasks if you want:
async def main():
    async for number in generator():
        print(f"Number of all tasks: {len(asyncio.all_tasks())}")
        asyncio.create_task(handler(number))
Because each handler sleeps 2 seconds and your number generator sleeps 1 second, you will see that 2 tasks exist in the event loop on each iteration once the count settles. If you change await asyncio.sleep(1) to await asyncio.sleep(0.5) in the generator() coroutine, you will see 4 tasks in the event loop on each iteration.
Answer to the comment:
Can I do the same thing if I use API which doesn't let me create asynchronous generator, but just a normal one? Can I still asynchronously handle yielded objects?
Yes, you can. Just note that without an asynchronous generator you can't use async for, which means your iteration is synchronous, so you need a little trick for it to work. While your main() task is executing, it keeps pulling values from generator() and creating a Task for each one, but it never gives the other tasks a chance to run. You need await asyncio.sleep(0) to hand control back to the event loop on each iteration. Keep in mind that time.sleep(1) inside the generator still blocks the event loop while it waits, so the handlers only make progress at those await points:
import asyncio
import time

def generator():
    # Plain (synchronous) generator; time.sleep blocks the event loop.
    i = 0
    while True:
        yield i
        i += 1
        time.sleep(1)

async def handler(number):
    await asyncio.sleep(2)
    if number % 2 == 0:
        print(str(number) + " is even")
    else:
        print(str(number) + " is odd")

async def main():
    for number in generator():
        print(f"Number of all tasks: {len(asyncio.all_tasks())}")
        asyncio.create_task(handler(number))
        # Yield control once so the scheduled handlers get a chance to run.
        await asyncio.sleep(0)

asyncio.run(main())
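If the blocking time.sleep inside the generator becomes a problem, one possible alternative (a sketch, not part of the answer above) is to pull each value from the generator in a worker thread with loop.run_in_executor, so the event loop stays free while the generator waits. The count, interval and work parameters, the log list, and the done sentinel are assumptions added for illustration; the sentinel is passed as the default to next() so that StopIteration is never raised inside the executor.

```python
import asyncio
import time


def generator(count, interval):
    # Plain blocking generator, standing in for an API you cannot change.
    # Assumption: bounded to `count` values so the example terminates.
    for i in range(count):
        yield i
        time.sleep(interval)


async def handler(number, work, log):
    await asyncio.sleep(work)
    log.append(number)
    print(f"{number} is {'even' if number % 2 == 0 else 'odd'}")


async def main(count=3, interval=1.0, work=2.0):
    loop = asyncio.get_running_loop()
    gen = generator(count, interval)
    done = object()  # sentinel returned by next() when the generator is exhausted
    log, tasks = [], []
    while True:
        # next(gen, done) runs in the default thread pool, so time.sleep()
        # blocks a worker thread instead of the event loop.
        number = await loop.run_in_executor(None, next, gen, done)
        if number is done:
            break
        tasks.append(asyncio.create_task(handler(number, work, log)))
    # Wait for the handlers that are still running.
    await asyncio.gather(*tasks)
    return log


if __name__ == "__main__":
    asyncio.run(main())
```

Only one next() call is in flight at a time because each one is awaited before the next is issued, so the generator is never advanced from two threads at once.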