I'm trying to use multiprocessing to speed up my array-based calculations. The general workflow is as follows:
- I have three arrays:
  - id_array holds segment IDs, i.e. which pixels of an array belong together
  - class_array is a classified array (just integers representing class values from an image classification)
  - prob_array has the probability for these classes
- Based on the segments I want to:
  - find the class majority within each segment
  - take the median of the probabilities within the segment, but only for the "pixels" that have the majority class
Here is my non-parallel example, which works fine:
import numpy as np

id_array = np.array([[1, 1, 2, 2, 2],
                     [1, 1, 2, 2, 4],
                     [3, 3, 4, 4, 4],
                     [3, 3, 4, 4, 4]])

class_array = np.array([[7, 7, 6, 8, 8],
                        [5, 7, 7, 8, 8],
                        [8, 8, 5, 5, 8],
                        [9, 9, 8, 7, 7]])

prob_array = np.array([[0.7, 0.3, 0.9, 0.5, 0.1],
                       [0.4, 0.6, 0.3, 0.5, 0.9],
                       [0.8, 0.6, 0.2, 0.2, 0.3],
                       [0.4, 0.4, 0.6, 0.3, 0.7]])

all_ids = np.unique(id_array)
dst_classes = np.zeros_like(class_array)
dst_probs = np.zeros_like(prob_array)

for my_id in all_ids:
    # array indices belonging to this segment
    segment = np.where(id_array == my_id)
    class_data = class_array[segment]
    # get majority of classes within segment
    majority = np.bincount(class_data.flatten()).argmax()
    # get probabilities within segment
    prob_data = prob_array[segment]
    # get probabilities within segment where class equals majority
    majority_probs = prob_data[np.where(class_data == majority)]
    # get median of these probabilities
    median_prob = np.nanmedian(majority_probs)
    # write values
    dst_classes[segment] = majority
    dst_probs[segment] = median_prob

print(dst_classes)
print(dst_probs)
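For reference, this prints the following (up to floating-point rounding in the printed medians; note that segment 3 is a tie between classes 8 and 9, which np.bincount(...).argmax() resolves to the smaller class value):

[[7 7 8 8 8]
 [7 7 8 8 8]
 [8 8 8 8 8]
 [8 8 8 8 8]]
[[0.6 0.6 0.5 0.5 0.5]
 [0.6 0.6 0.5 0.5 0.6]
 [0.7 0.7 0.6 0.6 0.6]
 [0.7 0.7 0.6 0.6 0.6]]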
The problem is that my real data contain something like 4 million segments, so this takes about a week to compute. So I followed this tutorial and came up with the following:
import numpy as np
import multiprocessing as mp

WORKER_DICT = dict()
NODATA = 0

def shared_array_from_np_array(data_array, init_value=None):
    raw_array = mp.RawArray(np.ctypeslib.as_ctypes_type(data_array.dtype), data_array.size)
    shared_array = np.frombuffer(raw_array, dtype=data_array.dtype).reshape(data_array.shape)
    if init_value:
        np.copyto(shared_array, np.full_like(data_array, init_value))
        return raw_array, shared_array
    else:
        np.copyto(shared_array, data_array)
        return raw_array, shared_array

def init_worker(id_array, class_array, prob_array, class_results, prob_results):
    WORKER_DICT['id_array'] = id_array
    WORKER_DICT['class_array'] = class_array
    WORKER_DICT['prob_array'] = prob_array
    WORKER_DICT['class_results'] = class_results
    WORKER_DICT['prob_results'] = prob_results
    WORKER_DICT['shape'] = id_array.shape
    mp.freeze_support()

def worker(id):
    id_array = WORKER_DICT['id_array']
    class_array = WORKER_DICT['class_array']
    prob_array = WORKER_DICT['prob_array']
    class_result = WORKER_DICT['class_results']
    prob_result = WORKER_DICT['prob_results']
    # array indices for "id"
    segment = np.where(id_array == id)
    # get data at these indices, mask nodata values
    class_data = np.ma.masked_equal(class_array[segment], NODATA)
    # get majority value
    majority_class = np.bincount(class_data.flatten()).argmax()
    # get probabilities
    probs = prob_array[segment]
    majority_probs = probs[np.where(class_array[segment] == majority_class)]
    med_majority_probs = np.nanmedian(majority_probs)
    class_result[segment] = majority_class
    prob_result[segment] = med_majority_probs
    return

if __name__ == '__main__':
    # segment IDs
    id_ra, id_array = shared_array_from_np_array(np.array(
        [[1, 1, 2, 2, 2],
         [1, 1, 2, 2, 4],
         [3, 3, 4, 4, 4],
         [3, 3, 4, 4, 4]]))
    # classification
    cl_ra, class_array = shared_array_from_np_array(np.array(
        [[7, 7, 6, 8, 8],
         [5, 7, 7, 8, 8],
         [8, 8, 5, 5, 8],
         [9, 9, 8, 7, 7]]))
    # probabilities
    pr_ra, prob_array = shared_array_from_np_array(np.array(
        [[0.7, 0.3, 0.9, 0.5, 0.1],
         [0.4, 0.6, 0.3, 0.5, 0.9],
         [0.8, 0.6, 0.2, 0.2, 0.3],
         [0.4, 0.4, 0.6, 0.3, 0.7]]))
    cl_res, class_results = shared_array_from_np_array(class_array, 0)
    pr_res, prob_results = shared_array_from_np_array(prob_array, 0.)
    unique_ids = np.unique(id_array)
    init_args = (id_ra, cl_ra, pr_ra, cl_res, pr_res, id_array.shape)
    with mp.Pool(processes=2, initializer=init_worker, initargs=init_args) as pool:
        pool.map_async(worker, unique_ids)
    print('Majorities:', cl_res)
    print('Probabilities:', pr_res)
But I do not see how I can now get my results, or whether they are correct. I tried

np.frombuffer(cl_res)
np.frombuffer(pr_res)

but that gives me only 10 values for cl_res (there should be 20), and they seem completely random, while pr_res has the exact same values as prob_array.
I have tried making use of other examples around here, like this one, but can't get them to work either. That one looks like a similar problem, but it already requires a lot of knowledge of how multiprocessing really works, and I don't have that (I'm a total beginner with multiprocessing).
CodePudding user response:
Several things to fix:
- You need to create the numpy arrays in init_worker(), which should also take a shape argument:

  def init_worker(id_ra, cl_ra, pr_ra, cl_res, pr_res, shape):
      # as_array() ignores its shape argument for ctypes arrays,
      # so reshape the views explicitly to restore the 2-D layout
      WORKER_DICT['id_array'] = np.ctypeslib.as_array(id_ra).reshape(shape)
      WORKER_DICT['class_array'] = np.ctypeslib.as_array(cl_ra).reshape(shape)
      WORKER_DICT['prob_array'] = np.ctypeslib.as_array(pr_ra).reshape(shape)
      WORKER_DICT['class_results'] = np.ctypeslib.as_array(cl_res).reshape(shape)
      WORKER_DICT['prob_results'] = np.ctypeslib.as_array(pr_res).reshape(shape)

- You should check if init_value is not None instead of just init_value in shared_array_from_np_array(), as 0 evaluates to False.
- mp.freeze_support() should only be called immediately after if __name__ == '__main__':, as per its docs.
- pool.map_async() returns an AsyncResult object that needs to be waited on; you probably want pool.map(), which blocks until the processing is done.
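Putting these fixes together, a minimal corrected sketch of your program could look like the following. It keeps your toy data and names, drops the NODATA masking for brevity, and reads the results in the parent through the numpy views that shared_array_from_np_array() already returns:

import multiprocessing as mp
import numpy as np

WORKER_DICT = dict()

def shared_array_from_np_array(data_array, init_value=None):
    # allocate shared memory and wrap it in a numpy view onto the same buffer
    raw_array = mp.RawArray(np.ctypeslib.as_ctypes_type(data_array.dtype), data_array.size)
    shared_array = np.frombuffer(raw_array, dtype=data_array.dtype).reshape(data_array.shape)
    if init_value is not None:  # fix: 0 and 0. are valid init values
        shared_array[:] = init_value
    else:
        np.copyto(shared_array, data_array)
    return raw_array, shared_array

def init_worker(id_ra, cl_ra, pr_ra, cl_res, pr_res, shape):
    # fix: rebuild numpy views onto the shared buffers inside each worker
    WORKER_DICT['id_array'] = np.ctypeslib.as_array(id_ra).reshape(shape)
    WORKER_DICT['class_array'] = np.ctypeslib.as_array(cl_ra).reshape(shape)
    WORKER_DICT['prob_array'] = np.ctypeslib.as_array(pr_ra).reshape(shape)
    WORKER_DICT['class_results'] = np.ctypeslib.as_array(cl_res).reshape(shape)
    WORKER_DICT['prob_results'] = np.ctypeslib.as_array(pr_res).reshape(shape)

def worker(my_id):
    segment = np.where(WORKER_DICT['id_array'] == my_id)
    class_data = WORKER_DICT['class_array'][segment]
    majority = np.bincount(class_data.flatten()).argmax()
    majority_probs = WORKER_DICT['prob_array'][segment][class_data == majority]
    WORKER_DICT['class_results'][segment] = majority
    WORKER_DICT['prob_results'][segment] = np.nanmedian(majority_probs)

if __name__ == '__main__':
    mp.freeze_support()  # fix: called directly after the __main__ guard
    id_ra, id_array = shared_array_from_np_array(np.array(
        [[1, 1, 2, 2, 2],
         [1, 1, 2, 2, 4],
         [3, 3, 4, 4, 4],
         [3, 3, 4, 4, 4]]))
    cl_ra, class_array = shared_array_from_np_array(np.array(
        [[7, 7, 6, 8, 8],
         [5, 7, 7, 8, 8],
         [8, 8, 5, 5, 8],
         [9, 9, 8, 7, 7]]))
    pr_ra, prob_array = shared_array_from_np_array(np.array(
        [[0.7, 0.3, 0.9, 0.5, 0.1],
         [0.4, 0.6, 0.3, 0.5, 0.9],
         [0.8, 0.6, 0.2, 0.2, 0.3],
         [0.4, 0.4, 0.6, 0.3, 0.7]]))
    cl_res, class_results = shared_array_from_np_array(class_array, 0)
    pr_res, prob_results = shared_array_from_np_array(prob_array, 0.)
    init_args = (id_ra, cl_ra, pr_ra, cl_res, pr_res, id_array.shape)
    with mp.Pool(processes=2, initializer=init_worker, initargs=init_args) as pool:
        pool.map(worker, np.unique(id_array))  # fix: map() blocks until done
    # the views share memory with the RawArrays, so they see the workers' writes
    print('Majorities:', class_results)
    print('Probabilities:', prob_results)

This should reproduce your serial loop's output on the toy data. As an aside, the seemingly random values you got from np.frombuffer(cl_res) come from reinterpreting the integer buffer with frombuffer's default dtype=float64; when inspecting a RawArray by hand, pass the matching dtype and reshape the result.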