Concretely, I'm using Flask to process a request; the pseudocode looks like this:
from flask import Flask, request

app = Flask(__name__)

@app.route("/foo", methods=["POST"])
def foo():
    data = request.get_json()   # {"request_id": "abc", "data": "some text"}
    result_a = do_task_a(data)  # returns {"result_a": "a"}, maybe about 1 second to finish
    result_b = do_task_b(data)  # returns {"result_b": "b"}, maybe about 1 second to finish
    result_c = do_task_c(data)  # returns {"result_c": "c"}, maybe about 1 second to finish
    result = {
        "result_a": result_a["result_a"],
        "result_b": result_b["result_b"],
        "result_c": result_c["result_c"],
    }
    return result

app.run(host='0.0.0.0', port=4000, threaded=False)
Here, do_task_a, do_task_b, and do_task_c are completely independent subtasks. I know I can use multiprocessing.Process to spawn a process for each of the three subtasks and call join() to wait for them to finish, but I'm not sure whether creating a new Process for every request is the proper approach. Maybe multiprocessing.Queue can help, but I haven't found a good way to use it. I've searched around multiprocessing and still can't figure out a good solution.
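For reference, this is roughly the per-request Process + Queue pattern I have in mind. It is just a sketch building on the code above (run_task and run_all_tasks are helper names I made up), and I don't know if it's the right way:

from multiprocessing import Process, Queue

def run_task(func, data, queue):
    # run one subtask and push its result dict onto the shared queue
    queue.put(func(data))

def run_all_tasks(data):
    funcs = (do_task_a, do_task_b, do_task_c)
    queue = Queue()
    procs = [Process(target=run_task, args=(f, data, queue)) for f in funcs]
    for p in procs:
        p.start()
    result = {}
    for _ in funcs:
        result.update(queue.get())  # blocks until each subtask reports back
    for p in procs:
        p.join()  # results are already collected, so join() won't block on the queue
    return result

My worry is that run_all_tasks creates and tears down three processes on every single request.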
CodePudding user response:
I'm not a Python guy, but creating processes is indeed an expensive operation. If it's possible, create threads instead; they're cheaper than processes.
If you serve this request many times, you can do even better than that, because creating threads per request is still fairly expensive.
An even more advanced setup is a "pre-loaded" thread pool: N threads that you always keep in memory, ready to run arriving tasks.
In terms of a technical solution, I found this article that explains how to create thread pools in Python 3.2+.
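For example, here is a minimal sketch of such a pre-loaded pool using Python's concurrent.futures.ThreadPoolExecutor (assuming do_task_a, do_task_b, do_task_c are the functions from the question; the pool is created once at import time and reused by every request):

from concurrent.futures import ThreadPoolExecutor
from flask import Flask, request

app = Flask(__name__)
executor = ThreadPoolExecutor(max_workers=3)  # kept warm for the lifetime of the app

@app.route("/foo", methods=["POST"])
def foo():
    data = request.get_json()
    # submit the three independent subtasks, then wait for all of them
    futures = [executor.submit(f, data) for f in (do_task_a, do_task_b, do_task_c)]
    result = {}
    for future in futures:
        result.update(future.result())
    return result

Keep in mind that if the tasks are CPU-bound rather than I/O-bound, Python's GIL limits what threads can gain, and a process pool is the better fit.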
CodePudding user response:
Thanks to @mark, I found an approach that seems to work, but I'm not sure it's right. Can someone check it for me? Thanks.
from concurrent.futures import ProcessPoolExecutor
import random
import time
from flask import Flask, request
def do_task_a(data):
    time.sleep(random.random())  # simulate roughly 1 second of work
    # the example payload's "data" field is a string, so build the result from it
    res = {"result_a": data["data"] + " 10"}
    return res

def do_task_b(data):
    time.sleep(random.random())  # simulate roughly 1 second of work
    res = {"result_b": data["data"] + " 10"}
    return res
class Scheduler:
    def __init__(self, names, funcs, pools):
        self.names = names
        self.funcs = funcs
        self.pools = pools
        self.num_executors = len(funcs)

scheduler = Scheduler(
    names=["do_task_a", "do_task_b"],
    funcs=[do_task_a, do_task_b],
    pools=[ProcessPoolExecutor(1), ProcessPoolExecutor(1)],
)
app = Flask(__name__)
@app.route("/foo", methods=["POST"])
def foo():
data = request.get_json() # {"request_id": "abc", "data": "some text"}
task_results = []
futures = []
time.sleep(random.random())
for i in range(scheduler.num_executors):
future = scheduler.pools[i].submit(scheduler.funcs[i], data)
futures.append(future)
for i in range(scheduler.num_executors):
task_results.append(futures[i].result())
result = dict()
for task_result in task_results:
result.update(task_result)
result["request_id"] = data["request_id"]
return result
if __name__ == "__main__":
app.run(host='0.0.0.0', port=4000, threaded=False)
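To sanity-check it, I send a request like this (a hypothetical client using the requests library against the server above on port 4000):

import requests

payload = {"request_id": "abc", "data": "some text"}
resp = requests.post("http://localhost:4000/foo", json=payload)
print(resp.json())  # merged dict with result_a, result_b and request_id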