How to speed up async requests in Python


I want to download/scrape 50 million log records from a site. Instead of downloading all 50 million in one go, I was trying to download them in parts, e.g. 10 million at a time, using the following code. However, it only handles about 20,000 records at a time (more than that throws an error), which makes downloading that much data very time-consuming. Currently it takes 3-4 minutes to download 20,000 records, at a rate of 100%|██████████| 20000/20000 [03:48<00:00, 87.41it/s]. How can I speed this up?

import asyncio
import aiohttp
import time
import tqdm
import nest_asyncio

nest_asyncio.apply()


async def make_numbers(numbers, _numbers):
    for i in range(numbers, _numbers):
        yield i


n = 0
q = 10000000


async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="

    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that post
        async for x in make_numbers(n, q):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once

        responses = [await f for f in tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]


async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        print(data)


s = time.perf_counter()
try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(fetch())
except:
    print("error")

elapsed = time.perf_counter() - s
# print(f"{__file__} executed in {elapsed:0.2f} seconds.")

CodePudding user response:

Bottleneck: number of simultaneous connections

First, the bottleneck is the total number of simultaneous connections in the TCP connector.

The default for aiohttp.TCPConnector is limit=100. On most systems (tested on macOS), you should be able to double that by passing a connector with limit=200:

# async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200)) as session:

The time taken should decrease significantly. (On macOS: q = 20_000 decreased 43% from 58 seconds to 33 seconds, and q = 10_000 decreased 42% from 31 to 18 seconds.)

The limit you can configure depends on the number of file descriptors that your machine can open. (On macOS: You can run ulimit -n to check, and ulimit -n 1024 to increase to 1024 for the current terminal session, and then change to limit=1000. Compared to limit=100, q = 20_000 decreased 76% to 14 seconds, and q = 10_000 decreased 71% to 9 seconds.)
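
If you prefer to check or raise the file descriptor limit from inside the script instead of via ulimit, here is a minimal sketch using the standard resource module (Unix/macOS only; the target of 1024 is just an example matching the ulimit call above):

import resource

# Current soft/hard limits on open file descriptors (equivalent to `ulimit -n`).
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"soft={soft}, hard={hard}")

# Raise the soft limit for this process only, capped at the hard limit.
new_soft = min(1024, hard)
resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))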

Supporting 50 million requests: async generators

Next, the reason why 50 million requests appear to hang is simply their sheer number.

Just creating 10 million coroutines in post_tasks takes 68-98 seconds (varies greatly on my machine), and then the event loop is further burdened with that many tasks, 99.99% of which are blocked by the TCP connection pool.

We can defer the creation of coroutines using an async generator:

async def make_async_gen(f, n, q):
    async for x in make_numbers(n, q):
        yield f(x)
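
To see why this helps, here is a minimal standalone sketch (using a hypothetical slow_square coroutine in place of do_get): each coroutine object is only created when the generator is advanced, instead of 10 million of them being built up front.

import asyncio

async def make_numbers(start, end):
    for i in range(start, end):
        yield i

async def make_async_gen(f, n, q):
    async for x in make_numbers(n, q):
        yield f(x)  # builds one coroutine at a time, on demand

async def slow_square(x):
    await asyncio.sleep(0)  # stand-in for the real HTTP request
    return x * x

async def main():
    gen = make_async_gen(slow_square, 0, 10_000_000)
    # Only the coroutines we actually pull out of the generator exist in memory.
    for _ in range(3):
        coro = await gen.__anext__()
        print(await coro)  # 0, 1, 4

asyncio.run(main())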

We need a counterpart to asyncio.as_completed() that handles the async generator and caps the concurrency:

from asyncio import ensure_future, events
from asyncio.queues import Queue

def as_completed_for_async_gen(fs_async_gen, concurrency):
    done = Queue()
    loop = events.get_event_loop()
    # todo = {ensure_future(f, loop=loop) for f in set(fs)}  # -
    todo = set()                                             # +

    def _on_completion(f):
        todo.remove(f)
        done.put_nowait(f)
        loop.create_task(_add_next())  # +

    async def _wait_for_one():
        f = await done.get()
        return f.result()

    async def _add_next():  # +
        try:
            f = await fs_async_gen.__anext__()
        except StopAsyncIteration:
            return
        f = ensure_future(f, loop=loop)
        f.add_done_callback(_on_completion)
        todo.add(f)

    # for f in todo:                           # -
    #     f.add_done_callback(_on_completion)  # -
    # for _ in range(len(todo)):               # -
    #     yield _wait_for_one()                # -
    for _ in range(concurrency):               # +
        loop.run_until_complete(_add_next())   # +
    while todo:                                # +
        yield _wait_for_one()                  # +

Then, we update fetch():

from functools import partial

CONCURRENCY = 200  # +

n = 0
q = 50_000_000

async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="

    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=CONCURRENCY)) as session:
        # post_tasks = []                                                # -
        # # prepare the coroutines that post                             # -
        # async for x in make_numbers(n, q):                             # -
        #     post_tasks.append(do_get(session, url, x))                 # -
        # Prepare the coroutines generator                               # +
        async_gen = make_async_gen(partial(do_get, session, url), n, q)  # +

        # now execute them all at once                                                                         # -
        # responses = [await f for f in tqdm.asyncio.tqdm.as_completed(post_tasks, total=len(post_tasks))]     # -
        # Now execute them with a specified concurrency                                                        # +
        responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]  # +

Other limitations

With the above, the program can start processing 50 million requests but:

  1. it will still take 8 hours or so with CONCURRENCY = 1000, based on the estimate from tqdm.
  2. your program may run out of memory accumulating all the responses and crash.

For point 2, you should probably do:

# responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]
for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
    response = await f
    
    # Do something with response, such as writing to a local file
    # ...
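
For instance, a minimal sketch that streams each result to a local file instead of accumulating a 50-million-item list (this assumes do_get is changed to return response.text() rather than print it; the output filename responses.txt is arbitrary):

with open("responses.txt", "w") as out:
    for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
        response = await f
        # Write one line per response; nothing is kept in memory after this point.
        out.write(response + "\n")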

CodePudding user response:

If it's not the bandwidth that limits you (I cannot check this), there is a solution less complicated than Celery and RabbitMQ, though it is not as scalable: it will be limited by the number of CPU cores on your machine.

Instead of splitting the calls across Celery workers, you split them across multiple processes.

I modified the fetch function like this:

async def fetch(start, end):
    # example
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that post
        # use start and end arguments here!
        async for x in make_numbers(start, end):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once

        responses = [await f for f in
                     tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]

and I modified the main process:

import concurrent.futures
from itertools import count

def one_executor(start, end):
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(fetch(start, end))
    except:
        print("error")


if __name__ == '__main__':

    s = time.perf_counter()
    # Change this value to the number of cores you want to use.
    max_worker = 4
    length_by_executor = q // max_worker
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_worker) as executor:
        for index_min in count(0, length_by_executor):
            # duplicated boundary indexes between chunks are not a problem,
            # because range() in make_numbers excludes the end value.
            index_max = min(index_min + length_by_executor, q)
            executor.submit(one_executor, index_min, index_max)
            if index_max == q:
                break

    elapsed = time.perf_counter() - s
    print(f"executed in {elapsed:0.2f} seconds.")

Here are the results I get (with the value of q set to 10_000):

1 worker: executed in 13.90 seconds.
2 workers: executed in 7.24 seconds.
3 workers: executed in 6.82 seconds.

I did not work on the tqdm progress bar; with the current solution, two bars will be displayed (but I think tqdm works well with multiple processes).
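
If you do want one clean bar per process, a possible sketch (untested here; it assumes you pass a worker index down through one_executor and fetch, which the code above does not do yet) is tqdm's position argument:

# Hypothetical: give each worker an index so its bar is drawn on its own line.
def one_executor(worker_index, start, end):
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(fetch(worker_index, start, end))
    except Exception as exc:
        print("error", exc)

# In fetch(), forward that index to tqdm so the bars don't overwrite each other:
#     tqdm.tqdm(asyncio.as_completed(post_tasks),
#               total=len(post_tasks),
#               position=worker_index)

# And pass a running index when submitting the jobs:
#     for i, index_min in enumerate(count(0, length_by_executor)):
#         index_max = min(index_min + length_by_executor, q)
#         executor.submit(one_executor, i, index_min, index_max)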
