I have a pretty straightforward piece of code where I load a list of IDs from a file, then iterate through each ID in the list, call an API passing the ID value, and dump the API response content into a file.
I would like to speed this process up by making parallel API calls; however, the API server only allows a maximum of 5 calls per second. Another key consideration is that each API pull is slow: on average a call takes about 10 seconds to finish.
I would like to be able to have multiple parallel process which have some way of ensuring that no more than 5 calls max occur in a single second.
This is the current code:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed
# Load the list of ids to fetch: read the CSV and take the 'Id' column as a plain list.
ids = pd.read_csv('data.csv')
ids = ids['Id'].values.tolist()
def dump_data(df, idx):
    """Write the DataFrame for one id to <base_dir>\\<idx>.csv.

    Fixes from the original: the stripped '+' operators are restored, and the
    body now writes the `df` parameter instead of an undefined name `data`.
    `base_dir` is assumed to be defined at module level — TODO confirm.
    """
    filename = base_dir + '\\' + str(idx) + '.csv'
    df.to_csv(filename, header=True, index=False)  # write data to file
def get_api(idx):
    """Fetch one id from the API and persist the returned frame to disk.

    Fix from the original: the fetched frame is passed to dump_data as `data`
    (the original passed an undefined name `df`, raising NameError at runtime).
    """
    data = call_some_api(idx)  # api returns a pandas DataFrame, takes ~10 secs
    dump_data(data, idx)
# Fan the API calls out over 10 joblib workers. NOTE(review): this alone does
# NOT enforce the 5-requests-per-second cap the API requires.
Parallel(n_jobs=10, verbose = 50)(delayed(get_api)(idx) for idx in ids)
I'm currently using joblib, but if there is a better library for this solution it can be used instead.
How can I ensure there will not be more than 5 requests going out in any given second (while at the same time completing all the requests as fast as possible)?
Also I'm using Python 3.9 on Windows
CodePudding user response:
You can use the multiprocessing library to create workers that issue the API calls in parallel. In your case, you can create up to 5 workers to issue the API calls.
CodePudding user response:
import pandas as pd
import numpy as np
from multiprocessing import Pool
from time import sleep
# Load the list of ids to fetch: read the CSV and take the 'Id' column as a plain list.
ids = pd.read_csv('data.csv')
ids = ids['Id'].values.tolist()
def dump_data(df, idx):
    """Write the DataFrame for one id to <base_dir>\\<idx>.csv.

    Fixes from the original: the stripped '+' operators are restored, and the
    body now writes the `df` parameter instead of an undefined name `data`.
    `base_dir` is assumed to be defined at module level — TODO confirm.
    """
    filename = base_dir + '\\' + str(idx) + '.csv'
    df.to_csv(filename, header=True, index=False)  # write data to file
def get_api(idx):
    """Fetch one id from the API and persist the returned frame to disk.

    Fix from the original: the fetched frame is passed to dump_data as `data`
    (the original passed an undefined name `df`, raising NameError at runtime).
    """
    data = call_some_api(idx)  # api returns a pandas DataFrame, takes ~10 secs
    dump_data(data, idx)
def multi(idx):
    # Run get_api over one batch of ids through a process pool.
    # NOTE(review): processes=1 means the batch actually runs serially —
    # raise it for real parallelism.
    with Pool(processes=1) as pool:
        results = pool.map(get_api, idx)  # blocks until the whole batch completes
        tuple(results)  # NOTE(review): the tuple is discarded — this line has no effect
if __name__ == "__main__":
    # Process the ids in batches of 5, sleeping 1 s between batches so that
    # at most 5 requests are started per second. The stripped '+' operators
    # (range(n + 1), 5 * (i + 1)) are restored from the original.
    n = len(ids) // 5
    for i in range(n + 1):  # n + 1 batches to cover the final partial batch
        ids_c = ids[5 * i:5 * (i + 1)]
        if not ids_c:  # len(ids) divisible by 5 -> last batch is empty; skip it
            break
        multi(ids_c)
        sleep(1)
