Rate limit api multi process


I have pretty straightforward code: I load a list of IDs from a file, then iterate through the list, call an API with each ID value, and dump the API response content into a file.

I would like to speed this process up by making parallel API calls; however, the API server only allows a maximum of 5 calls per second. Another key consideration is that each API pull is slow: on average a call takes 10 seconds to finish.

I would like to have multiple parallel processes with some mechanism that ensures no more than 5 calls occur in any single second.

This is the current code:

import pandas as pd
import numpy as np
from joblib import Parallel, delayed

ids = pd.read_csv('data.csv')

ids = ids['Id'].values.tolist()

def dump_data(df, idx):
    # base_dir is assumed to be defined elsewhere in the script
    filename = base_dir + '\\' + str(idx) + '.csv'
    df.to_csv(filename, header=True, index=False)  # write data to file

def get_api(idx):
    data = call_some_api(idx)  # API returns data as a pandas DataFrame; takes about 10 secs
    dump_data(data, idx)


Parallel(n_jobs=10, verbose=50)(delayed(get_api)(idx) for idx in ids)

I'm currently using joblib, but if there is a better library for this, I'm happy to use it instead.

How can I ensure there will not be more than 5 requests going out in any given second (while at the same time completing all the requests as fast as possible)?

Also, I'm using Python 3.9 on Windows.

CodePudding user response:

You can use the multiprocessing library to create workers that make the API calls in parallel. In your case, you can create up to 5 workers to issue the calls.
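A minimal sketch of this suggestion, reusing get_api and ids from the question. Note that a fixed pool of 5 workers caps the number of in-flight calls at 5, not the number of new calls per second, so with 10-second calls it is more conservative than the API limit strictly requires:

from multiprocessing import Pool

if __name__ == "__main__":
    # at most 5 calls are ever in flight at once
    with Pool(processes=5) as pool:
        pool.map(get_api, ids)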

CodePudding user response:

import pandas as pd
import numpy as np
from multiprocessing import Pool
from time import sleep

ids = pd.read_csv('data.csv')

ids = ids['Id'].values.tolist()

def dump_data(df, idx):
    # base_dir is assumed to be defined elsewhere in the script
    filename = base_dir + '\\' + str(idx) + '.csv'
    df.to_csv(filename, header=True, index=False)  # write data to file

def get_api(idx):
    data = call_some_api(idx)  # API returns data as a pandas DataFrame; takes about 10 secs
    dump_data(data, idx)

def multi(idx):
    with Pool(processes=1) as pool:
        results = pool.map(get_api, idx)
        tuple(results)

if __name__ == "__main__":
    n = len(ids) // 5
    for i in range(n + 1):
        ids_c = ids[5*i:5*(i + 1)]  # take the next batch of 5 IDs
        multi(ids_c)
        sleep(1)  # wait a second before starting the next batch

This should work. (Tested with:

from multiprocessing import Pool
from time import sleep
from datetime import datetime
def some(n):
    print("OK")
    print(datetime.now().strftime('%S'))
    print(n)

def multi(idx):
    with Pool(processes=1) as pool:
        results = pool.map(some, idx)
        tuple(results)


if __name__ == "__main__":
    idx = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd']
    n = len(idx)//5
    for i in range(n + 1):
        idx_c = idx[5*i:5*(i + 1)]
        multi(idx_c)
        sleep(1)

result:

OK
04
0
OK
04
1
OK
04
2
OK
04
3
OK
04
4
OK
05
5
OK
05
6
OK
05
7
OK
05
8
OK
05
9
OK
06
a
OK
06
b
OK
06
c
OK
06
d

end of test output.)

Modify processes=1 to any number of processes as desired.
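For example, running each batch of five IDs concurrently would look like this (a sketch based on the multi function above):

def multi(idx):
    with Pool(processes=5) as pool:  # one worker per ID in the batch
        results = pool.map(get_api, idx)
        tuple(results)

Since pool.map blocks until the whole batch completes, this stays safely under the limit, at the cost of waiting for each batch of five to finish before the next one starts.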

Note that you might face memory errors.

I think it is clear and easy to understand, but feel free to ask about any part that seems difficult.

CodePudding user response:

I have created two classes, RateLimitedProcessPool and RateLimitedThreadPool, for multiprocessing and multithreading respectively, based on the algorithm presented in What's a good rate limiting algorithm?. These classes are like the standard multiprocessing.pool.Pool and multiprocessing.pool.ThreadPool classes except that their __init__ methods take two extra keyword arguments, rate and per, which together specify the maximum rate at which the apply_async method can be called. For example, rate=7 and per=3 means that successive calls to apply_async will throttle so as to allow at most 7 calls every 3 seconds.

The following code demonstrates this with a simple worker function that emulates the OP's situation: the worker takes 10 seconds to execute and must be limited to a maximum rate of 5 calls per second. We need to invoke this function 20 times. The first 5 calls are submitted immediately, and each subsequent call goes out as the allowance refills, roughly every 0.2 seconds, so the last call is submitted about 3 seconds in and completes about 10 seconds later; the best performance we can therefore achieve is a total run time of approximately 13 seconds.

import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps
import time

class RateLimitedPool:
    def __init__(self, rate, per):
        assert isinstance(rate, int) and rate > 0
        assert isinstance(per, int) and per > 0
        self.rate = rate
        self.per = per
        self.allowance = rate
        self.last_check = time.time()

    def _check_allowed(self):
        current = time.time()
        time_passed = current - self.last_check
        self.last_check = current
        self.allowance += time_passed * (self.rate / self.per)
        if self.allowance > self.rate:
            self.allowance = self.rate # throttle
        if self.allowance < 1.0:
            return False
        else:
            self.allowance -= 1.0
            return True

    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
        while not self._check_allowed():
            time.sleep(.1) # This can be fine-tuned
        return super().apply_async(func, args, kwds, callback, error_callback)

class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

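# Optional convenience decorators: wrapping a function with
# @threadpool(pool) or @processpool(pool) makes an ordinary call to it
# submit the work to the pool via apply_async instead.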
def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

########################################


def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 5 of these per second.
    """
    from datetime import datetime
    print(datetime.now(), 'x = ', x)
    time.sleep(10)

def main():
    args = range(20)
    pool = RateLimitedThreadPool(20, rate=5, per=1) # 5 per second
    start = time.time()
    for x in args:
        pool.apply_async(worker, args=(x,))
    # Wait for all tasks to complete
    pool.close()
    pool.join()
    print('Total elapsed time:', time.time() - start)

if __name__ == '__main__':
    main()

Prints:

2021-09-26 07:33:43.695578 x =  0
2021-09-26 07:33:43.751578 x =  2
2021-09-26 07:33:43.751578 x =  1
2021-09-26 07:33:43.832578 x =  3
2021-09-26 07:33:43.838578 x =  4
2021-09-26 07:33:43.842577 x =  5
2021-09-26 07:33:43.926577 x =  6
2021-09-26 07:33:44.125609 x =  7
2021-09-26 07:33:44.325608 x =  8
2021-09-26 07:33:44.526608 x =  9
2021-09-26 07:33:44.727613 x =  10
2021-09-26 07:33:44.928611 x =  11
2021-09-26 07:33:45.130629 x =  12
2021-09-26 07:33:45.331609 x =  13
2021-09-26 07:33:45.532612 x =  14
2021-09-26 07:33:45.733613 x =  15
2021-09-26 07:33:45.934609 x =  16
2021-09-26 07:33:46.136609 x =  17
2021-09-26 07:33:46.337609 x =  18
2021-09-26 07:33:46.538610 x =  19
Total elapsed time: 13.120546102523804
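To adapt this to the question, the OP's get_api calls can be submitted through a RateLimitedThreadPool; threads are a reasonable fit here since the work is I/O-bound. A sketch, assuming get_api and ids are defined as in the question:

def main():
    # ~50 threads lets five new 10-second calls start each second
    # without the pool itself becoming the bottleneck
    pool = RateLimitedThreadPool(50, rate=5, per=1)
    for idx in ids:
        pool.apply_async(get_api, args=(idx,))
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for all outstanding calls to finish

if __name__ == '__main__':
    main()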