Home > database >  Why is create_task() needed to create a queue of coroutines using asyncio gather?
Why is create_task() needed to create a queue of coroutines using asyncio gather?

Time:10-13

I have the following code running in an event loop where I'm downloading a large number of files using asyncio and restricting the number of files downloaded using asyncio.queue:

download_tasks = asyncio.Queue()
for file in files:
    # download_file() is an async function that downloads a file from Microsoft blob storage
    # that is basically await blob.download_blob()
    download_tasks.put_nowait(asyncio.create_task(download_file(file=file))

async def worker():
    while not download_tasks.empty():
        return await download_tasks.get_nowait() 

worker_limit = 10
# each call to download_file() returns a pandas dataframe
df_list = await asyncio.gather(*[worker() for _ in range(worker_limit)], return_exceptions=True)
df = pd.concat(df_list)  

This code seems to run fine, but I originally had the for loop defined as:

for file in files:
    # download_file() is an async function that downloads a file from Microsoft blob storage
    # that is basically await blob.download_blob()
    download_tasks.put_nowait(download_file(file=file)

With this code, the result is the same but I get the following warning:

RuntimeWarning: coroutine 'download_file' was never awaited

Looking at asyncio examples, sometimes I see create_task() used when creating a list or queue of coroutines to be run in gather and sometimes I don't. Why is it needed in my case and what's the best practice for using it?

Edit: As @user2357112supportsMonica discourteously pointed out, the return statement within worker() doesn't really make sense. The point of this code is to limit concurrency because I may have to download thousands at a time and would like to limit it to 10 at a time using the queue. So my actual question is, how can I use gather to return all my results using this queue implementation?

Edit 2: I seemed to have found an easy solution that works using a semaphore instead of a queue with the following code adapted from this answer https://stackoverflow.com/a/61478547/4844593:

download_tasks = []
for file in files:
    download_tasks.append(download_file(file=file))

async def gather_with_concurrency(n, *tasks):
    semaphore = asyncio.Semaphore(n)
    async def sem_task(task):
        async with semaphore:
            return await task
    return await asyncio.gather(*(sem_task(task) for task in tasks))
    
df_list = await gather_with_concurrency(10, *download_tasks)
return pd.concat(df_list)  

CodePudding user response:

As "user2357112 supports Monica" notes, the original issue probably comes from the workers having a return so each worker will download one file then quit, meaning any coroutines after the first 10 will be ignored and never awaited (you can probably see that if you log information about download_tasks after the supposed completion of your processing).

The create_tasks defeats that because it will immediately schedule the downloading at the same time (defeating the attempted rate limiting / workers pool), then the incorrect worker code will just ignore anything after the first 10 items.

Anyway the difference between coroutines (e.g. bare async functions) and tasks is that tasks are independently scheduled. That is, once you've created a task it lives its life independently and you don't have to await it if you don't want its result. That is similar to Javascript's async functions.

coroutines, however, don't do anything until they are awaited, they will only progress if they are explicitelly polled and that is only done by awaiting them (directly or indirectly e.g. gather or wait will await/poll the objects they wrap).

  • Related