How to share a dictionary in python multiprocessing map_async function?


I have a list lst in Python. I want to call a function f on each item of this list. This function f calls a third-party function g, and I also want to measure the time taken by each call to g for each item in lst. To speed things up, I use a multiprocessing pool to parallelize the execution. Currently I have the following code, but it does not work. I have learned from this post that map_async can only call unary functions. I also want to take advantage of the multiple processes that map_async creates, so I don't want to switch to apply_async. Can someone suggest a better alternative to achieve my goal?

My current solution that does not work:


import multiprocessing as mp
import time

time_metrics = {}

def f(idx):
    global time_metrics

    a = time.time()
    g(idx)
    b = time.time()
    time_metrics[idx] = b - a

lst = [1, 2, 3, 4, 5, 6]
pool = mp.Pool(7)
pool.map_async(f, lst)
pool.close()
pool.join()

print(time_metrics)

CodePudding user response:

Multiprocessing doesn't share memory space. It uses process 'forks' to clone the current process state (or only part of it, depending on the fork/spawn start method and the operating system) into a new region of RAM under a new process ID, which then runs independently. If you want to use regions of shared memory the task becomes more complicated, and in some of my old projects I've found shared memory to be slower than using queues to pass data back to the parent process and store it in a dict there.
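
For context, a minimal sketch of that queue pattern could look like this (g() here is just a stand-in for the real third-party function):

from datetime import datetime
import multiprocessing as mp


def g(a):
    # stand-in for the real third-party function
    for i in range(5000 * a):
        pass


def worker(idx, q):
    # each process times its own call to g() and sends (idx, duration)
    # back to the parent through the queue
    a = datetime.utcnow()
    g(idx)
    b = datetime.utcnow()
    q.put((idx, b - a))


if __name__ == "__main__":
    lst = [1, 2, 3, 4, 5, 6]
    q = mp.Queue()
    procs = [mp.Process(target=worker, args=(idx, q)) for idx in lst]
    for p in procs:
        p.start()
    # drain the queue in the parent and build the dict here
    time_metrics = dict(q.get() for _ in lst)
    for p in procs:
        p.join()
    print(time_metrics)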

For this task, though, it doesn't look to me like you need any of that. You can simply return the time value from each worker and then, after the pool has finished executing (in sync mode rather than async, so the pool blocks until all processes have finished their tasks), iterate through and collect the results.

So here is probably the easiest solution:

from datetime import datetime
import multiprocessing as mp

time_metrics = {}

def g(a):
    # placeholder function for whatever you have as g()
    for i in range(5000*a):
        pass


def f(idx):
    # once spawned, a process calling this function cannot modify objects in the
    # parent process's memory, unless it uses the special shared-memory objects
    # in the multiprocessing module.
    a = datetime.utcnow()
    g(idx)
    b = datetime.utcnow()
    return (idx, b - a)


if __name__ == "__main__":
    lst = [1, 2, 3, 4, 5, 6]
    # don't create one process per job; the default Pool() uses one process per
    # CPU core, and using more rarely brings any benefit, especially for benchmarking.
    with mp.Pool() as pool:
        # blocks until result is available
        results = pool.map(f, lst)

    for row in results:
        time_metrics[row[0]] = row[1]
    
    print(time_metrics)
    

If you're interested, this could be refactored to use a shared dictionary from the multiprocessing library (a Manager proxy) or an instance of mp.Queue to pass the results back to the parent process, but as far as I can see it isn't necessary for this problem.
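
If you did want the shared-dictionary route, a minimal sketch using a Manager proxy (again with a stand-in g()) might look like this:

from datetime import datetime
from functools import partial
import multiprocessing as mp


def g(a):
    # stand-in for the real third-party function
    for i in range(5000 * a):
        pass


def f(shared_dict, idx):
    # the Manager dict is a proxy, so writes made here in the worker
    # are forwarded to the manager process and are visible to the parent
    a = datetime.utcnow()
    g(idx)
    b = datetime.utcnow()
    shared_dict[idx] = b - a


if __name__ == "__main__":
    lst = [1, 2, 3, 4, 5, 6]
    with mp.Manager() as manager:
        time_metrics = manager.dict()
        with mp.Pool() as pool:
            # partial() keeps the mapped function unary, as map requires
            pool.map(partial(f, time_metrics), lst)
        # copy the proxy into a plain dict before the manager shuts down
        print(dict(time_metrics))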

Is there any reason why you really need to use the async version of the pool, or is this method sufficient?

If you really want to use map_async, this snippet works:


from datetime import datetime
import multiprocessing as mp

time_metrics = {}

def g(a):
    for i in range(5000*a):
        pass


def f(idx):
    a = datetime.utcnow()
    g(idx)
    b = datetime.utcnow()
    return (idx, b - a)


def append_res(res_list: list):
    # the callback receives the full list of (idx, duration) tuples
    # once every task in the pool has completed
    for idx, delta in res_list:
        time_metrics[idx] = delta


if __name__ == "__main__":
    lst = [1, 2, 3, 4, 5, 6]
    # don't create one process per job; the default Pool() uses one process per
    # CPU core, and using more rarely brings any benefit, especially for benchmarking.
    with mp.Pool() as pool:
        # doesn't block until result is available.
        # callback is applied to list of results when all the tasks are complete
        results = pool.map_async(f, lst, callback=append_res)
        # wait for result to become available, otherwise parent process will exit the context manager and processes will not complete
        results.wait()
    
    print(time_metrics)

I'm not 100% sure how the behaviour of .map_async() compares to .map(). .map() applies the function to the iterable in order and won't start a new task on a process until that process has finished its previous one. That makes it suitable for the benchmarking you want, as long as the machine isn't juggling far more Python processes than it has CPU cores, which only adds overhead and load and gives you inaccurate benchmarks. With map_async, as with async functions generally, individual results may become available in a different order from the one in which they were allocated, which suggests to me that all the tasks are handed to the process pool at the same time; that could create competition for CPU resources between tasks and may produce inaccurate benchmarks, though I could do with someone confirming this in the comments.
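
One detail that does help here: whichever order the tasks actually finish in, the AsyncResult returned by map_async hands the results back in the order of the input iterable, so an alternative to the callback is to build the dict from .get() directly. A minimal sketch, reusing the same placeholder g():

from datetime import datetime
import multiprocessing as mp


def g(a):
    # placeholder function for whatever you have as g()
    for i in range(5000 * a):
        pass


def f(idx):
    a = datetime.utcnow()
    g(idx)
    b = datetime.utcnow()
    return (idx, b - a)


if __name__ == "__main__":
    lst = [1, 2, 3, 4, 5, 6]
    with mp.Pool() as pool:
        async_result = pool.map_async(f, lst)
        # .get() blocks until all tasks are done and returns the results
        # in the same order as the input list, regardless of completion order
        time_metrics = dict(async_result.get())

    print(time_metrics)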
