I wrote this script to bulk fetch status codes. It works fine, but its rate of completion drops massively when given a list of 1,000,000 URLs versus a list of 10,000 URLs: for 10,000 URLs the rate is around 10,000 URLs per 8 minutes, but for 1,000,000 URLs it drops to 10,000 URLs per 20 minutes. Is this just a feature of async, and/or how would I go about fixing it?
import asyncio
import logging

import aiohttp
import numpy as np
from tqdm.asyncio import tqdm

headers = {}  # the actual headers were omitted from the post

def config_logger(name, file):
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    handler = logging.FileHandler(file)
    logger.addHandler(handler)
    return logger, handler

def create_two_loggers(directory, file):
    status_codes, status_codes_handler = config_logger("status_codes", f"{directory}/status_codes.csv")
    valid_status_codes, valid_status_codes_handler = config_logger("valid_status_codes", f"{directory}/{file}")
    return status_codes, valid_status_codes

async def get(url, sem, session, status_codes, valid_status_codes):
    try:
        async with sem, session.head(url=url, timeout=20, raise_for_status=True) as r:
            status = r.status
            # with raise_for_status=True, 4xx/5xx responses raise and land in the except branch
            if status <= 308:
                status_codes.info(f"{url},{status}")
                valid_status_codes.info(url)
            else:
                status_codes.error(f"{url},{status}")
    except Exception:
        status_codes.error(f"{url},{np.nan}")

async def main(directory, file, urls):
    status_codes, valid_status_codes = create_two_loggers(directory, file)
    sem = asyncio.BoundedSemaphore(50)
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200), headers=headers) as session:
        await tqdm.gather(*[get(url, sem, session, status_codes, valid_status_codes) for url in urls])

def fetch_status_codes(directory, file, urls):
    asyncio.run(main(directory, file, urls))

if __name__ == "__main__":
    fetch_status_codes()  # the actual arguments were omitted from the post
Answer:
The slowdown evidently isn't related to the HTTP requests themselves; it's the overhead of asyncio keeping that many pending futures alive at once.
Here's a barebones example based on yours, which just acquires the semaphore, yields control to the event loop, and releases it:
import asyncio
import time

async def get(x, sem):
    async with sem:
        await asyncio.sleep(0)

async def main(n):
    t0 = time.time()
    sem = asyncio.BoundedSemaphore(150)
    await asyncio.gather(*[get(x, sem) for x in range(n)])
    dur = time.time() - t0
    print(f"{n:8d} took {dur:6.2f} => {n / dur:.0f} iter/s")

if __name__ == "__main__":
    for x in (1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000):
        asyncio.run(main(x))
On my machine, this prints out:
    1000 took   0.02 => 50076 iter/s
    5000 took   0.11 => 46626 iter/s
   10000 took   0.20 => 49599 iter/s
   50000 took   1.39 => 35861 iter/s
  100000 took   3.21 => 31189 iter/s
  500000 took  16.19 => 30878 iter/s
 1000000 took  32.72 => 30559 iter/s
Note how the per-iteration rate degrades as n grows. Changing this so there aren't as many futures being awaited at the same time, i.e. chunking the input list and working on smaller chunks:
import asyncio
import time

async def get(x, sem):
    async with sem:
        await asyncio.sleep(0)

def chunks(iterable, chunk_size):
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

async def main(n):
    t0 = time.time()
    sem = asyncio.BoundedSemaphore(150)
    for chunk in chunks(range(n), 1500):
        await asyncio.gather(*[get(x, sem) for x in chunk])
    dur = time.time() - t0
    print(f"{n:8d} took {dur:6.2f} => {n / dur:.0f} iter/s")

if __name__ == "__main__":
    for x in (1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000):
        asyncio.run(main(x))
makes things quite a bit faster.
    1000 took   0.02 => 53099 iter/s
    5000 took   0.09 => 54106 iter/s
   10000 took   0.17 => 57405 iter/s
   50000 took   0.87 => 57394 iter/s
  100000 took   1.74 => 57484 iter/s
  500000 took   9.19 => 54427 iter/s
 1000000 took  19.15 => 52210 iter/s
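Applied to your script, the same idea would drop into main() roughly like this (a sketch reusing the chunks() helper above; the 1500 batch size is an assumption you'd want to tune, and a plain tqdm bar updated per chunk replaces tqdm.gather so you don't get a new progress bar per batch):

from tqdm import tqdm as progress_bar  # plain tqdm, updated manually per chunk

async def main(directory, file, urls):
    status_codes, valid_status_codes = create_two_loggers(directory, file)
    sem = asyncio.BoundedSemaphore(50)
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200), headers=headers) as session:
        with progress_bar(total=len(urls)) as bar:
            # await at most 1500 futures at a time instead of all 1,000,000 at once
            for chunk in chunks(urls, 1500):
                await asyncio.gather(*[get(url, sem, session, status_codes, valid_status_codes) for url in chunk])
                bar.update(len(chunk))

With the semaphore allowing 50 concurrent requests, a batch of 1500 should keep the connector busy while capping how many pending futures the event loop has to track at any moment.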