How to use Flask with multiprocessing

Time:11-27

Concretely, I'm using Flask to process a request, pseudocode like this:

from flask import Flask, request

app = Flask(__name__)
@app.route("/foo", methods=["POST"])
def foo():
    data = request.get_json()  # {"request_id": "abc", "data": "some text"}
    result_a = do_task_a(data)  # returns {"result_a": "a"}, maybe about 1 second to finish 
    result_b = do_task_b(data)  # returns {"result_b": "b"}, maybe about 1 second to finish
    result_c = do_task_c(data)  # returns {"result_c": "c"}, maybe about 1 second to finish
    result = {
        "result_a": result_a["result_a"],
        "result_b": result_b["result_b"],
        "result_c": result_c["result_c"]}
    return result

app.run(host='0.0.0.0', port=4000, threaded=False)

Here, do_task_a, do_task_b, and do_task_c are completely independent subtasks. I know I can use multiprocessing.Process to create a process for each of the three subtasks and call join() to wait for them to finish, but I don't know whether creating new processes for every request is the proper approach.

Maybe multiprocessing.Queue could help, but I haven't found a good way to use it.

I have searched for multiprocessing solutions, but I can't figure out a good one.

CodePudding user response:

I'm not a Python guy, but creating processes is indeed an expensive operation. If possible, create threads instead; they're cheaper than processes.

If the request runs many times, you can do even better than that, because creating threads per request is still quite expensive.

An even more advanced setup is a "pre-loaded" thread pool: N threads that you always keep in memory, ready to run arriving tasks.

As for a technical solution, I've found this article, which explains how to create thread pools in Python 3.2.
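
A minimal sketch of the pre-loaded pool idea, using the standard library's concurrent.futures.ThreadPoolExecutor. The task bodies and the run_subtasks helper are hypothetical stand-ins, not code from the question. In the Flask app, the pool would be created once at module level and run_subtasks would be called from the view function. Note that threads only help if the subtasks spend their time waiting on I/O or in code that releases the GIL; CPU-bound pure-Python work would still need processes.

```python
from concurrent.futures import ThreadPoolExecutor
import time

# Hypothetical stand-ins for the question's three independent subtasks.
def do_task_a(data):
    time.sleep(0.1)  # simulate waiting on I/O
    return {"result_a": "a"}

def do_task_b(data):
    time.sleep(0.1)
    return {"result_b": "b"}

def do_task_c(data):
    time.sleep(0.1)
    return {"result_c": "c"}

# Created once and reused for every request, so no thread is started per request.
pool = ThreadPoolExecutor(max_workers=3)

def run_subtasks(data):
    """Submit all subtasks to the shared pool and merge their results."""
    futures = [pool.submit(task, data) for task in (do_task_a, do_task_b, do_task_c)]
    result = {}
    for future in futures:
        # result() blocks until that subtask finishes and re-raises its exception, if any.
        result.update(future.result())
    return result
```

With this in place, the body of foo() could shrink to something like `return run_subtasks(request.get_json())`.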

CodePudding user response:

Thanks to @mark, I found that the following approach works, but I'm not sure it is correct. Can someone check it? Thanks.

from concurrent.futures import ProcessPoolExecutor
import random
import time
from flask import Flask, request

def do_task_a(data):
    time.sleep(random.random())
    res = {"result_a": data["data"] + 10}  # assumes data["data"] is a number
    return res

def do_task_b(data):
    time.sleep(random.random())
    res = {"result_b": data["data"] + 10}  # assumes data["data"] is a number
    return res

class Scheduler:
    def __init__(self, names, funcs, pools):
        self.names = names
        self.funcs = funcs
        self.pools = pools
        self.num_executors = len(funcs)

scheduler = Scheduler(
    names=["do_task_a", "do_task_b"],
    funcs=[do_task_a, do_task_b],
    pools=[ProcessPoolExecutor(1), ProcessPoolExecutor(1)]
)

app = Flask(__name__)

@app.route("/foo", methods=["POST"])
def foo():
    data = request.get_json()  # {"request_id": "abc", "data": "some text"}
    task_results = []
    futures = []
    time.sleep(random.random())
    for i in range(scheduler.num_executors):
        future = scheduler.pools[i].submit(scheduler.funcs[i], data)
        futures.append(future)
    for i in range(scheduler.num_executors):
        task_results.append(futures[i].result())
    result = dict()
    for task_result in task_results:
        result.update(task_result)
    result["request_id"] = data["request_id"]
    return result

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=4000, threaded=False)
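
One possible simplification, offered as a sketch rather than a verdict on the code above: instead of a Scheduler holding one single-worker pool per task, a single shared executor can run all the tasks. The run_tasks helper and the TASKS tuple are hypothetical names introduced here, and the payload assumes data["data"] is numeric. Because the helper accepts any executor, the same code works with threads or processes.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

# Tasks must be module-level functions so a process pool can pickle them.
def do_task_a(data):
    return {"result_a": data["data"] + 10}

def do_task_b(data):
    return {"result_b": data["data"] + 10}

TASKS = (do_task_a, do_task_b)

def run_tasks(executor, data):
    """Run every task on the given executor and merge the results into one dict."""
    futures = [executor.submit(task, data) for task in TASKS]
    result = {}
    for future in futures:
        # Blocks until the task is done; an exception raised in the task is re-raised here.
        result.update(future.result())
    result["request_id"] = data["request_id"]
    return result

if __name__ == "__main__":
    payload = {"request_id": "abc", "data": 1}
    # The helper does not care which kind of executor it is given.
    for executor_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        with executor_cls(max_workers=len(TASKS)) as pool:
            print(executor_cls.__name__, run_tasks(pool, payload))
```

In the Flask app, the executor would be created once at module level (not inside a with block per request) and the view would call run_tasks(pool, request.get_json()).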