I am building an API with FastAPI, served via uvicorn. The API has endpoints that make use of the Python multiprocessing library.
An endpoint spawns several Processes to perform CPU-bound tasks in parallel. Here is a high-level overview of the code logic:
import multiprocessing as mp

class Compute:
    def single_compute(self, single_comp_data):
        # Computational task (CPU bound)
        global queue
        queue.put(self.compute(single_comp_data))

    def multi_compute(self, task_ids):
        # Prepare for computation
        output = {}
        processes = []
        global queue
        queue = mp.Queue()
        # Start the test objects' computation
        for tid in task_ids:
            # Load task data here, to make use of the object's in-memory cache
            single_comp_data = self.load_data_from_cache(tid)
            p = mp.Process(target=self.single_compute, args=single_comp_data)
            p.start()
            processes.append(p)
        # Collect the parallel computation results
        for p in processes:
            result = queue.get()
            output[result["tid"]] = result
            p.join()
        return output
Here is the simple API code:
from fastapi import FastAPI, Response
import json

app = FastAPI()

# comp holds an in-memory cache, that's why it's created in global scope
comp = Compute()

@app.get("/compute")
def compute(task_ids):
    result = comp.multi_compute(task_ids)
    return Response(content=json.dumps(result, default=str), media_type="application/json")
When run with multiple workers like this:
uvicorn compute_api:app --host 0.0.0.0 --port 7000 --workers 2
I am getting this Python error:
TypeError: can't pickle _thread.lock objects
With only 1 worker process it is fine. The program runs on UNIX/Linux.
Could someone explain why forking a new process is not possible here with multiple uvicorn workers, and why I am running into this thread lock error?
In the end, what should be achieved is simple:
a uvicorn process that spawns multiple child processes (via fork, each with a copy of that uvicorn process's memory) to perform CPU-bound tasks.
CodePudding user response:
TypeError: can't pickle _thread.lock objects
stems from whatever data you're passing into your subprocess in
p = mp.Process(target=self.single_compute, args=single_comp_data)
containing an unpickleable object.
All args/kwargs sent to a multiprocessing subprocess (be it via Process or the higher-level methods in Pool) must be pickleable, and similarly the return value of the function being run must be pickleable so it can be sent back to the parent process.
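To see that constraint in isolation (a purely illustrative snippet, not from the original post), pickling an object that holds a lock fails with exactly this kind of error:
import pickle
import threading

lock = threading.Lock()
# Raises TypeError: cannot pickle '_thread.lock' object
pickle.dumps(lock)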
If you're on UNIX and using the fork start method for multiprocessing (the default on Linux, but not on macOS), you can also take advantage of copy-on-write memory semantics to avoid copying data "down" to the child processes: make the data available, e.g. via instance state or a global variable, before spawning the subprocess, and have the child fetch it by reference instead of receiving it as an argument.
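A minimal sketch of that copy-on-write pattern, assuming Linux with the fork start method (the names _shared_data and worker are made up for illustration):
import multiprocessing as mp

# Hypothetical module-level store, populated in the parent before forking.
_shared_data = {}

def worker(task_id):
    # Under the "fork" start method the child inherits _shared_data by
    # copy-on-write, so the data is fetched by reference here rather than
    # being pickled and passed as an argument.
    data = _shared_data[task_id]
    return task_id, len(data)  # stand-in for the real CPU-bound computation

if __name__ == "__main__":
    _shared_data.update({1: "abc", 2: "defgh"})  # populate in the parent
    with mp.get_context("fork").Pool() as pool:
        print(dict(pool.map(worker, [1, 2])))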
The example below uses imap_unordered for performance (the assumption being there's no need to process the ids in order), and returns a dict mapping each input ID to the result it produces.
import multiprocessing

class Compute:
    _cache = {}  # could be an instance variable too but whatever

    def get_data(self, id):
        if id not in self._cache:
            self._cache[id] = get_data_from_somewhere(id)
        return self._cache[id]

    def compute_item(self, id):
        data = self.get_data(id)
        result = 42  # ... do heavy computation here ...
        return (id, result)

    def compute_result(self, ids) -> dict:
        for id in ids:
            self.get_data(id)  # populate the cache in the parent process
        with multiprocessing.Pool() as p:
            return dict(p.imap_unordered(self.compute_item, ids))
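Wired into the FastAPI app from the question, usage might look like this (the comma-separated task_ids format is an assumption about how the ids arrive in the query string):
from fastapi import FastAPI

app = FastAPI()
comp = Compute()  # global, so the in-memory cache survives across requests

@app.get("/compute")
def compute(task_ids: str):
    # Assumed format: ?task_ids=1,2,3 — parse into a list of ints
    ids = [int(x) for x in task_ids.split(",")]
    return comp.compute_result(ids)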