I would like to measure the runtime of a service (API) that I can call in parallel. What is the proper way to benchmark it using concurrent.futures (multiprocessing/multithreading)?
We can submit() tasks, and once a task is done we can pick it up with concurrent.futures.as_completed(). Do we need the as_completed() part, since that is what returns the results, while submit() just starts the task? Based on my dummy code it looks like it is not needed, but why not? (I thought submit() puts the task in a queue and does not wait for the response, and that with as_completed() you get the completed tasks with their results.)
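A small sketch of the two behaviours described in the question, using a hypothetical job helper that just sleeps: submit() returns a Future immediately without waiting for the work, and as_completed() yields futures in the order they finish, not the order they were submitted.

```python
import concurrent.futures
import time
from typing import List, Tuple


def job(delay: float) -> float:
    """Hypothetical stand-in for a service call: sleep, then return the delay."""
    time.sleep(delay)
    return delay


def demo() -> Tuple[float, List[float]]:
    delays = [0.3, 0.1, 0.2]
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        start = time.perf_counter()
        futures = [executor.submit(job, d) for d in delays]
        # submit() only schedules the work, so this is measured before any job finishes.
        submit_time = time.perf_counter() - start
        # as_completed() yields each future as soon as it is done,
        # so the shortest job comes back first.
        finished = [f.result() for f in concurrent.futures.as_completed(futures)]
    return submit_time, finished


if __name__ == "__main__":
    submit_time, finished = demo()
    print(f"submit() calls took {submit_time:.4f} s")
    print(f"completion order: {finished}")
```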
Let's call the service _my_job(); the dummy version below just sleeps:
import concurrent.futures
import time
from typing import List


def _my_job(t: int):
    # Dummy service call: just sleep for t seconds.
    time.sleep(t)


def measure(N_requ: int, workers: int, wait_time: int, include_as_completed: bool):
    futures = []
    start_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        for _ in range(N_requ):
            f = executor.submit(_my_job, wait_time)
            futures.append(f)
        if include_as_completed:
            for f in concurrent.futures.as_completed(futures):
                try:
                    _ = f.result()
                except Exception:
                    print("This should not happen...")
    # Timing stops only after the with block has exited.
    end_time = time.time()
    elapsed_time: float = end_time - start_time
    print(f"Elapsed time: {elapsed_time} | n_workers: {workers}, n_requs: {N_requ}, job's wait_time: {wait_time}")


if __name__ == "__main__":
    N = 20
    n_workers: List[int] = [1, 4, 16, 32]
    print("Without as_completed:")
    for w in n_workers:
        measure(N, workers=w, wait_time=1, include_as_completed=False)
    print("With as_completed:")
    for w in n_workers:
        measure(N, workers=w, wait_time=1, include_as_completed=True)
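If the goal is to benchmark the service rather than the executor, one option is to time each request inside the worker with time.perf_counter() (a monotonic, high-resolution clock meant for measuring intervals) and record the overall wall-clock time separately. A minimal sketch, assuming a hypothetical call_service function that stands in for the real API call:

```python
import concurrent.futures
import statistics
import time
from typing import Dict, List


def call_service(delay: float) -> None:
    """Hypothetical placeholder for the real API request."""
    time.sleep(delay)


def timed_call(delay: float) -> float:
    """Run one request and return its latency in seconds."""
    start = time.perf_counter()
    call_service(delay)
    return time.perf_counter() - start


def benchmark(n_requests: int, workers: int, delay: float) -> Dict[str, float]:
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(timed_call, delay) for _ in range(n_requests)]
        # result() re-raises any exception from the worker, so failures are not silently lost.
        latencies: List[float] = [f.result() for f in concurrent.futures.as_completed(futures)]
    wall = time.perf_counter() - start
    return {
        "wall_s": wall,
        "throughput_rps": n_requests / wall,
        "median_latency_s": statistics.median(latencies),
        "max_latency_s": max(latencies),
    }


if __name__ == "__main__":
    for w in [1, 4, 16]:
        print(w, benchmark(n_requests=8, workers=w, delay=0.1))
```

Per-request latency shows how the service behaves under load, while wall time and throughput show what the pool achieves overall; against a real API the two can diverge once the server becomes the bottleneck.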
Results:
Without as_completed:
Elapsed time: 20.05830979347229 | n_workers: 1, n_requs: 20, job's wait_time: 1
Elapsed time: 5.012893199920654 | n_workers: 4, n_requs: 20, job's wait_time: 1
Elapsed time: 2.0075199604034424 | n_workers: 16, n_requs: 20, job's wait_time: 1
Elapsed time: 1.0035851001739502 | n_workers: 32, n_requs: 20, job's wait_time: 1
With as_completed:
Elapsed time: 20.044990062713623 | n_workers: 1, n_requs: 20, job's wait_time: 1
Elapsed time: 5.009936809539795 | n_workers: 4, n_requs: 20, job's wait_time: 1
Elapsed time: 2.0069048404693604 | n_workers: 16, n_requs: 20, job's wait_time: 1
Elapsed time: 1.0069079399108887 | n_workers: 32, n_requs: 20, job's wait_time: 1
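These numbers follow from how a pool schedules equal-length jobs: with W workers and N jobs of duration t, the jobs run in ceil(N / W) rounds, so the ideal wall time is ceil(N / W) * t. A quick check against the results above, ignoring thread start-up overhead:

```python
import math
from typing import Dict, List


def ideal_wall_time(n_jobs: int, workers: int, job_time: float) -> float:
    """Lower bound on wall time for n_jobs equal jobs on a pool of `workers` threads."""
    rounds = math.ceil(n_jobs / workers)  # each round runs up to `workers` jobs in parallel
    return rounds * job_time


def expected_table(n_jobs: int, worker_counts: List[int], job_time: float) -> Dict[int, float]:
    return {w: ideal_wall_time(n_jobs, w, job_time) for w in worker_counts}


if __name__ == "__main__":
    print(expected_table(20, [1, 4, 16, 32], 1.0))
    # → {1: 20.0, 4: 5.0, 16: 2.0, 32: 1.0}
```

The measured 20 s, 5 s, 2 s and 1 s match this bound in both runs, which already suggests that as_completed() has no effect on the total time here.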
Answer:
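You're using the executor as a context manager. At the end of the with block, the executor is shut down automatically, and the shutdown waits for all pending futures to complete. Because end_time is taken after the with block, both variants measure the time until every job has finished, which is why the timings are the same. as_completed() is not needed for waiting; it is useful when you want to process each result (or handle its exception) as soon as that task finishes. See the docs for Executor.shutdown, particularly the part about the with statement.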
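To see this directly, one can take a timestamp inside the with block right after submitting and another after the block exits; only the second includes the job's runtime. The sketch below uses a hypothetical slow_job that sleeps and then raises. It also illustrates why calling result() still matters: an exception raised in a worker does not propagate out of the with block, it is stored on the Future and re-raised only when result() is called.

```python
import concurrent.futures
import time
from typing import Tuple


def slow_job(delay: float) -> None:
    """Hypothetical job: sleep, then fail, to show where errors surface."""
    time.sleep(delay)
    raise ValueError("job failed")


def measure_exit(delay: float) -> Tuple[float, float, bool]:
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        future = executor.submit(slow_job, delay)
        # Still inside the block: the job is scheduled but nothing has waited for it.
        inside = time.perf_counter() - start
    # Leaving the block ran executor.shutdown(wait=True), which blocked until the job ended.
    outside = time.perf_counter() - start
    # The ValueError was stored on the Future; result() re-raises it here.
    try:
        future.result()
        raised = False
    except ValueError:
        raised = True
    return inside, outside, raised


if __name__ == "__main__":
    inside, outside, raised = measure_exit(0.5)
    print(f"inside with: {inside:.3f} s, after with: {outside:.3f} s, result() raised: {raised}")
```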