This should work. (Tested with the following:
from multiprocessing import Pool
from time import sleep
from datetime import datetime
def some(n):
    """Print a marker line, the current second, and the given value."""
    print("OK")
    stamp = datetime.now().strftime('%S')
    print(stamp)
    print(n)
def multi(idx):
    """Run `some` over every element of idx through a one-worker pool."""
    with Pool(processes=1) as pool:
        batch = pool.map(some, idx)  # blocks until the batch finishes
        tuple(batch)
if __name__ == "__main__":
    # Drive the demo: feed `multi` batches of 5 items, sleeping 1 s between
    # batches. The stripped '+' operators (range(n + 1), 5 * (i + 1)) are
    # restored from the original.
    idx = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd']
    n = len(idx) // 5
    for i in range(n + 1):  # n + 1 batches to cover the final partial batch
        idx_c = idx[5 * i:5 * (i + 1)]
        multi(idx_c)
        sleep(1)
result:
OK
04
0
OK
04
1
OK
04
2
OK
04
3
OK
04
4
OK
05
5
OK
05
6
OK
05
7
OK
05
8
OK
05
9
OK
06
a
OK
06
b
OK
06
c
OK
06
d
end of test statement)
Modify processes=1
to any number of processes as desired.
Note that you might face memory errors.
I think it is clear and easy to understand, but feel free to ask about any part that seems difficult.
CodePudding user response:
I have created two classes, RateLimitedProcessPool
and RateLimitedThreadPool
for multiprocessing and multithreading respectively, based on the algorithm presented in What's a good rate limiting algorithm?. These classes are like the standard multiprocessing.pool.Pool
and multiprocessing.pool.ThreadPool
classes except the __init__
methods take two extra keyword arguments rate and per that together specify the maximum rate per second that the apply_async
method can be called. For example, values rate=7 and per=3 implies that successive calls to apply_async
will throttle so as to only allow a maximum rate of 7 calls every 3 seconds.
The following code demonstrates this with a simple worker function that emulates the OP's situation where the worker function takes 10 seconds to execute and must be limited to a maximum rate of 5 calls per second. We need to invoke this function 20 times and so the best performance we can achieve is a total run time of approximately 13 seconds.
import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps
import time
class RateLimitedPool:
    """Mixin that rate-limits apply_async using a token-bucket algorithm.

    Allows at most `rate` submissions every `per` seconds. Intended to be
    mixed in ahead of multiprocessing.pool.Pool / ThreadPool so that
    apply_async throttles before delegating to the real pool via super().
    """

    def __init__(self, rate, per):
        assert isinstance(rate, int) and rate > 0
        assert isinstance(per, int) and per > 0
        self.rate = rate                # tokens added per `per` seconds (also the bucket cap)
        self.per = per                  # refill period in seconds
        self.allowance = rate           # current token count; bucket starts full
        self.last_check = time.time()   # timestamp of the previous allowance update

    def _check_allowed(self):
        """Refill the bucket for the elapsed time; consume one token if possible.

        Returns True (and consumes a token) when a call may proceed, else False.
        """
        current = time.time()
        time_passed = current - self.last_check
        self.last_check = current
        # Bug fix: refill must ACCUMULATE ('+='). The original '=' threw away
        # the existing allowance, so the very first calls were throttled.
        self.allowance += time_passed * (self.rate / self.per)
        if self.allowance > self.rate:
            self.allowance = self.rate  # cap the bucket at `rate` tokens
        if self.allowance < 1.0:
            return False                # throttle: no whole token available
        else:
            self.allowance -= 1.0
            return True

    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
        """Block until the rate limit allows, then delegate to the real pool."""
        while not self._check_allowed():
            time.sleep(.1)  # polling interval; can be fine-tuned
        return super().apply_async(func, args, kwds, callback, error_callback)
class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    # Process pool whose apply_async is limited to `rate` calls per `per` seconds.
    # MRO places RateLimitedPool first, so its apply_async wraps Pool's.
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)
class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    # Thread pool whose apply_async is limited to `rate` calls per `per` seconds.
    def __init__(self, *args, rate=5, per=1, **kwargs):
        # Bug fix: the underlying ThreadPool was never initialised, leaving the
        # pool without workers. Mirror RateLimitedProcessPool.__init__.
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)
def threadpool(pool):
    """Decorator factory: submit calls of the decorated function to `pool`.

    The wrapped function returns whatever pool.apply_async returns
    (an AsyncResult for real pools) instead of executing inline.
    """
    def decorate(func):
        @wraps(func)
        def submit(*args, **kwargs):
            return pool.apply_async(func, args, kwargs)
        return submit
    return decorate
def processpool(pool):
    """Decorator factory: submit calls of the decorated function to `pool`.

    The wrapped function returns whatever pool.apply_async returns
    (an AsyncResult for real pools) instead of executing inline.
    """
    def decorate(func):
        @wraps(func)
        def submit(*args, **kwargs):
            return pool.apply_async(func, args, kwargs)
        return submit
    return decorate
########################################
def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 5 of these per second.
    """
    from datetime import datetime
    stamp = datetime.now()
    print(stamp, 'x = ', x)
    time.sleep(10)
def main():
    """Submit 20 worker tasks capped at 5 submissions/second and time the run."""
    pool = RateLimitedThreadPool(20, rate=5, per=1)  # 5 per second
    start = time.time()
    for x in range(20):
        pool.apply_async(worker, args=(x,))
    pool.close()  # no further submissions
    pool.join()   # wait for all tasks to complete
    print('Total elapsed time:', time.time() - start)

if __name__ == '__main__':
    main()
Prints:
2021-09-26 07:33:43.695578 x = 0
2021-09-26 07:33:43.751578 x = 2
2021-09-26 07:33:43.751578 x = 1
2021-09-26 07:33:43.832578 x = 3
2021-09-26 07:33:43.838578 x = 4
2021-09-26 07:33:43.842577 x = 5
2021-09-26 07:33:43.926577 x = 6
2021-09-26 07:33:44.125609 x = 7
2021-09-26 07:33:44.325608 x = 8
2021-09-26 07:33:44.526608 x = 9
2021-09-26 07:33:44.727613 x = 10
2021-09-26 07:33:44.928611 x = 11
2021-09-26 07:33:45.130629 x = 12
2021-09-26 07:33:45.331609 x = 13
2021-09-26 07:33:45.532612 x = 14
2021-09-26 07:33:45.733613 x = 15
2021-09-26 07:33:45.934609 x = 16
2021-09-26 07:33:46.136609 x = 17
2021-09-26 07:33:46.337609 x = 18
2021-09-26 07:33:46.538610 x = 19
Total elapsed time: 13.120546102523804