Python manage parallel api calls

I have some pretty straightforward code where I load a list of IDs from a file, then iterate through each ID in the list, call an API with the ID value, and dump the API response content into a file.

I would like to speed this process up by making the API calls in parallel; however, the API server only allows a maximum of 5 calls per second. Another key consideration is that each API pull is slow: on average a call takes 10 seconds to finish.

I would like to have multiple parallel processes with some way of ensuring that no more than 5 calls occur in any single second.

What is the best way to go about this?

Also, I'm using Python 3.9 on Windows.

CodePudding user response:

Your identity is recognized by your IP address or your token, so using different request methods doesn't help as long as the API recognizes you. I don't think multiprocessing alone can help you here. In my opinion you should use a proxy list or additional tokens and send each batch of 5 requests through a different proxy/token.
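
If you go that route, here is a rough sketch of rotating requests through a proxy list with the requests library; the proxy addresses, the API URL, and the fetch helper are all assumptions used to illustrate the idea:

import itertools
import requests

# assumption: a list of proxy endpoints you control
PROXIES = ["http://proxy1:8080", "http://proxy2:8080", "http://proxy3:8080"]
proxy_cycle = itertools.cycle(PROXIES)

def fetch(_id):
    # route each request through the next proxy in the rotation
    proxy = next(proxy_cycle)
    return requests.get(
        f"https://api.example.com/items/{_id}",
        proxies={"http": proxy, "https": proxy},
        timeout=30,
    )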

CodePudding user response:

You can use the multiprocessing library to create workers that make the API calls in parallel. In your case, you could create up to 5 workers for the API calls.
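
A minimal sketch of that idea with a pool of 5 worker processes; fetch_and_save and the ids.txt filename are assumptions standing in for the real API call and input file:

from multiprocessing import Pool

def fetch_and_save(_id):
    # hypothetical helper: call the API with _id and dump the
    # response content to a file
    ...

if __name__ == "__main__":  # guard required for multiprocessing on Windows
    with open("ids.txt") as f:
        ids = [line.strip() for line in f if line.strip()]
    # 5 worker processes handle the IDs in parallel
    with Pool(processes=5) as pool:
        pool.map(fetch_and_save, ids)

Note that a pool of 5 caps how many calls run at once; depending on how strictly the per-second limit is enforced, you may still want a small delay between submissions.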

CodePudding user response:

You can dribble IDs into a work queue every 200 ms to respect the 5-per-second limit. This can get complicated because you need a way to retry operations while still honoring the 5-per-second rule, and also to slow things down if 5 per second is still too fast.
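
In its simplest form (no retries, no back pressure) the dribble idea can be sketched with a thread pool and a fixed delay between submissions; fetch_and_save here is a hypothetical helper standing in for the actual API call:

import time
from concurrent.futures import ThreadPoolExecutor

def fetch_and_save(_id):
    # hypothetical helper: call the API with _id and save the response
    ...

def run_all(ids, per_second=5):
    delay = 1.0 / per_second  # 200 ms between submissions
    # plenty of threads: at 5 starts per second and ~10 s per call,
    # roughly 50 calls can be in flight at once
    with ThreadPoolExecutor(max_workers=60) as pool:
        futures = []
        for _id in ids:
            futures.append(pool.submit(fetch_and_save, _id))
            time.sleep(delay)  # dribble: at most 5 submissions per second
        for f in futures:
            f.result()  # surface any exceptions

The fuller version below adds retries and back pressure on top of the same idea.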

The following code... okay, let's graciously call it pseudocode... creates threads listening on a common command queue and reporting results on a response queue. An ID is queued, and then the code waits before posting the next one. During that wait, the response queue is read and IDs that failed are re-queued. Eventually all IDs are processed and the loops terminate.

Since all of the worker communication is done through queues, this could be adapted to multiple processes as well. If the code doesn't do any real processing of the data, stick with threads.

import queue
import time
from threading import Thread

def worker(worker_id, cmd_q, resp_q):
    while True:
        _id = cmd_q.get()
        if _id is None: # asked to terminate
            return
        try:
            # do_the_work is the actual API call plus file dump;
            # it returns 0 for success, 1 for retry, 2 for flames
            status = do_the_work(_id)
        except (TimeoutError, ConnectionError): # recoverable errors; adjust to whatever the API client raises
            status = 1
        except Exception:
            log_the_bad()
            status = 2
        cmd_q.task_done()
        resp_q.put((worker_id, _id, status))

def runner(ids_to_process):
    # wait between commands (5 per second -> 200 ms)
    dribble_delay = 1/5
    # prefilled queue of ids needing processing
    id_queue = queue.Queue()
    for _id in ids_to_process:
        id_queue.put(_id)
    # queues for commands to and responses from threads
    cmd_q = queue.Queue()
    resp_q = queue.Queue()
    num_threads = min(len(ids_to_process), 20) # just a guess
    threads = []
    for worker_id in range(num_threads):
        t = Thread(target=worker, args=(worker_id, cmd_q, resp_q))
        t.start()
        threads.append(t)

    # in-flight work item count
    work_count = 0

    # send ids to process, get responses
    while True:
        try:
            # get and send the next id to work on
            cmd = id_queue.get_nowait()
            cmd_q.put(cmd)
            work_count += 1
        except queue.Empty:
            # nothing in the work queue right now
            pass
        if not work_count:
            # no ids left to process
            break
        # use the send delay to process responses
        wait_stamp = time.time() + dribble_delay
        while True:
            try:
                timeout = max(0, wait_stamp - time.time())
                worker_id, _id, status = resp_q.get(timeout=timeout)
                if status == 1:
                    # requeue on recoverable failure
                    id_queue.put(_id)
                    dribble_delay += .05 # add more delay as back pressure
                work_count -= 1
            except queue.Empty:
                # timeout expired, go post the next id
                break

    # done, ask the threads to terminate
    for _ in threads:
        cmd_q.put(None)
    # wait for threads to finish
    for t in threads:
        t.join()
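
For completeness, a sketch of how the runner might be driven; the requests library, the URL, the output file naming, and the ids.txt input file are all assumptions:

import requests

def do_the_work(_id):
    # hypothetical implementation: call the API and dump the response
    # content to a file named after the id
    try:
        resp = requests.get(f"https://api.example.com/items/{_id}", timeout=30)
    except requests.RequestException:
        return 1                 # network hiccup -> retry
    if resp.status_code == 429:  # told to slow down -> retry
        return 1
    if not resp.ok:              # hard failure -> give up on this id
        return 2
    with open(f"{_id}.json", "wb") as f:
        f.write(resp.content)
    return 0

def log_the_bad():
    pass  # placeholder for real error logging

if __name__ == "__main__":
    with open("ids.txt") as f:
        ids = [line.strip() for line in f if line.strip()]
    runner(ids)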