My use case is the following: I'm using Python 3.8.
I have an async function analyse_doc that is a wrapper for an HTTP request to a web service. I have approximately 1000 docs to analyse as fast as possible. The service allows 15 transactions per second (not 15 concurrent requests at any given second). So the first second I can send 15, then the second second I can send 15 again, and so on. If I try to hit the service more than 15 times per second I get a 429 error, or sometimes a 503/504 error (server is busy…).
My question is: is it possible to implement something in Python that effectively sends 15 requests per second asynchronously, waits 1 second, then does it again until the queue is empty? Also, some tasks might fail, and those failing tasks might need a rerun at some point.
So far my code is the following (unbounded parallelism… not even a semaphore), but it handles retries:
tasks = {asyncio.create_task(analyse_doc(doc)): doc for doc in documents}
pending = set(tasks)

# Handle retry
while pending:
    # backoff in case of 429
    await asyncio.sleep(1)
    # concurrent call, return_when all completed
    finished, pending = await asyncio.wait(
        pending, return_when=asyncio.ALL_COMPLETED
    )
    # check if task has exception and register for new run
    for task in finished:
        doc = tasks[task]
        if task.exception():
            new_task = asyncio.create_task(analyse_doc(doc))
            tasks[new_task] = doc
            pending.add(new_task)
Answer:
You could try adding another sleeping task into the mix to drive the request generation. Something like this:
import asyncio
import random

ONE_SECOND = 1
CONCURRENT_TASK_LIMIT = 2
TASKS_TO_CREATE = 10

loop = asyncio.new_event_loop()

work_todo = []
work_in_progress = []

# just creates arbitrary work to do
def create_tasks():
    for i in range(TASKS_TO_CREATE):
        work_todo.append(worker_task(i))

    # muddle this up to see how drain works
    random.shuffle(work_todo)

# represents the actual work
async def worker_task(index):
    print(f"i am worker {index} and i am starting")
    await asyncio.sleep(index)
    print(f"i am worker {index} and i am done")

# gets the next 'concurrent' workload segment (if there is one)
def get_next_tasks():
    todo = []
    i = 0
    while i < CONCURRENT_TASK_LIMIT and len(work_todo) > 0:
        todo.append(work_todo.pop())
        i += 1
    return todo

# drains down any outstanding tasks and closes the loop
async def are_we_done_yet():
    print('draining')
    await asyncio.gather(*work_in_progress)
    loop.stop()
    # closes out the program
    print('done')

# puts work on the queue every tick (1 second)
async def work():
    next_tasks = get_next_tasks()
    if len(next_tasks) > 0:
        print(f'found {len(next_tasks)} tasks to do')
        for task in next_tasks:
            # schedules the work, puts it in the in-progress pile
            work_in_progress.append(loop.create_task(task))
        # this is the 'tick' or speed work gets scheduled on
        await asyncio.sleep(ONE_SECOND)
        # every 'tick' we add this task onto the loop again unless there isn't any more to do...
        loop.create_task(work())
    else:
        # ... if there isn't any to do we just enter drain mode
        await are_we_done_yet()

# bootstrap the process
create_tasks()
loop.create_task(work())
loop.run_forever()
Updated version with a simulated exception
import asyncio
import random

ONE_SECOND = 1
CONCURRENT_TASK_LIMIT = 2
TASKS_TO_CREATE = 10

loop = asyncio.new_event_loop()

work_todo = []
work_in_progress = []

# just creates arbitrary work to do
def create_tasks():
    for i in range(TASKS_TO_CREATE):
        work_todo.append(worker_task(i))

    # muddle this up to see how drain works
    random.shuffle(work_todo)

# represents the actual work
async def worker_task(index):
    try:
        if index % 9 == 0:
            print('simulating error')
            raise NotImplementedError("some error happened")

        print(f"i am worker {index} and i am starting")
        await asyncio.sleep(index)
        print(f"i am worker {index} and i am done")
    except:
        # put this work back on the pile (fudge the index so it doesn't throw this time)
        work_todo.append(worker_task(index + 1))

# gets the next 'concurrent' workload segment (if there is one)
def get_next_tasks():
    todo = []
    i = 0
    while i < CONCURRENT_TASK_LIMIT and len(work_todo) > 0:
        todo.append(work_todo.pop())
        i += 1
    return todo

# drains down any outstanding tasks and closes the loop
async def are_we_done_yet():
    print('draining')
    await asyncio.gather(*work_in_progress)

    if len(work_todo) > 0:
        loop.create_task(work())
        print('found some retries')
    else:
        loop.stop()
        # closes out the program
        print('done')

# puts work on the queue every tick (1 second)
async def work():
    next_tasks = get_next_tasks()
    if len(next_tasks) > 0:
        print(f'found {len(next_tasks)} tasks to do')
        for task in next_tasks:
            # schedules the work, puts it in the in-progress pile
            work_in_progress.append(loop.create_task(task))
        # this is the 'tick' or speed work gets scheduled on
        await asyncio.sleep(ONE_SECOND)
        # every 'tick' we add this task onto the loop again unless there isn't any more to do...
        loop.create_task(work())
    else:
        # ... if there isn't any to do we just enter drain mode
        await are_we_done_yet()

# bootstrap the process
create_tasks()
loop.create_task(work())
loop.run_forever()
This just simulates something going wrong and re-queues the failed task. If the error happens after the main work method has finished, it won't get re-queued, so the are_we_done_yet method would need to check for and rerun any failed tasks. This isn't particularly optimal, as it waits for the drain before checking everything else, but it gives you an idea of an implementation.
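For reference, here is a self-contained sketch (my own variation, not part of the code above) of a done-callback approach: each failed item is pushed back onto the to-do list as soon as its task finishes, so retries are picked up on the next tick rather than only after a full drain. The item ids, the simulated failure and the 2-per-tick limit are made up for illustration.
import asyncio

ONE_SECOND = 1
CONCURRENT_TASK_LIMIT = 2

work_todo = list(range(6))   # pending item ids
work_in_progress = set()     # tasks currently running

async def worker_task(item):
    if item == 3:
        raise RuntimeError("simulated error")
    await asyncio.sleep(0.5)
    print(f"item {item} done")

def schedule(item):
    task = asyncio.create_task(worker_task(item))
    work_in_progress.add(task)

    def on_done(finished):
        # runs as soon as this particular task completes
        work_in_progress.discard(finished)
        if not finished.cancelled() and finished.exception() is not None:
            print(f"re-queueing item {item}")
            # fudge the id so the retry succeeds, as in the example above
            work_todo.append(item + 1)

    task.add_done_callback(on_done)

async def main():
    # schedule up to CONCURRENT_TASK_LIMIT items every tick until nothing is left
    while work_todo or work_in_progress:
        for _ in range(min(CONCURRENT_TASK_LIMIT, len(work_todo))):
            schedule(work_todo.pop())
        await asyncio.sleep(ONE_SECOND)

asyncio.run(main())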
Answer:
You can do it with the help of a third-party library, bucketratelimiter. Here is an example of how to use it:
import asyncio
from typing import NamedTuple
from datetime import datetime
import time

from bucketratelimiter import AsyncioBucketTimeRateLimiter

TASKS_TO_COMPLETE = [(i, i, i) for i in range(20)]
WORKER_NUM = 30

class TestResult(NamedTuple):
    res: int
    start: str
    end: str

LIMITER = AsyncioBucketTimeRateLimiter(
    max_size=4,
    recovery_time=1,
)  # declare rate limiter here; we allow only 4 operations per 1 second at max rate

@LIMITER  # use limiter as a decorator to rate limit some_func
async def some_func(x: int, y: int, z: int, *, time_to_complete: float = 1) -> TestResult:
    """Imagine it is a fetch function and we would like to apply a rate limiter to it."""
    format_time = "%H:%M:%S"
    start = datetime.utcnow().strftime(format_time)
    await asyncio.sleep(time_to_complete)
    end = datetime.utcnow().strftime(format_time)
    result = x + y + z - z - y  # = x
    return TestResult(result, start, end)

async def worker(q: asyncio.Queue) -> None:
    """Workers which do some stuff."""
    while True:
        item = await q.get()
        res = await some_func(*item)
        print(f"Result: {res.res} | {res.start} - {res.end}")
        q.task_done()

async def main_entry_point() -> None:
    """Main entry point of our asyncio app."""
    q = asyncio.Queue()
    for task in TASKS_TO_COMPLETE:
        await q.put(task)

    tasks_to_cancel = []
    # use LIMITER as a context manager to ensure its correct activation and end of work
    async with LIMITER:
        for w in [worker(q) for _ in range(1, WORKER_NUM + 1)]:
            tasks_to_cancel.append(asyncio.create_task(w))
        await q.join()

    [t.cancel() for t in tasks_to_cancel]  # cancel "daemon" tasks

if __name__ == '__main__':
    start_t = time.monotonic()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main_entry_point())
    print(f"Time passed: {time.monotonic() - start_t}")