Multiprocessing in Python: Pool and Process with shared array

Time:07-25

After browsing through many discussions on the same/similar topics, I still can't solve my problem, hence I would like to post it below.

The following is an MWE of what I would like to parallelize, which is solving a set of independent linear equations (nI + mat)x = y, parametrized by n = 0, 1, 2, with fixed arrays mat and y. Note that the arrays are declared global in the hope that they can be accessed by different processes/pools (see below). But I don't think it works, and this is the core of the question: how can big numpy arrays be shared between different processes/pools to avoid communication overhead?

import numpy as np 
import time 
import os


N = 2000
num = 3 
global mat 
mat = np.random.rand(N, N)
global y 
y = np.random.rand(N,1)

# Functions to be parallelized num of times
def fun(n): 
    print(f"{n}-th job is run on child {os.getpid()} of parent {os.getppid()}")
    newmat = n * np.eye(N) + mat
    
    return np.linalg.solve(newmat, y)

# Approach 1: no parallel
def main():
    start_time = time.time()
    res = []
    for i in range(num):
        res.append(fun(i))
    print(f"Approach 1: Time elapsed = {time.time()-start_time} sec")
    return res
main()

I tried the following three approaches to parallelize it: Pool, Process and Process with Array and numpy.frombuffer. See below.

from multiprocessing import Process, set_start_method, Queue, Pool, cpu_count, Array, RawArray
set_start_method('fork') 

# Approach 2: with pool 
def main2():
    start_time = time.time()
    pool = Pool(cpu_count())
    res = pool.map(fun, range(num))
    print(f"Approach 2: Time elapsed = {time.time()-start_time} sec")
    pool.close()
    return res    
main2()

# Approach 3: with process
def fun2(i, output):
    output.put(fun(i))

def main3():
    start_time = time.time()
    output = Queue()
    processes = [Process(target=fun2, args=(i, output)) for i in range(num)]
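    # Run processes
    for p in processes:
        p.start()

    # Collect the results before joining: a child cannot exit until the data
    # it put on the queue has been consumed, so calling join() first can deadlock
    res = [output.get() for _ in processes]

    # Wait for the processes to exit
    for p in processes:
        p.join()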
    
    print(f"Approach 3: Time elapsed = {time.time()-start_time} sec")
    
    return res       
main3()

# Approach 4: with process with Array, numpy.frombuffer, 
def fun3(n, output, mat, y):
    print(f"{n}-th job is run on child {os.getpid()} of parent {os.getppid()}")
    mat2 = np.frombuffer(mat.get_obj())
    newmat = n * np.eye(N) + mat2.reshape((N, N))
    output.put(np.linalg.solve(newmat, y))

def main4():
    mat2 = Array('d', mat.flatten())
    y2 = Array('d', y)
    start_time = time.time()
    output = Queue()
    processes = [Process(target=fun3, args=(i, output, mat2, y2)) for i in range(num)]

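    # Run processes
    for p in processes:
        p.start()

    # Collect the results before joining: a child cannot exit until the data
    # it put on the queue has been consumed, so calling join() first can deadlock
    res = [output.get() for _ in processes]

    # Wait for the processes to exit
    for p in processes:
        p.join()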
    
    print(f"Approach 4: Time elapsed = {time.time()-start_time} sec")
    
    return res
main4()

None of these approaches works, and I got

0-th job is run on child 8818 of parent 3421
1-th job is run on child 8818 of parent 3421
2-th job is run on child 8818 of parent 3421
Approach 1: Time elapsed = 0.2891273498535156 sec
0-th job is run on child 8819 of parent 8818
1-th job is run on child 8820 of parent 8818
2-th job is run on child 8821 of parent 8818
Approach 2: Time elapsed = 3.6278929710388184 sec
0-th job is run on child 8832 of parent 8818
1-th job is run on child 8833 of parent 8818
2-th job is run on child 8834 of parent 8818
Approach 3: Time elapsed = 4.243804931640625 sec
0-th job is run on child 8843 of parent 8818
1-th job is run on child 8844 of parent 8818
2-th job is run on child 8845 of parent 8818
Approach 4: Time elapsed = 4.745251893997192 sec

This summarizes all the approaches I have seen so far. I am aware that there is a SharedMemory in multiprocessing, but it is not available in Python 3.7.2. If that could solve the problem, I would be very happy to see how it works.

Many thanks to anyone who reads through the whole post; any help is appreciated. In case it is important, I am using a Mac with an Apple M1 chip, running macOS Monterey.

Update 1: per @AKX's point, I removed the print(n-th job) line and set N=10000, and the results are

Approach 1: Time elapsed = 23.812573194503784 sec
Approach 2: Time elapsed = 126.91087889671326 sec

For Approach 3, it had been running for around 5 minutes when I had to cut it off. So the time overhead is pretty large for large N.

CodePudding user response:

np.linalg.solve should already be executed by a parallel function implemented in LAPACK. In fact, this is the case on my (Linux + Windows) machine. Indeed, it calls LAPACK functions like dtrsm and dlaswp, and the main computational function, dgemm, is implemented in BLAS libraries. This last function should take >90% of the time and is heavily optimized and parallelized as long as you use a fast BLAS implementation. NumPy uses OpenBLAS by default on most systems, which is very good (and parallel). The Intel MKL is also a good alternative supporting LAPACK (certainly better on Intel hardware). If the computation is not parallel on your machine, this is a problem, and you should check your BLAS implementation as it may be very inefficient.
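To check which BLAS/LAPACK build your NumPy is linked against, and to get a feel for how long a single solve takes, something like the following sketch may help. np.show_config() is a real NumPy function; the helper name time_solve and the sizes are just made up for illustration. Watching the CPU usage in a system monitor while it runs should show whether more than one core is busy.

```python
import time
import numpy as np

def time_solve(n, seed=0):
    """Return the wall-clock seconds taken by one np.linalg.solve call on an n x n system."""
    rng = np.random.default_rng(seed)
    mat = rng.random((n, n))
    y = rng.random((n, 1))
    start = time.perf_counter()
    np.linalg.solve(mat, y)
    return time.perf_counter() - start

if __name__ == "__main__":
    # Lists the BLAS/LAPACK libraries this NumPy build was compiled against
    np.show_config()
    print(f"solve with N=2000 took {time_solve(2000):.3f} sec")
```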

The thing is that parallelizing code that is already parallel makes it slower, because running more threads than available cores puts a lot of pressure on the OS scheduler, and the BLAS functions are not optimized for such a case. If you really want to parallelize the function, you need to configure the BLAS to use 1 thread, but this is likely to be less efficient than letting the BLAS do the parallelization (mainly because Python is not great for writing fast parallel applications due to pickling and the global interpreter lock). This is one reason your parallel approaches are (much) slower than the first sequential approach.
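If you want to try the one-BLAS-thread-per-process setup anyway, a minimal sketch could look like this. It assumes the thread count is controlled by environment variables that are read when the BLAS library is loaded, so they are set before NumPy is imported; which variable actually takes effect depends on the BLAS backend, so several common ones are set. N is kept small here only so that the example runs quickly.

```python
import os

# Assumption: these must be set before NumPy (and its BLAS) is first imported.
# Which one is honoured depends on the backend (OpenMP, OpenBLAS, MKL, Accelerate).
for var in ("OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS",
            "MKL_NUM_THREADS", "VECLIB_MAXIMUM_THREADS"):
    os.environ[var] = "1"

import numpy as np
from multiprocessing import Pool

N = 200  # small on purpose; the question uses N = 2000
rng = np.random.default_rng(0)
mat = rng.random((N, N))
y = rng.random((N, 1))

def fun(n):
    # Same computation as in the question: solve (n*I + mat) x = y
    return np.linalg.solve(n * np.eye(N) + mat, y)

if __name__ == "__main__":
    with Pool(3) as pool:
        res = pool.map(fun, range(3))
    print([r.shape for r in res])
```

Whether this beats the plain sequential loop with a multi-threaded BLAS is something you would have to measure on your own machine.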

CodePudding user response:

To answer this question, we need to talk about how multiprocessing works.

On UNIX-like platforms, multiprocessing uses the fork system call. This creates an almost perfect copy of the parent process, copying mat and y for you. In this case, sharing them doesn't make much sense because modern UNIX-like operating systems tend to use copy-on-write for memory pages where possible.

On platforms like macOS and ms-windows, it starts a new Python instance and imports your code into it by default, although you can use fork on macOS. So there it will re-create mat and y in every instance.
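The difference between the start methods can be made visible with a small experiment. In the sketch below (the function names are made up), the module-level print runs again in each worker started with spawn, because the file is re-imported there, while workers started with fork inherit the already-initialized parent and print nothing extra.

```python
import multiprocessing as mp
import os

# Executed once in the parent, and again in every worker that re-imports this file
print(f"module code executed in pid {os.getpid()}")

def worker(_):
    return os.getpid()

def available_start_methods():
    return mp.get_all_start_methods()

if __name__ == "__main__":
    for method in ("fork", "spawn"):
        if method not in available_start_methods():
            continue  # e.g. fork does not exist on Windows
        ctx = mp.get_context(method)
        with ctx.Pool(2) as pool:
            pids = set(pool.map(worker, range(4)))
        print(f"{method}: tasks ran in {len(pids)} worker process(es)")
```

Using get_context instead of set_start_method keeps the choice local to one pool, so both methods can be tried in the same run.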

If you want to share data in this case, multiprocessing has several mechanisms for that, like Array and Manager. The latter should be able to share numpy arrays with some glue. And you have to keep in mind that there is some overhead associated with using them; their use case is geared toward modification of shared data, so it has mechanisms to deal with that. Which you don't need in your case.
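As a rough sketch of the Array route: since the workers only read mat and y, the lock-free RawArray variant should be enough, and np.frombuffer can wrap it without copying. The helper names to_shared and init_worker are made up for this example, and the data is assumed to be float64. The shared buffers are handed to the workers once through the pool initializer instead of being sent with every task.

```python
import numpy as np
from multiprocessing import Pool, RawArray

def to_shared(arr):
    """Copy a float64 array into a lock-free shared buffer (hypothetical helper)."""
    shared = RawArray('d', arr.size)
    np.frombuffer(shared, dtype=np.float64)[:] = arr.ravel()
    return shared

def init_worker(shared_mat, shared_y, n):
    """Runs once per worker: wrap the shared buffers as NumPy views, without copying."""
    global _mat, _y
    _mat = np.frombuffer(shared_mat, dtype=np.float64).reshape(n, n)
    _y = np.frombuffer(shared_y, dtype=np.float64).reshape(n, 1)

def fun(k):
    # Only the small result vector is pickled and sent back to the parent
    return np.linalg.solve(k * np.eye(_mat.shape[0]) + _mat, _y)

if __name__ == "__main__":
    N = 500
    rng = np.random.default_rng(0)
    mat, y = rng.random((N, N)), rng.random((N, 1))
    with Pool(3, initializer=init_worker,
              initargs=(to_shared(mat), to_shared(y), N)) as pool:
        res = pool.map(fun, range(3))
    print([r.shape for r in res])
```

On Python 3.8 and later, multiprocessing.shared_memory offers a similar buffer that NumPy can wrap, but that is not an option on 3.7.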

Since you are using fork, I don't think that sharing the data will be much faster. And if the data is larger than a page of memory the operating system should use copy-on-write sharing, so it would not save you memory either.

As an alternative, you could write mat and y to a file in the parent process, and read them in the worker processes.

Or you could use a read-only mmap. But in that case you would still have to convert it to a numpy array.
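The file and mmap ideas can be combined with NumPy's own tools: np.save writes an array to a .npy file, and np.load with mmap_mode="r" maps it read-only, which takes care of the conversion to a numpy array. This is only a sketch; the helper name save_array and the temporary directory are assumptions made for the example.

```python
import os
import tempfile
import numpy as np
from multiprocessing import Pool

def save_array(arr, directory, name):
    """Write arr to <directory>/<name>.npy and return the path (hypothetical helper)."""
    path = os.path.join(directory, name + ".npy")
    np.save(path, arr)
    return path

def fun(task):
    n, mat_path, y_path = task
    # Read-only memory maps: pages are loaded on demand and can be shared via the OS page cache
    mat = np.load(mat_path, mmap_mode="r")
    y = np.load(y_path, mmap_mode="r")
    return np.asarray(np.linalg.solve(n * np.eye(mat.shape[0]) + mat, y))

if __name__ == "__main__":
    N = 500
    rng = np.random.default_rng(0)
    with tempfile.TemporaryDirectory() as tmp:
        mat_path = save_array(rng.random((N, N)), tmp, "mat")
        y_path = save_array(rng.random((N, 1)), tmp, "y")
        with Pool(3) as pool:
            res = pool.map(fun, [(n, mat_path, y_path) for n in range(3)])
    print([r.shape for r in res])
```

Only the file paths travel to the workers, so this works the same way with fork and with spawn.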
