I am preparing code for querying some API endpoints. The code works, but it takes too long, so I would like to use the Python multiprocessing
module to speed it up. My main goal is to run 12 API queries in parallel. Once the jobs finish, I would like to fetch the results and put them into a list of dictionaries, one response per dictionary. The API responds in JSON
format. I am new to Python and don't have experience with this kind of problem.
The code I want to run in parallel is below.
import requests

def api_query_process(cloud_type, api_name, cloud_account, resource_type):
    url = "xxx"
    payload = {
        "limit": 0,
        "query": f"config from cloud.resource where cloud.type = '{cloud_type}' AND api.name = '{api_name}' AND "
                 f"cloud.account = '{cloud_account}'",
        "timeRange": {
            "relativeTimeType": "BACKWARD",
            "type": "relative",
            "value": {
                "amount": 0,
                "unit": "minute"
            }
        },
        "withResourceJson": True
    }
    headers = {
        "content-type": "application/json; charset=UTF-8",
        "x-redlock-auth": api_token_input  # defined elsewhere
    }
    response = requests.request("POST", url, json=payload, headers=headers)
    result = response.json()
    resource_count = len(result["data"]["items"])
    if resource_count:
        dictionary = dictionary_create(cloud_type, cloud_account, resource_type, resource_count)
        property_list_summary.append(dictionary)
    else:
        dictionary = dictionary_create(cloud_type, cloud_account, resource_type, 0)
        property_list_summary.append(dictionary)
CodePudding user response:
Interesting problem, and I think you should consider idempotency: what would happen if you hit the endpoint repeatedly? You can use multiprocessing with or without a lock.
Without Lock:
import multiprocessing

with multiprocessing.Pool(processes=12) as pool:
    jobs = []
    for _ in range(12):
        # pass the function and its arguments separately; writing
        # api_query_process(*args) here would call it immediately
        # in the parent process instead of in a worker
        jobs.append(pool.apply_async(api_query_process, args))
    for job in jobs:
        job.wait()
With Lock:
import multiprocessing

multiprocessing_lock = multiprocessing.Lock()

def locked_api_query_process(cloud_type, api_name, cloud_account, resource_type):
    with multiprocessing_lock:
        api_query_process(cloud_type, api_name, cloud_account, resource_type)

with multiprocessing.Pool(processes=12) as pool:
    jobs = []
    for _ in range(12):
        jobs.append(pool.apply_async(locked_api_query_process, args))
    for job in jobs:
        job.wait()
Can't really do an end-to-end test, but hopefully this general setup helps you get it up and running.
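Since the original goal was to collect each response into a list of dictionaries, note that apply_async returns AsyncResult objects whose .get() yields the function's return value; pool.starmap is an even simpler way to gather results. A minimal sketch, using a stand-in query function because the real one makes network calls:

```python
import multiprocessing

def query(cloud_type, cloud_account):
    # stand-in for the real API call; returns one summary dict
    return {"cloud_type": cloud_type, "cloud_account": cloud_account, "count": 0}

ARGS = [("aws", "account-one"), ("azure", "account-two")]

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        # starmap unpacks each argument tuple into query(...) and
        # returns the results as a list, in the same order as the inputs
        property_list_summary = pool.starmap(query, ARGS)
    print(property_list_summary)
```

For this to work, the worker function must return its result instead of appending to a global list, since each worker process has its own copy of globals.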
CodePudding user response:
Since an HTTP request is an I/O-bound operation, you do not need multiprocessing; you can use threads to get better performance. Something like the following should help: MAX_WORKERS controls how many requests are sent in parallel, and API_INPUTS lists all the requests you want to make.
Untested code sample:
from concurrent.futures import ThreadPoolExecutor

import requests

API_TOKEN = "xyzz"
MAX_WORKERS = 4
API_INPUTS = (
    ("cloud_type_one", "api_name_one", "cloud_account_one", "resource_type_one"),
    ("cloud_type_two", "api_name_two", "cloud_account_two", "resource_type_two"),
    ("cloud_type_three", "api_name_three", "cloud_account_three", "resource_type_three"),
)

def make_api_query(api_token_input, cloud_type, api_name, cloud_account):
    url = "xxx"
    payload = {
        "limit": 0,
        "query": f"config from cloud.resource where cloud.type = '{cloud_type}' AND api.name = '{api_name}' AND "
                 f"cloud.account = '{cloud_account}'",
        "timeRange": {
            "relativeTimeType": "BACKWARD",
            "type": "relative",
            "value": {
                "amount": 0,
                "unit": "minute"
            }
        },
        "withResourceJson": True
    }
    headers = {
        "content-type": "application/json; charset=UTF-8",
        "x-redlock-auth": api_token_input
    }
    response = requests.request("POST", url, json=payload, headers=headers)
    return response.json()

def main():
    futures = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        for (cloud_type, api_name, cloud_account, resource_type) in API_INPUTS:
            futures.append(
                pool.submit(make_api_query, API_TOKEN, cloud_type, api_name, cloud_account)
            )

    property_list_summary = []
    for future, api_input in zip(futures, API_INPUTS):
        api_response = future.result()  # blocks until this request finishes
        cloud_type, api_name, cloud_account, resource_type = api_input
        resource_count = len(api_response["data"]["items"])
        dictionary = dictionary_create(cloud_type, cloud_account, resource_type, resource_count)
        property_list_summary.append(dictionary)
    return property_list_summary
CodePudding user response:
I think using async functions would help a lot in speeding this up. Your code blocks while it waits for a response from the external API; you could use aiohttp instead of requests.
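A rough, untested sketch of that approach, assuming aiohttp is installed; the URL, token, and inputs are placeholders just as in the original code:

```python
import asyncio
import aiohttp

API_INPUTS = (
    ("cloud_type_one", "api_name_one", "cloud_account_one"),
    ("cloud_type_two", "api_name_two", "cloud_account_two"),
)

async def api_query(session, cloud_type, api_name, cloud_account):
    payload = {
        "limit": 0,
        "query": f"config from cloud.resource where cloud.type = '{cloud_type}' "
                 f"AND api.name = '{api_name}' AND cloud.account = '{cloud_account}'",
        "withResourceJson": True,
    }
    # the coroutine suspends here while waiting for the server,
    # letting the other requests make progress in the meantime
    async with session.post("xxx", json=payload) as response:
        return await response.json()

async def main():
    headers = {
        "content-type": "application/json; charset=UTF-8",
        "x-redlock-auth": "your-token-here",
    }
    async with aiohttp.ClientSession(headers=headers) as session:
        tasks = [api_query(session, *args) for args in API_INPUTS]
        # gather runs all the requests concurrently on one thread
        # and returns their results in input order
        return await asyncio.gather(*tasks)

results = asyncio.run(main())
```

The JSON responses come back as a list in the same order as API_INPUTS, so they can be fed into the same dictionary-building loop as in the thread-pool answer.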