Using Multiprocessing in Uvicorn with multiple workers (thread lock)


I am building an API with FastAPI, served via uvicorn. The API has endpoints that use Python's multiprocessing library.

An endpoint spawns several processes for CPU-bound tasks to perform them in parallel. Here is a high-level overview of the code logic:

import multiprocessing as mp

class Compute:

    def single_compute(self, single_comp_data):
        # CPU-bound computational task
        global queue
        queue.put(self.compute(single_comp_data))

    def multi_compute(self, task_ids):
        # Prepare for computation
        output = {}
        processes = []
        global queue
        queue = mp.Queue()

        # Start the computations in parallel
        for tid in task_ids:
            # Load task data here, to make use of the in-memory object cache
            single_comp_data = self.load_data_from_cache(tid)
            p = mp.Process(target=self.single_compute, args=(single_comp_data,))
            p.start()
            processes.append(p)

        # Collect the results of the parallel computation
        for p in processes:
            result = queue.get()
            output[result["tid"]] = result
            p.join()

        return output

Here is the simple API code:

from fastapi import FastAPI, Response
import json


app = FastAPI()
# comp holds an in-memory cache, that's why it's created in global scope
comp = Compute()

@app.get("/compute")
def compute(task_ids):
    result = comp.multi_compute(task_ids)
    return Response(content=json.dumps(result, default=str), media_type="application/json")

When run with multiple workers like this:

uvicorn compute_api:app --host 0.0.0.0 --port 7000 --workers 2

I am getting this Python error:

TypeError: can't pickle _thread.lock objects

With only one worker process it works fine. The program runs on a UNIX/Linux OS.

Could someone explain to me why forking a new process is not possible with multiple uvicorn workers here, and why I am running into this thread lock error?

In the end, what should be achieved is simple:

a uvicorn process that spawns multiple child processes (via fork) with a copy of that uvicorn process's memory, to perform CPU-bound tasks.

CodePudding user response:

TypeError: can't pickle _thread.lock objects

stems from whatever data you're passing into your subprocess in

p = mp.Process(target=self.single_compute, args=single_comp_data)

containing an unpickleable object.

All args/kwargs sent to a multiprocessing subprocess (be it via Process, or the higher-level methods in Pool) must be pickleable, and similarly the return value of the function run must be pickleable so it can be sent back to the parent process.
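A quick way to find the offending argument is to try pickling each piece yourself before handing it to the subprocess. This is a rough diagnostic sketch; the `find_unpicklable` helper is hypothetical, not part of the question's code:

```python
import pickle
import threading

def find_unpicklable(values):
    """Return the values that fail to pickle (hypothetical diagnostic helper)."""
    bad = []
    for v in values:
        try:
            pickle.dumps(v)
        except TypeError:
            bad.append(v)
    return bad

# A thread lock is a classic unpicklable object:
payload = {"tid": 1, "lock": threading.Lock()}
offenders = find_unpicklable(payload.values())  # contains only the lock
```

Running a check like this on each element of `args` (and on anything reachable from `self`, since a bound method drags its instance along) usually pinpoints where the `_thread.lock` is hiding.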

If you're on UNIX and using the fork start method for multiprocessing (the default on Linux, but not on macOS), you can also take advantage of copy-on-write memory semantics to avoid copying the data "down" to the child processes. Make the data available before spawning the subprocess, e.g. via instance state or a global variable, and have the child fetch it by reference instead of passing the data itself as an argument.
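A minimal sketch of that pattern, assuming a Linux/fork environment (the names here are illustrative):

```python
import multiprocessing as mp

BIG_DATA = {}  # populated in the parent before forking

def worker(key, q):
    # Under the fork start method the child inherits a copy-on-write
    # snapshot of the parent's memory, so BIG_DATA is read by reference
    # and never has to be pickled or passed as an argument.
    q.put(BIG_DATA[key] * 2)

def main():
    BIG_DATA["x"] = 21
    ctx = mp.get_context("fork")  # explicit; fork is the default on Linux
    q = ctx.Queue()
    p = ctx.Process(target=worker, args=("x", q))
    p.start()
    result = q.get()
    p.join()
    return result

if __name__ == "__main__":
    print(main())  # 42
```

Only the small, picklable bits (here the key `"x"` and the queue handle) cross the process boundary; the large data structure stays in inherited memory.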

This example is using imap_unordered for performance (the assumption being there's no need to process the ids in order), and would return a dict mapping an input ID to the result it creates.

import multiprocessing

class Compute:
    _cache = {}  # could be an instance variable too but whatever

    def get_data(self, id):
        if id not in self._cache:
            self._cache[id] = get_data_from_somewhere(id)
        return self._cache[id]

    def compute_item(self, id):
        data = self.get_data(id)
        result = 42  # ... do heavy computation here ...
        return (id, result)

    def compute_result(self, ids) -> dict:
        for id in ids:
            self.get_data(id)  # populate the cache in the parent process
        with multiprocessing.Pool() as p:
            return dict(p.imap_unordered(self.compute_item, ids))