Asynchronous iteration: how to move to the next step of the iteration while waiting for a task to complete?

Time:03-20

I am trying to write an iterator that moves on to the next step of the iteration while awaiting an I/O-bound task. Roughly, in code, I'm trying to do this:

for i in iterable:
    await io_bound_task()   # move on to next step in iteration
    # do more stuff when task is complete

I initially tried a simple for loop, with a sleep simulating the I/O-bound task:

import asyncio
import random


async def main() -> None:
    for i in range(3):
        print(f"starting task {i}")
        result = await io_bound_task(i)
        print(f"finished task {result}")


async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i


asyncio.run(main())

Here the code runs sequentially and outputs:

starting task 0
finished task 0
starting task 1
finished task 1
starting task 2
finished task 2

I assume this is because the for loop is blocking, so I thought an asynchronous for loop would be the way to proceed, and tried an asynchronous iterator:

from __future__ import annotations

import asyncio
import random


class AsyncIterator:
    def __init__(self, max_value: int) -> None:
        self.max_value = max_value
        self.count = 0

    def __aiter__(self) -> AsyncIterator:
        return self

    async def __anext__(self) -> int:
        if self.count == self.max_value:
            raise StopAsyncIteration
        self.count += 1
        return self.count


async def main() -> None:
    async for i in AsyncIterator(3):
        print(f"starting task {i}")
        result = await io_bound_task(i)
        print(f"finished task {result}")


async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i


asyncio.run(main())

But this also seems to run sequentially, producing the output

starting task 1
finished task 1
starting task 2
finished task 2
starting task 3
finished task 3

every time. So the asynchronous iterator is apparently not doing what I assumed it would. At this point I'm stuck: is this an issue with my understanding of asynchronous iterators? Can someone give me some pointers on how to achieve what I'm trying to do?

I'm new to working with async, so apologies if I'm doing something stupid. Any help is appreciated. Thanks.

I'm on Python 3.8.10, if that is a relevant detail.

Answer:

The thing you are looking for is called a task, and it can be created with the asyncio.create_task function. All the approaches you tried await the coroutine io_bound_task(i) directly, and await means roughly "wait for this to complete before continuing". If you wrap the coroutine in a task, it is scheduled on the event loop and runs in the background, so your loop can continue without waiting for it to complete.
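As for why the asynchronous iterator didn't help: `async for` only lets each step of the *iterator* await something; the loop body still runs to completion before the next item is fetched. Below is a small sketch that records the order of events. It uses an async generator (`count_up` is a hypothetical name) as a shorter equivalent of the `AsyncIterator` class from the question, and a short sleep stands in for `io_bound_task`:

```python
import asyncio


async def count_up(max_value: int):
    # Async generator: a shorter equivalent of the AsyncIterator class.
    for i in range(1, max_value + 1):
        await asyncio.sleep(0)  # the iterator itself may await I/O here
        yield i


async def main() -> list:
    events = []
    # `async for` awaits each step of count_up, but the body below still
    # finishes for item i before item i + 1 is requested.
    async for i in count_up(3):
        events.append(f"start {i}")
        await asyncio.sleep(0.01)  # stand-in for io_bound_task(i)
        events.append(f"end {i}")
    return events


print(asyncio.run(main()))
# → ['start 1', 'end 1', 'start 2', 'end 2', 'start 3', 'end 3']
```

Concurrency comes from scheduling several coroutines at once (tasks), not from the kind of loop used.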

Here is a version of your code using tasks:

import asyncio
import random


async def main() -> None:
    tasks = []
    for i in range(3):
        print(f"starting task {i}")
        tasks.append(asyncio.create_task(io_bound_task(i)))

    for task in tasks:
        result = await task
        print(f"finished task {result}")


async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i


asyncio.run(main())

Output:

starting task 0
starting task 1
starting task 2
finished task 0
finished task 1
finished task 2
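The pseudocode in the question does more work after each I/O call ("do more stuff when task is complete"). One way to keep that shape with tasks is to move the await and the follow-up work into a per-item coroutine and create one task per item. This is a minimal sketch, assuming the follow-up work can run independently for each item; `handle_item` and the string it returns are hypothetical, for illustration only:

```python
import asyncio
import random


async def io_bound_task(i: int) -> int:
    # Simulated I/O: sleep for a random fraction of a second.
    await asyncio.sleep(random.random())
    return i


async def handle_item(i: int) -> str:
    # The "await, then do more stuff" part of the loop body lives here,
    # so each item waits only for its own I/O.
    result = await io_bound_task(i)
    return f"processed {result}"  # hypothetical follow-up work


async def main() -> list:
    tasks = []
    for i in range(3):
        # create_task schedules handle_item(i) and returns immediately,
        # so the for loop moves straight on to the next item.
        tasks.append(asyncio.create_task(handle_item(i)))
    # Collect the results in creation order once each task has finished.
    return [await task for task in tasks]


print(asyncio.run(main()))
# → ['processed 0', 'processed 1', 'processed 2']
```

The follow-up work for each item starts as soon as that item's I/O finishes, regardless of how long the other items take.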

You can also await multiple tasks with asyncio.gather (if you need all the results before continuing) or asyncio.wait, rather than with a loop. For example, if task 2 completes before task 0 and you don't want to wait for task 0, you could do:

async def main() -> None:
    pending = []
    for i in range(3):
        print(f"starting task {i}")
        pending.append(asyncio.create_task(io_bound_task(i)))

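    while pending:
        # Return as soon as at least one task finishes; asyncio.wait gives back
        # (done, pending) sets, and the loop repeats on the still-pending ones.
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            result = await task  # already finished, so this returns immediately
            print(f"finished task {result}")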
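For comparison, here is a sketch of the two other options mentioned: asyncio.gather, which returns all results in the order the awaitables were passed, and asyncio.as_completed, which hands back results in the order the tasks finish (similar to the asyncio.wait loop, with less bookkeeping). The function names `with_gather` and `with_as_completed` are hypothetical:

```python
import asyncio
import random


async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i


async def with_gather() -> list:
    # gather runs the coroutines concurrently and, once all of them have
    # finished, returns their results in the order they were passed in.
    return await asyncio.gather(*(io_bound_task(i) for i in range(3)))


async def with_as_completed() -> list:
    finished = []
    tasks = [asyncio.create_task(io_bound_task(i)) for i in range(3)]
    # as_completed yields awaitables in completion order, so each result
    # can be handled as soon as it is ready.
    for next_done in asyncio.as_completed(tasks):
        result = await next_done
        print(f"finished task {result}")
        finished.append(result)
    return finished


print(asyncio.run(with_gather()))  # → [0, 1, 2]
print(asyncio.run(with_as_completed()))
```

gather is the simpler choice when you need every result before continuing; as_completed (or the asyncio.wait loop above) fits when each result should be handled as soon as it arrives.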