Home > Net >  What causes asyncio to slow down in my script?
What causes asyncio to slow down in my script?

Time:12-14

I wrote this script to bulk fetch status codes. It works fine but it's rate of completion speed drops massively when given a list of 1,000,000 URLs vs a list of 10,000 URLs. For 10,000 URLs, the rate is around 10,000 URLs/8 minutes but when given a 1,000,000 URLs, the rate drops to 10,000 URLs/20 minutes. Is this just a feature of async and/or how would I go about fixing it?

def config_logger(name, file):
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    handler = logging.FileHandler(file)
    logger.addHandler(handler)
    return logger, handler


def create_two_loggers(directory, file):
    status_codes, status_codes_handler = config_logger("status_codes", f"{directory}/status_codes.csv")
    valid_status_codes, valid_status_codes_handler = config_logger("valid_status_codes", f"{directory}/{file}")
    return status_codes, valid_status_codes


async def get(url, sem, session, status_codes, valid_status_codes):
    try:
        async with sem, session.head(url=url, timeout=20, raise_for_status=True) as r:
            status = r.status
            if r.status <= 308:
                status_codes.info(f"{url},{status}")
                valid_status_codes.info(url)
            if r.status > 308:
                status_codes.error(f"{url},{status}")
    except Exception:
        status_codes.error(f"{url},{np.nan}")


async def main(directory, file, urls):
    status_codes, valid_status_codes = create_two_loggers(directory, file)
    sem = asyncio.BoundedSemaphore(50)
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200), headers=headers) as session:
        await tqdm.gather(*[get(url, sem, session, status_codes, valid_status_codes) for url in urls])


def fetch_status_codes(directory, file, urls):
    asyncio.run(main(directory, file, urls))


if __name__ == "__main__":
    fetch_status_codes()

CodePudding user response:

This evidently isn't related to the HTTP requests.

A barebones example based on yours which just grabs the semaphore, yields the coroutine and releases it:

import asyncio
import time


async def get(x, sem):
    async with sem:
        await asyncio.sleep(0)


async def main(n):
    t0 = time.time()
    sem = asyncio.BoundedSemaphore(150)
    await asyncio.gather(*[get(x, sem) for x in range(n)])
    dur = time.time() - t0
    print(f"{n:8d} took {dur:6.2f} => {n / dur:.0f} iter/s")


if __name__ == "__main__":
    for x in (1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000):
        asyncio.run(main(x))

This prints out

    1000 took   0.02 => 50076 iter/s
    5000 took   0.11 => 46626 iter/s
   10000 took   0.20 => 49599 iter/s
   50000 took   1.39 => 35861 iter/s
  100000 took   3.21 => 31189 iter/s
  500000 took  16.19 => 30878 iter/s
 1000000 took  32.72 => 30559 iter/s

(on my machine).

Changing this so there aren't as many futures to await for at the same time, i.e. chunking the input tasks list and working on smaller chunks:

import asyncio
import time


async def get(x, sem):
    async with sem:
        await asyncio.sleep(0)


def chunks(iterable, chunk_size):
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


async def main(n):
    t0 = time.time()
    sem = asyncio.BoundedSemaphore(150)
    for chunk in chunks(range(n), 1500):
        await asyncio.gather(*[get(x, sem) for x in chunk])
    dur = time.time() - t0
    print(f"{n:8d} took {dur:6.2f} => {n / dur:.0f} iter/s")


if __name__ == "__main__":
    for x in (1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000):
        asyncio.run(main(x))

makes things quite a bit faster.

    1000 took   0.02 => 53099 iter/s
    5000 took   0.09 => 54106 iter/s
   10000 took   0.17 => 57405 iter/s
   50000 took   0.87 => 57394 iter/s
  100000 took   1.74 => 57484 iter/s
  500000 took   9.19 => 54427 iter/s
 1000000 took  19.15 => 52210 iter/s
  • Related