I have a pretty straightforward piece of code where I load a list of IDs from a file, then iterate through each ID in the list, call an API passing the ID value, and dump the API response content into a file.
I would like to speed this process up by making the API calls in parallel; however, the API server only allows a maximum of 5 calls per second. Another key consideration is that each API call is slow, taking about 10 seconds on average to finish.
I would like to have multiple parallel workers with some way of ensuring that no more than 5 calls occur in any single second.
What is the best way to go about this?
Also, I'm using Python 3.9 on Windows.
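For reference, the current sequential loop is roughly this shape (the file name, endpoint, and output naming below are illustrative placeholders, not my real values):

import requests

# rough sketch of the current sequential loop; file name, endpoint,
# and output naming are placeholders
with open("ids.txt") as f:
    ids = [line.strip() for line in f if line.strip()]

for _id in ids:
    resp = requests.get(f"https://example.com/api/items/{_id}")  # ~10 s per call
    with open(f"response_{_id}.json", "wb") as out:
        out.write(resp.content)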
CodePudding user response:
Your identity is recognized by your IP address or your token. That means using different methods won't help as long as the API can still identify you, and I don't think multiprocessing alone can help you. In my opinion you should use a proxy list or additional tokens and send each batch of 5 requests through a different proxy/token.
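If you do go that route, a rough sketch with the requests library could look like the following (the proxy list and endpoint are placeholders you would have to fill in):

import requests
from itertools import cycle

# placeholder proxy list; each request goes out through the next proxy in the rotation
proxies = cycle([
    "http://proxy1:8080",
    "http://proxy2:8080",
])

def fetch(_id):
    proxy = next(proxies)
    # placeholder endpoint; the same proxy is used for both schemes
    return requests.get(
        f"https://example.com/api/items/{_id}",
        proxies={"http": proxy, "https": proxy},
        timeout=30,
    )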
CodePudding user response:
You can use the multiprocessing library to create workers that make the API calls in parallel. In your case, you can create up to 5 workers to make the calls.
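A minimal sketch of that idea (the endpoint, file names, and the choice of exactly 5 workers are assumptions; with each call taking ~10 seconds, 5 concurrent workers should also stay within 5 calls per second):

from multiprocessing import Pool

import requests

def fetch_and_save(_id):
    # placeholder endpoint and output naming
    resp = requests.get(f"https://example.com/api/items/{_id}", timeout=60)
    with open(f"response_{_id}.json", "wb") as out:
        out.write(resp.content)
    return _id, resp.status_code

if __name__ == "__main__":  # the main guard is required for multiprocessing on Windows
    with open("ids.txt") as f:
        ids = [line.strip() for line in f if line.strip()]
    with Pool(processes=5) as pool:
        for _id, status in pool.imap_unordered(fetch_and_save, ids):
            print(_id, status)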
CodePudding user response:
You can dribble ids into a work queue every 200 ms to honor the 5-per-second limit. This can get complicated, because you also need a way to retry operations while still honoring that limit, and to slow things down further if 5 per second is still too fast.
The following code... okay, let's graciously call it pseudocode... creates threads listening on a common queue and reporting results on a response queue. An id is queued, and then the code waits before posting the next one. During that wait, the response queue is read and any ids that failed are requeued. Eventually all ids are processed and the loops terminate.
Since all of the worker communication is done through queues, this could be adapted to multiple processes as well. If the code doesn't do any real processing of the data, stick with threads.
import logging
import queue
import time
from threading import Thread


def worker(worker_id, cmd_q, resp_q):
    while True:
        _id = cmd_q.get()
        if _id is None:  # asked to terminate
            return
        try:
            # returns 0 for success, 1 for retry, 2 for flames
            status = do_the_work(_id)
        except (ConnectionError, TimeoutError):  # example recoverable errors; adjust to your client library
            status = 1
        except Exception:
            logging.exception("unrecoverable error for id %s", _id)
            status = 2
        cmd_q.task_done()
        resp_q.put((worker_id, _id, status))


def runner(ids_to_process):
    # wait between commands: 5 per second
    dribble_delay = 1 / 5

    # prefilled queue of ids needing processing
    id_queue = queue.Queue()
    for _id in ids_to_process:
        id_queue.put(_id)

    # queues for commands to and responses from the threads
    cmd_q = queue.Queue()
    resp_q = queue.Queue()

    num_threads = min(len(ids_to_process), 20)  # just a guess
    threads = []
    for worker_id in range(num_threads):
        t = Thread(target=worker, args=(worker_id, cmd_q, resp_q))
        t.start()
        threads.append(t)

    # in-flight work item count
    work_count = 0

    # send ids to process, get responses
    while True:
        try:
            # get and send the next id to work on
            cmd = id_queue.get_nowait()
            cmd_q.put(cmd)
            work_count += 1
        except queue.Empty:
            # nothing in the work queue right now
            pass
        if not work_count:
            # no ids left to process
            break
        # use the send delay to process responses
        wait_stamp = time.time() + dribble_delay
        while True:
            try:
                timeout = max(wait_stamp - time.time(), 0)
                worker_id, _id, status = resp_q.get(timeout=timeout)
                if status == 1:
                    # requeue on failure
                    id_queue.put(_id)
                    dribble_delay += .05  # add more delay for back pressure
                work_count -= 1
            except queue.Empty:
                # timeout expired, go send the next id
                break

    # done, tell the threads to exit
    for _ in threads:
        cmd_q.put(None)

    # wait for threads to finish
    for t in threads:
        t.join()