Share and manipulate multiple numpy arrays through multiprocessing


I'm trying to make use of multiprocessing to speed up my array-based calculations. The general workflow is as follows:

  • I have three arrays:
    • id_array holds segment IDs; cells that share an ID belong to the same segment
    • class_array is a classified array (integer class values from an image classification)
    • prob_array holds the probabilities for these classes
  • based on the segments I want to:
    • find the class majority within each segment
    • take the median of the probabilities within the segment, but only for the "pixels" that have the majority class

Here is my non-parallel example, which works fine:

import numpy as np

id_array = np.array([[1, 1, 2, 2, 2],
                     [1, 1, 2, 2, 4],
                     [3, 3, 4, 4, 4],
                     [3, 3, 4, 4, 4]])
class_array = np.array([[7, 7, 6, 8, 8],
                        [5, 7, 7, 8, 8],
                        [8, 8, 5, 5, 8],
                        [9, 9, 8, 7, 7]])
prob_array = np.array([[0.7, 0.3, 0.9, 0.5, 0.1],
                       [0.4, 0.6, 0.3, 0.5, 0.9],
                       [0.8, 0.6, 0.2, 0.2, 0.3],
                       [0.4, 0.4, 0.6, 0.3, 0.7]])

all_ids = np.unique(id_array)
dst_classes = np.zeros_like(class_array)
dst_probs = np.zeros_like(prob_array)

for my_id in all_ids:
    segment = np.where(id_array == my_id)
    class_data = class_array[segment]
    # get majority of classes within segment
    majority = np.bincount(class_data.flatten()).argmax()
    # get probabilities within segment
    prob_data = prob_array[segment]
    # get probabilities within segment where class equals majority
    majority_probs = prob_data[np.where(class_data == majority)]
    # get median of these probabilities
    median_prob = np.nanmedian(majority_probs)
    # write values
    dst_classes[segment] = majority
    dst_probs[segment] = median_prob

print(dst_classes)
print(dst_probs)
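
For the sample arrays above this prints the majority class per segment, and the median probability over the majority-class pixels, broadcast to every cell of the segment:

[[7 7 8 8 8]
 [7 7 8 8 8]
 [8 8 8 8 8]
 [8 8 8 8 8]]
[[0.6 0.6 0.5 0.5 0.5]
 [0.6 0.6 0.5 0.5 0.6]
 [0.7 0.7 0.6 0.6 0.6]
 [0.7 0.7 0.6 0.6 0.6]]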

The problem is that my real data contain something like 4 million segments, and the loop then takes about a week to compute. So I followed this tutorial and came up with this:

import numpy as np
import multiprocessing as mp

WORKER_DICT = dict()
NODATA = 0

def shared_array_from_np_array(data_array, init_value=None):
    raw_array = mp.RawArray(np.ctypeslib.as_ctypes_type(data_array.dtype), data_array.size)
    shared_array = np.frombuffer(raw_array, dtype=data_array.dtype).reshape(data_array.shape)
    if init_value:
        np.copyto(shared_array, np.full_like(data_array, init_value))
        return raw_array, shared_array
    else:
        np.copyto(shared_array, data_array)
        return raw_array, shared_array

def init_worker(id_array, class_array, prob_array, class_results, prob_results):
    WORKER_DICT['id_array'] = id_array
    WORKER_DICT['class_array'] = class_array
    WORKER_DICT['prob_array'] = prob_array
    WORKER_DICT['class_results'] = class_results
    WORKER_DICT['prob_results'] = prob_results
    WORKER_DICT['shape'] = id_array.shape
    mp.freeze_support()

def worker(id):
    id_array = WORKER_DICT['id_array']
    class_array = WORKER_DICT['class_array']
    prob_array = WORKER_DICT['prob_array']
    class_result = WORKER_DICT['class_results']
    prob_result = WORKER_DICT['prob_results']
    # array indices for "id"
    segment = np.where(id_array == id)
    # get data at these indices, mask nodata values
    class_data = np.ma.masked_equal(class_array[segment], NODATA)
    # get majority value
    majority_class = np.bincount(class_data.flatten()).argmax()
    # get probabilities
    probs = prob_array[segment]
    majority_probs = probs[np.where(class_array[segment] == majority_class)]
    med_majority_probs = np.nanmedian(majority_probs)
    class_result[segment] = majority_class
    prob_result[segment] = med_majority_probs
    return

if __name__ == '__main__':
    # segment IDs
    id_ra, id_array = shared_array_from_np_array(np.array(
        [[1, 1, 2, 2, 2],
         [1, 1, 2, 2, 4],
         [3, 3, 4, 4, 4],
         [3, 3, 4, 4, 4]]))
    # classification
    cl_ra, class_array = shared_array_from_np_array(np.array(
        [[7, 7, 6, 8, 8],
         [5, 7, 7, 8, 8],
         [8, 8, 5, 5, 8],
         [9, 9, 8, 7, 7]]))
    # probabilities
    pr_ra, prob_array = shared_array_from_np_array(np.array(
        [[0.7, 0.3, 0.9, 0.5, 0.1],
         [0.4, 0.6, 0.3, 0.5, 0.9],
         [0.8, 0.6, 0.2, 0.2, 0.3],
         [0.4, 0.4, 0.6, 0.3, 0.7]]))
    cl_res, class_results = shared_array_from_np_array(class_array, 0)
    pr_res, prob_results = shared_array_from_np_array(prob_array, 0.)
    unique_ids = np.unique(id_array)
    init_args = (id_ra, cl_ra, pr_ra, cl_res, pr_res, id_array.shape)
    with mp.Pool(processes=2, initializer=init_worker, initargs=init_args) as pool:
        pool.map_async(worker, unique_ids)
    print('Majorities:', cl_res)
    print('Probabilities:', pr_res)

But now I do not see how I can get my results back, or whether they are correct. I tried

np.frombuffer(cl_res)
np.frombuffer(pr_res)

but that gives me only 10 values for cl_res (there should be 20) and they seem completely random, while pr_res has the exact same values as prob_array.

I have tried making use of other examples around here, like this, but can't get them to work either. That looks like a similar problem, but it already requires a lot of knowledge of how multiprocessing really works, and I don't have that (I'm a total beginner with multiprocessing).

CodePudding user response:

Several things to fix:

  • You need to create the numpy arrays in init_worker(), which should also take a shape argument:
def init_worker(id_ra, cl_ra, pr_ra, cl_res, pr_res, shape):
    # wrap each shared RawArray in a numpy view of the right shape; every
    # worker then reads from and writes to the same underlying buffers
    WORKER_DICT['id_array'] = np.ctypeslib.as_array(id_ra).reshape(shape)
    WORKER_DICT['class_array'] = np.ctypeslib.as_array(cl_ra).reshape(shape)
    WORKER_DICT['prob_array'] = np.ctypeslib.as_array(pr_ra).reshape(shape)
    WORKER_DICT['class_results'] = np.ctypeslib.as_array(cl_res).reshape(shape)
    WORKER_DICT['prob_results'] = np.ctypeslib.as_array(pr_res).reshape(shape)
  • You should check if init_value is not None instead of just init_value in shared_array_from_np_array(), as 0 evaluates to False.
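For example, a corrected version of that function:

def shared_array_from_np_array(data_array, init_value=None):
    raw_array = mp.RawArray(np.ctypeslib.as_ctypes_type(data_array.dtype), data_array.size)
    shared_array = np.frombuffer(raw_array, dtype=data_array.dtype).reshape(data_array.shape)
    if init_value is not None:  # explicit None check: 0 and 0. are valid fill values
        np.copyto(shared_array, np.full_like(data_array, init_value))
    else:
        np.copyto(shared_array, data_array)
    return raw_array, shared_array

This bug also explains one of your observations: with the truthiness check, init_value=0 falls through to the else branch, so the result buffers start out as copies of class_array and prob_array; that is why pr_res held exactly the same values as prob_array.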
  • mp.freeze_support() should only be called immediately after if __name__ == '__main__', as per its docs.
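That is, move the call out of init_worker() and to the very top of the main block:

if __name__ == '__main__':
    mp.freeze_support()
    # ... build the shared arrays and start the pool as before ...

(It is only relevant for frozen Windows executables anyway; inside a worker initializer it does nothing useful.)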
  • pool.map_async() returns an AsyncResult object that needs to be waited on; you probably want pool.map(), which blocks until the processing is done.
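Putting that together, a minimal sketch of the end of the main block: a blocking map(), then reading the results through the numpy views that already exist in the parent process. Avoid np.frombuffer without a dtype here; it assumes float64, so reading the integer buffer cl_res that way yields the garbage values you saw (and half the expected count if your platform's default integer is 32-bit).

with mp.Pool(processes=2, initializer=init_worker, initargs=init_args) as pool:
    pool.map(worker, unique_ids)  # blocks until every segment is processed
    # or: result = pool.map_async(worker, unique_ids); result.get()
# class_results and prob_results are numpy views of the shared buffers,
# so the workers' writes are already visible here
print('Majorities:', class_results)
print('Probabilities:', prob_results)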