Python - multiprocessing - API query


I am preparing code for querying some endpoints. The code works, but it takes too much time. I would like to use Python's multiprocessing module to speed things up. My main goal is to run 12 API queries in parallel. Once the jobs are processed I would like to fetch the results and put them into a list of dictionaries, one response per dictionary. The API responses are in JSON format. I am new to Python and don't have experience with this kind of problem. The code I want to run in parallel is below.

import requests

# api_token_input, property_list_summary and dictionary_create are defined
# elsewhere in the script.
def api_query_process(cloud_type, api_name, cloud_account, resource_type):

    url = "xxx"
    payload = {
        "limit": 0,
        "query": f'config from cloud.resource where cloud.type = \'{cloud_type}\' AND api.name = \'{api_name}\' AND '
                 f'cloud.account = \'{cloud_account}\'',
        "timeRange": {
            "relativeTimeType": "BACKWARD",
            "type": "relative",
            "value": {
                "amount": 0,
                "unit": "minute"
            }
        },
        "withResourceJson": True
    }

    headers = {
        "content-type": "application/json; charset=UTF-8",
        "x-redlock-auth": api_token_input
    }

    response = requests.request("POST", url, json=payload, headers=headers)

    result = response.json()
    resource_count = len(result["data"]["items"])

    # resource_count is already 0 when no items come back, so one call covers both cases.
    dictionary = dictionary_create(cloud_type, cloud_account, resource_type, resource_count)
    property_list_summary.append(dictionary)

CodePudding user response:

Interesting problem, and I think you should think about idempotency: what happens if you hit the endpoint several times in a row with the same query? You can use multiprocessing with or without a lock.

Without Lock:

import multiprocessing

# task_args is assumed to be a list of 12 argument tuples, one per query.
with multiprocessing.Pool(processes=12) as pool:
    jobs = []
    for args in task_args:
        # Pass the function and its arguments separately; calling
        # api_query_process(*args) here would execute it in the parent
        # process before the pool ever sees it.
        jobs.append(pool.apply_async(api_query_process, args))
    for job in jobs:
        job.wait()
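
One caveat: each worker process gets its own copy of property_list_summary, so appends made inside api_query_process will not be visible in the parent. If you change api_query_process to return its dictionary instead, you can collect everything once the jobs are done (a sketch, assuming that change):

results = [job.get() for job in jobs]  # one dictionary per query, in submission order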

With Lock:

import multiprocessing

multiprocessing_lock = multiprocessing.Lock()

def locked_api_query_process(cloud_type, api_name, cloud_account, resource_type):
    # Only one worker can hold the lock at a time, so the queries run
    # serially; use this only if the endpoint cannot tolerate concurrent hits.
    with multiprocessing_lock:
        api_query_process(cloud_type, api_name, cloud_account, resource_type)

with multiprocessing.Pool(processes=12) as pool:
    jobs = []
    for args in task_args:
        jobs.append(pool.apply_async(locked_api_query_process, args))
    for job in jobs:
        job.wait()

I can't really do an end-to-end test, but hopefully this general setup helps you get it up and running.

CodePudding user response:

Since an HTTP request is an I/O-bound operation, you do not need multiprocessing; you can use threads to get better performance. Something like the following would help:

  • MAX_WORKERS controls how many requests are sent in parallel
  • API_INPUTS lists all the requests you want to make

Untested code sample:

from concurrent.futures import ThreadPoolExecutor

import requests


API_TOKEN = "xyzz"
MAX_WORKERS = 4
API_INPUTS = (
    ("cloud_type_one", "api_name_one", "cloud_account_one", "resource_type_one"),
    ("cloud_type_two", "api_name_two", "cloud_account_two", "resource_type_two"),
    ("cloud_type_three", "api_name_three", "cloud_account_three", "resource_type_three"),
)


def make_api_query(api_token_input, cloud_type, api_name, cloud_account):
    url = "xxx"
    payload = {
        "limit": 0,
        "query": f'config from cloud.resource where cloud.type = \'{cloud_type}\' AND api.name = \'{api_name}\' AND '
                 f'cloud.account = \'{cloud_account}\'',
        "timeRange": {
            "relativeTimeType": "BACKWARD",
            "type": "relative",
            "value": {
                "amount": 0,
                "unit": "minute"
            }
        },
        "withResourceJson": True
    }
    headers = {
        "content-type": "application/json; charset=UTF-8",
        "x-redlock-auth": api_token_input
    }
    response = requests.request("POST", url, json=payload, headers=headers)
    return response.json()


def main():
    futures = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        for (cloud_type, api_name, cloud_account, resource_type) in API_INPUTS:
            futures.append(
                pool.submit(make_api_query, API_TOKEN, cloud_type, api_name, cloud_account)
            )

    property_list_summary = []

    for future, api_input in zip(futures, API_INPUTS):
        api_response = future.result()
        cloud_type, api_name, cloud_account, resource_type = api_input

        resource_count = len(api_response["data"]["items"])
        # dictionary_create is the helper from the code in the question.
        dictionary = dictionary_create(cloud_type, cloud_account, resource_type, resource_count)
        property_list_summary.append(dictionary)

    return property_list_summary
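

# Run the queries when this module is executed directly.
if __name__ == "__main__":
    property_list_summary = main()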

CodePudding user response:

I think using async functions would help a lot in speeding this up. Your code blocks while it waits for a response from the external API, so you could use aiohttp instead of requests and issue all the requests concurrently from a single thread.
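
A minimal sketch of that approach, reusing the url, payload and headers from the question (the "xxx" url, the API_TOKEN value and the API_INPUTS entries are placeholders):

import asyncio

import aiohttp

API_TOKEN = "xyzz"
API_INPUTS = (
    ("cloud_type_one", "api_name_one", "cloud_account_one"),
    ("cloud_type_two", "api_name_two", "cloud_account_two"),
)


async def api_query(session, cloud_type, api_name, cloud_account):
    payload = {
        "limit": 0,
        "query": f"config from cloud.resource where cloud.type = '{cloud_type}' "
                 f"AND api.name = '{api_name}' AND cloud.account = '{cloud_account}'",
        "timeRange": {
            "relativeTimeType": "BACKWARD",
            "type": "relative",
            "value": {"amount": 0, "unit": "minute"}
        },
        "withResourceJson": True
    }
    headers = {
        "content-type": "application/json; charset=UTF-8",
        "x-redlock-auth": API_TOKEN
    }
    async with session.post("xxx", json=payload, headers=headers) as response:
        return await response.json()


async def main():
    async with aiohttp.ClientSession() as session:
        # Schedule every request at once; gather returns the JSON bodies
        # in the same order as API_INPUTS.
        tasks = [api_query(session, *api_input) for api_input in API_INPUTS]
        return await asyncio.gather(*tasks)


responses = asyncio.run(main())

Because all the requests share one event loop and one connection pool, this typically scales to many more concurrent queries than a process pool would.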
