Home > OS >  How to share a list of multidimensional arrays between python processes?
How to share a list of multidimensional arrays between python processes?

Time:05-10

I am trying to speed up my code by splitting the job among several python processes. In the single-threaded version of the code, I am looping through a code that accumulates the result in several matrices of different dimensions. Since there's no data sharing between each iteration, I can divide the task among several processes, each one having its own local set of matrices to accumulate the result. When all the processes are done, I combine the matrices of all the processes.

My idea of solving the issue is to pass a list of the same matrices to each process such that each process writes to this matrix when it's done. My question is, how do I pass this list of numpy array matrices to the processes? This seems like a straightforward thing to do except that it seems I can only pass a 1D array to the processes. Although a temporary solution would be to flatten all the numpy arrays and keep track of where each one begins and ends, is there a way where I simply pass a list of the matrices to the processes?

CodePudding user response:

For 1D arrays, there are previous answers show how to do that with shared memory. This post for example. For multidimensional arrays, a similar approach can be used since reshaping an array does not copy its content. You just need the shape (and possibly the strides) of the array to reshape them and operate on the reshaped array. Thus, you need to send the buffer and the shape to the processes so then you can convert the buffer back to a multidimensional Numpy array.

CodePudding user response:

Here is a solution that does not require Python version >= 3.8 and just uses multiprocessing.Array. The idea is to use such a shared array as the backing store for a numpy array.

In this example we have each process in the pool initialize a global variable np_array and then we do not have to explicitly pass the shared array to each worker function. This avoids the worker functions from having to concern themselves with re-creating a numpy array from the shared array. Moreover, this re-creation only has to be done N times where N is the pool size rather than M times where M is the number of tasks submitted to the pool. If you find global variables an anathema, then the alternative is to explicitly pass the shared array as an argument to each worker function and have it re-create the numpy array from it.

import numpy as np
from multiprocessing import Array, Pool

def np_array_from_shared_array(shared_array, shape, is_locked_array=True):
    shared_array_obj = shared_array.get_obj() if is_locked_array else shared_array
    return np.frombuffer(shared_array_obj, dtype=np.float64).reshape(shape[0], shape[1])

def init_pool_processes(shared_array, shape, is_locked_array):
    """
    Init each pool process.
    The numpy array is created from the passed shared array and a global
    variable is initialized with a reference to it.
    """
    global np_array
    np_array = np_array_from_shared_array(shared_array, shape, is_locked_array)

def change_array(i, j):
    np_array[i, j]  = 100

if __name__ == '__main__':
    data = np.array([[1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7], [7.7, 6.6, 5.5, 4.4, 3.3, 2.2, 1.1]])
    shape = data.shape
    # Specify lock=True if multiple processs will be updating the same
    # array element.
    # Each task will specify a unique element, so no locking is required:
    NEEDS_LOCKING = False
    shared_array = Array('d', shape[0] * shape[1], lock=NEEDS_LOCKING)
    # Wrap np_array as an numpy array so we can easily manipulates its data.
    np_array = np_array_from_shared_array(shared_array, shape, NEEDS_LOCKING)
    # Copy data to our shared array.
    np.copyto(np_array, data)

    # Before
    print(np_array)

    # Init each process in the pool with shared_array:
    pool = Pool(initializer=init_pool_processes, initargs=(shared_array, shape, NEEDS_LOCKING))
    result = pool.starmap(change_array, ((i, j) for i in range(shape[0]) for j in range(shape[1])))
    pool.close()
    pool.join()

    # After:
    print(np_array)

Prints:

[[1.1 2.2 3.3 4.4 5.5 6.6 7.7]
 [7.7 6.6 5.5 4.4 3.3 2.2 1.1]]
[[101.1 102.2 103.3 104.4 105.5 106.6 107.7]
 [107.7 106.6 105.5 104.4 103.3 102.2 101.1]]
  • Related