Python Async Limit Concurrent coroutines per second


My use case is the following: I'm using Python 3.8.

I have an async function analyse_doc that is a wrapper for an HTTP request to a web service. I have approximately 1000 docs to analyse as fast as possible. The service allows 15 transactions per second (and not 15 concurrent requests at any second). So in the first second I can send 15, in the second second I can send 15 again, and so on. If I try to hit the service more than 15 times per second I get a 429 error, or sometimes a 503/504 error (server is busy…).

My question is: is it possible to implement something in Python that effectively sends 15 requests per second asynchronously, then waits 1 second, then does it again until the queue is empty? Also, some tasks might fail. Those failing tasks might need a rerun at some point.

So far my code is the following (unbounded parallelism… not even a semaphore), but it handles retries.

tasks = {asyncio.create_task(analyse_doc(doc)): doc for doc in documents}
pending = set(tasks)

# Handle retry
while pending:
    # backoff in case of 429
    await asyncio.sleep(1)

    # run the calls concurrently and return when all have completed
    finished, pending = await asyncio.wait(
        pending, return_when=asyncio.ALL_COMPLETED
    )

    # check whether a task raised an exception and register it for a new run
    for task in finished:
        doc = tasks[task]

        if task.exception():
            new_task = asyncio.create_task(analyse_doc(doc))
            tasks[new_task] = doc
            pending.add(new_task)
   

CodePudding user response:

You could try adding another sleep task into the mix to drive the request generation. Something like this:

import asyncio
import random

ONE_SECOND = 1
CONCURRENT_TASK_LIMIT = 2
TASKS_TO_CREATE = 10

loop = asyncio.new_event_loop()

work_todo = []
work_in_progress = []

# just creates arbitrary work to do
def create_tasks():
    for i in range(TASKS_TO_CREATE):
        work_todo.append(worker_task(i))

    # muddle this up to see how drain works
    random.shuffle(work_todo)

# represents the actual work
async def worker_task(index):
    print(f"i am worker {index} and i am starting")
    await asyncio.sleep(index)
    print(f"i am worker {index} and i am done")

# gets the next 'concurrent' workload segment (if there is one)
def get_next_tasks():
    todo = []

    i = 0

    while i < CONCURRENT_TASK_LIMIT and len(work_todo) > 0:
        todo.append(work_todo.pop())
        i += 1

    return todo

# drains down any outstanding tasks and closes the loop
async def are_we_done_yet():
    print('draining')
    
    await asyncio.gather(*work_in_progress)

    loop.stop()
    
    # closes out the program
    print('done')

# puts work on the queue every tick (1 second)
async def work():
    next_tasks = get_next_tasks()
    if len(next_tasks) > 0:
        print(f'found {len(next_tasks)} tasks to do')
        for task in next_tasks:
            # schedules the work, puts it in the in-progress pile
            work_in_progress.append(loop.create_task(task))

        # this is the 'tick' or speed work gets scheduled on
        await asyncio.sleep(ONE_SECOND)
        
        # every 'tick' we add this task onto the loop again, unless there isn't any more work to do...
        loop.create_task(work())
    else:
        # ... if there isn't any to do we just enter drain mode
        await are_we_done_yet()

# bootstrap the process
create_tasks()
loop.create_task(work())
loop.run_forever()


Here is an updated version with a simulated exception:

import asyncio
import random

ONE_SECOND = 1
CONCURRENT_TASK_LIMIT = 2
TASKS_TO_CREATE = 10

loop = asyncio.new_event_loop()

work_todo = []
work_in_progress = []

# just creates arbitrary work to do
def create_tasks():
    for i in range(TASKS_TO_CREATE):
        work_todo.append(worker_task(i))

    # muddle this up to see how drain works
    random.shuffle(work_todo)

# represents the actual work
async def worker_task(index):
    try:
        if index % 9 == 0:
            print('simulating error')
            raise NotImplementedError("some error happened")
        
        print(f"i am worker {index} and i am starting")
        await asyncio.sleep(index)
        print(f"i am worker {index} and i am done")
    except Exception:
        # put this work back on the pile (fudge the index so it doesn't throw this time)
        work_todo.append(worker_task(index + 1))
        

# gets the next 'concurrent' workload segment (if there is one)
def get_next_tasks():
    todo = []

    i = 0

    while i < CONCURRENT_TASK_LIMIT and len(work_todo) > 0:
        todo.append(work_todo.pop())
        i += 1

    return todo

# drains down any outstanding tasks and closes the loop
async def are_we_done_yet():
    print('draining')
    
    await asyncio.gather(*work_in_progress)

    if (len(work_todo)) > 0:
        loop.create_task(work())
        print('found some retries')
    else:
        loop.stop()
        # closes out the program
        print('done')
    
    

# puts work on the queue every tick (1 second)
async def work():
    next_tasks = get_next_tasks()
    if len(next_tasks) > 0:
        print(f'found {len(next_tasks)} tasks to do')
        for task in next_tasks:
            # schedules the work, puts it in the in-progress pile
            work_in_progress.append(loop.create_task(task))

        # this is the 'tick' or speed work gets scheduled on
        await asyncio.sleep(ONE_SECOND)
        
        # every 'tick' we add this task onto the loop again, unless there isn't any more work to do...
        loop.create_task(work())
    else:
        # ... if there isn't any to do we just enter drain mode
        await are_we_done_yet()

# bootstrap the process
create_tasks()
loop.create_task(work())
loop.run_forever()

This just simulates something going wrong and re-queues the failed task. If the error happens after the main work method has finished, it won't get re-queued, so the are_we_done_yet method needs to check for and rerun any failed tasks. This isn't particularly optimal, since it waits for the drain before checking everything else, but it gives you an idea of an implementation.
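
For the original use case, the same tick idea can also be expressed without driving the loop by hand. Below is a minimal sketch, not a drop-in implementation: analyse_doc is stubbed here because the real HTTP wrapper isn't shown, and the names process_all, run_one and REQUESTS_PER_SECOND are just illustrative. It assumes Python 3.7+ so asyncio.run is available.

import asyncio
import random

REQUESTS_PER_SECOND = 15  # the service quota from the question
ONE_SECOND = 1

async def analyse_doc(doc):
    """Stand-in for the real HTTP wrapper; occasionally fails to simulate a 429/503."""
    await asyncio.sleep(random.uniform(0.1, 0.5))
    if random.random() < 0.1:
        raise RuntimeError("429 Too Many Requests")
    return f"result for {doc}"

async def process_all(documents):
    todo = list(documents)  # work queue, also used for retries
    in_progress = []
    results = {}

    async def run_one(doc):
        try:
            results[doc] = await analyse_doc(doc)
        except Exception:
            todo.append(doc)  # re-queue the failed doc for a later tick

    while todo or in_progress:
        # start at most REQUESTS_PER_SECOND new calls in this one-second window
        batch = [todo.pop() for _ in range(min(REQUESTS_PER_SECOND, len(todo)))]
        in_progress.extend(asyncio.create_task(run_one(doc)) for doc in batch)

        await asyncio.sleep(ONE_SECOND)  # wait for the next window
        in_progress = [t for t in in_progress if not t.done()]

    return results

results = asyncio.run(process_all([f"doc-{i}" for i in range(50)]))
print(f"analysed {len(results)} documents")

Note this only bounds how many calls are started per second, which matches the "15 transactions per second" quota; it does not cap how many requests are in flight at once if individual calls take longer than a second.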

CodePudding user response:

You can do it with the help of a 3rd-party library:

bucketratelimiter

Here is an example of how to use it:

import asyncio
from typing import NamedTuple
from datetime import datetime
import time

from bucketratelimiter import AsyncioBucketTimeRateLimiter


TASKS_TO_COMPLETE = [(i, i, i) for i in range(20)]
WORKER_NUM = 30


class TestResult(NamedTuple):
    res: int
    start: str
    end: str


LIMITER = AsyncioBucketTimeRateLimiter(
    max_size=4,
    recovery_time=1,
)  # declare the rate limiter here; we allow at most 4 operations per 1-second window


@LIMITER  # use limiter as a decorator to rate limit some_func
async def some_func(x: int, y: int, z: int, *, time_to_complete: float = 1) -> TestResult:
    """Imagine it is a fetch function and we would like to implement rate limiter to it."""
    format_time = "%H:%M:%S"
    start = datetime.utcnow().strftime(format_time)
    await asyncio.sleep(time_to_complete)
    end = datetime.utcnow().strftime(format_time)
    result = x + y + z - z - y  # = x

    return TestResult(result, start, end)


async def worker(q: asyncio.Queue) -> None:
    """Workers which do some stuff."""
    while True:
        item = await q.get()
        res = await some_func(*item)
        print(f"Result: {res.res} | {res.start} - {res.end}")
        q.task_done()


async def main_entry_point() -> None:
    """Main entry point of our asyncio app."""
    q = asyncio.Queue()
    for task in TASKS_TO_COMPLETE:
        await q.put(task)

    tasks_to_cancel = []
    # use LIMITER as context manager to ensure its correct activation and end of work
    async with LIMITER:
        for w in [worker(q) for _ in range(1, WORKER_NUM + 1)]:
            tasks_to_cancel.append(asyncio.create_task(w))

        await q.join()

    [t.cancel() for t in tasks_to_cancel]  # cancel "daemon" tasks


if __name__ == '__main__':
    start_t = time.monotonic()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main_entry_point())
    print(f"Time passed: {time.monotonic() - start_t}")
