I am trying to write an iterator that moves on to the next step of the iteration while awaiting an IO-bound task. Roughly, what I'm trying to do in code:
for i in iterable:
    await io_bound_task()  # move on to next step in iteration
    # do more stuff when task is complete
I initially tried a simple for loop, with a sleep simulating an IO-bound task:
import asyncio
import random

async def main() -> None:
    for i in range(3):
        print(f"starting task {i}")
        result = await io_bound_task(i)
        print(f"finished task {result}")

async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i

asyncio.run(main())
Here the code runs one task after another and outputs:
starting task 0
finished task 0
starting task 1
finished task 1
starting task 2
finished task 2
which I assume is because the for loop is blocking. So I thought an asynchronous for loop might be the way to proceed, and tried using an asynchronous iterator:
from __future__ import annotations

import asyncio
import random

class AsyncIterator:
    def __init__(self, max_value: int) -> None:
        self.max_value = max_value
        self.count = 0

    def __aiter__(self) -> AsyncIterator:
        return self

    async def __anext__(self) -> int:
        if self.count == self.max_value:
            raise StopAsyncIteration
        self.count += 1
        return self.count

async def main() -> None:
    async for i in AsyncIterator(3):
        print(f"starting task {i}")
        result = await io_bound_task(i)
        print(f"finished task {result}")

async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i

asyncio.run(main())
But this also seems to run one task after another, and produces the output
starting task 1
finished task 1
starting task 2
finished task 2
starting task 3
finished task 3
every time. So the asynchronous iterator apparently isn't doing what I assumed it would, and at this point I'm stuck. Is my understanding of asynchronous iterators wrong? Can someone give me some pointers on how to achieve what I'm trying to do?
I'm new to working with async, so apologies if I'm doing something stupid. Any help is appreciated. Thanks.
I'm on Python 3.8.10, if that is a relevant detail.
Answer:
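What you are looking for is called a task, and it can be created with the asyncio.create_task function. All of the approaches you tried await the coroutine io_bound_task(i) directly, and await means roughly "wait for this to complete before continuing". An async for loop doesn't change that: it only awaits __anext__ between iterations, and the loop body still runs one step at a time. If you wrap your coroutine in a task, it is scheduled on the event loop and runs in the background (it actually gets to run as soon as your code reaches its next await), so you don't have to wait for it to complete before continuing.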
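To make the difference between awaiting directly and creating tasks concrete, here is a small timing sketch. It assumes a fixed 0.1 s sleep in place of the random one so the timings are predictable; the function names sequential and with_tasks are hypothetical and exist only for this illustration.

```python
import asyncio
import time


async def io_bound_task(i: int) -> int:
    # Fixed 0.1 s sleep (an assumption, instead of random) for predictable timings.
    await asyncio.sleep(0.1)
    return i


async def sequential() -> float:
    start = time.perf_counter()
    for i in range(3):
        await io_bound_task(i)  # waits for each call before starting the next
    return time.perf_counter() - start


async def with_tasks() -> float:
    start = time.perf_counter()
    # create_task schedules all three coroutines on the event loop immediately
    tasks = [asyncio.create_task(io_bound_task(i)) for i in range(3)]
    for task in tasks:
        await task  # the three sleeps overlap while we wait here
    return time.perf_counter() - start


async def main() -> None:
    print(f"sequential: {await sequential():.2f}s")  # roughly 0.3 s
    print(f"with tasks: {await with_tasks():.2f}s")  # roughly 0.1 s


asyncio.run(main())
```

The sequential version takes about three sleeps' worth of time, while the task version takes about one, because all three sleeps are in progress at the same time.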
Here is a version of your code using tasks:
import asyncio
import random

async def main() -> None:
    tasks = []
    for i in range(3):
        print(f"starting task {i}")
        tasks.append(asyncio.create_task(io_bound_task(i)))
    for task in tasks:
        result = await task
        print(f"finished task {result}")

async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i

asyncio.run(main())
Output:
starting task 0
starting task 1
starting task 2
finished task 0
finished task 1
finished task 2
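If you want to keep the AsyncIterator from the question, the same fix applies: async for is not what makes things concurrent, creating tasks inside the loop is. A sketch, reusing the question's class with the counter fixed to `self.count += 1`:

```python
from __future__ import annotations

import asyncio
import random
from typing import List


class AsyncIterator:
    # The iterator from the question, with the counter incremented each step.
    def __init__(self, max_value: int) -> None:
        self.max_value = max_value
        self.count = 0

    def __aiter__(self) -> AsyncIterator:
        return self

    async def __anext__(self) -> int:
        if self.count == self.max_value:
            raise StopAsyncIteration
        self.count += 1
        return self.count


async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i


async def main() -> List[int]:
    tasks = []
    # Start a task per item without waiting for it to finish.
    async for i in AsyncIterator(3):
        print(f"starting task {i}")
        tasks.append(asyncio.create_task(io_bound_task(i)))
    results = []
    # Collect results in creation order.
    for task in tasks:
        result = await task
        print(f"finished task {result}")
        results.append(result)
    return results


asyncio.run(main())
```

All three "starting task" lines print before any "finished task" line, just as in the version using range.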
You can also use asyncio.gather (if you need all results before continuing) or asyncio.wait to await multiple tasks, rather than a loop. For example, if task 2 completes before task 0 and you don't want to wait for task 0 before handling task 2, you could do:
async def main() -> None:
    pending = []
    for i in range(3):
        print(f"starting task {i}")
        pending.append(asyncio.create_task(io_bound_task(i)))
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            result = await task
            print(f"finished task {result}")
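For the asyncio.gather option, one way to match the original goal ("do more stuff when task is complete") is to put all the per-item work into one coroutine and gather those. This is a sketch; the helper name process is hypothetical.

```python
import asyncio
import random
from typing import List


async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i


async def process(i: int) -> int:
    # All the work for one item: start it, wait for its IO,
    # then "do more stuff" with the result.
    print(f"starting task {i}")
    result = await io_bound_task(i)
    print(f"finished task {result}")
    return result


async def main() -> List[int]:
    # gather wraps each coroutine in a task, runs them concurrently and
    # returns their results in argument order, not completion order.
    return await asyncio.gather(*(process(i) for i in range(3)))


print(asyncio.run(main()))  # → [0, 1, 2]
```

The "finished task" lines appear in whatever order the sleeps complete, but the returned list is always in argument order.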