I want to download/scrape 50 million log records from a site. Instead of downloading 50 million in one go, I was trying to download it in parts like 10 million at a time using the following code but it's only handling 20,000 at a time (more than that throws an error) so it becomes time-consuming to download that much data. Currently, it takes 3-4 mins to download 20,000 records with the speed of 100%|██████████| 20000/20000 [03:48<00:00, 87.41it/s]
so how to speed it up?
import asyncio
import aiohttp
import time
import tqdm
import nest_asyncio
nest_asyncio.apply()
async def make_numbers(numbers, _numbers):
for i in range(numbers, _numbers):
yield i
n = 0
q = 10000000
async def fetch():
# example
url = "https://httpbin.org/anything/log?id="
async with aiohttp.ClientSession() as session:
post_tasks = []
# prepare the coroutines that poat
async for x in make_numbers(n, q):
post_tasks.append(do_get(session, url, x))
# now execute them all at once
responses = [await f for f in tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]
async def do_get(session, url, x):
headers = {
'Content-Type': "application/x-www-form-urlencoded",
'Access-Control-Allow-Origin': "*",
'Accept-Encoding': "gzip, deflate",
'Accept-Language': "en-US"
}
async with session.get(url str(x), headers=headers) as response:
data = await response.text()
print(data)
s = time.perf_counter()
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch())
except:
print("error")
elapsed = time.perf_counter() - s
# print(f"{__file__} executed in {elapsed:0.2f} seconds.")
CodePudding user response:
Bottleneck: number of simultaneous connections
First, the bottleneck is the total number of simultaneous connections in the TCP connector.
That default for aiohttp.TCPConnector
is limit=100
. On most systems (tested on macOS), you should be able to double that by passing a connector
with limit=200
:
# async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200)) as session:
The time taken should decrease significantly. (On macOS: q = 20_000
decreased 43% from 58 seconds to 33 seconds, and q = 10_000
decreased 42% from 31 to 18 seconds.)
The limit
you can configure depends on the number of file descriptors that your machine can open. (On macOS: You can run ulimit -n
to check, and ulimit -n 1024
to increase to 1024 for the current terminal session, and then change to limit=1000
. Compared to limit=100
, q = 20_000
decreased 76% to 14 seconds, and q = 10_000
decreased 71% to 9 seconds.)
Supporting 50 million requests: async generators
Next, the reason why 50 million requests appears to hang is simply because of its sheer number.
Just creating 10 million coroutines in post_tasks
takes 68-98 seconds (varies greatly on my machine), and then the event loop is further burdened with that many tasks, 99.99% of which are blocked by the TCP connection pool.
We can defer the creation of coroutines using an async generator:
async def make_async_gen(f, n, q):
async for x in make_numbers(n, q):
yield f(x)
We need a counterpart to asyncio.as_completed()
to handle async_gen
and concurrency
:
from asyncio import ensure_future, events
from asyncio.queues import Queue
def as_completed_for_async_gen(fs_async_gen, concurrency):
done = Queue()
loop = events.get_event_loop()
# todo = {ensure_future(f, loop=loop) for f in set(fs)} # -
todo = set() #
def _on_completion(f):
todo.remove(f)
done.put_nowait(f)
loop.create_task(_add_next()) #
async def _wait_for_one():
f = await done.get()
return f.result()
async def _add_next(): #
try:
f = await fs_async_gen.__anext__()
except StopAsyncIteration:
return
f = ensure_future(f, loop=loop)
f.add_done_callback(_on_completion)
todo.add(f)
# for f in todo: # -
# f.add_done_callback(_on_completion) # -
# for _ in range(len(todo)): # -
# yield _wait_for_one() # -
for _ in range(concurrency): #
loop.run_until_complete(_add_next()) #
while todo: #
yield _wait_for_one() #
Then, we update fetch()
:
from functools import partial
CONCURRENCY = 200 #
n = 0
q = 50_000_000
async def fetch():
# example
url = "https://httpbin.org/anything/log?id="
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=CONCURRENCY)) as session:
# post_tasks = [] # -
# # prepare the coroutines that post # -
# async for x in make_numbers(n, q): # -
# post_tasks.append(do_get(session, url, x)) # -
# Prepare the coroutines generator #
async_gen = make_async_gen(partial(do_get, session, url), n, q) #
# now execute them all at once # -
# responses = [await f for f in tqdm.asyncio.tqdm.as_completed(post_tasks, total=len(post_tasks))] # -
# Now execute them with a specified concurrency #
responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)] #
Other limitations
With the above, the program can start processing 50 million requests but:
- it will still take 8 hours or so with
CONCURRENCY = 1000
, based on the estimate fromtqdm
. - your program may run out of memory for
responses
and crash.
For point 2, you should probably do:
# responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]
for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
response = await f
# Do something with response, such as writing to a local file
# ...
CodePudding user response:
If it's not the bandwidth that limits you (but I cannot check this), there is a solution less complicated than the celery and rabbitmq but it is not as scalable as the celery and rabbitmq, it will be limited by your number of CPU.
Instead of splitting calls on celery workers, you split them on multiple processes.
I modified the fetch
function like this:
async def fetch(start, end):
# example
url = "https://httpbin.org/anything/log?id="
async with aiohttp.ClientSession() as session:
post_tasks = []
# prepare the coroutines that poat
# use start and end arguments here!
async for x in make_numbers(start, end):
post_tasks.append(do_get(session, url, x))
# now execute them all at once
responses = [await f for f in
tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]
and I modified the main processes:
import concurrent.futures
from itertools import count
def one_executor(start, end):
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(fetch(start, end))
except:
print("error")
if __name__ == '__main__':
s = time.perf_counter()
# Change the value to the number of core you want to use.
max_worker = 4
length_by_executor = q // max_worker
with concurrent.futures.ProcessPoolExecutor(max_workers=max_worker) as executor:
for index_min in count(0, length_by_executor):
# no matter with duplicated indexes due to the use of
# range in make_number function.
index_max = min(index_min length_by_executor, q)
executor.submit(one_executor, index_min, index_max)
if index_max == q:
break
elapsed = time.perf_counter() - s
print(f"executed in {elapsed:0.2f} seconds.")
Here the result I get (with the value of q
set to 10_000
):
1 worker: executed in 13.90 seconds.
2 workers: executed in 7.24 seconds.
3 workers: executed in 6.82 seconds.
I don't work on the tqdm
progress bar, with the current solution, two bars will be displayed (but I think tqdm works well with multi processes).