Context:
I have three 3D arrays ("precursor arrays") that I am upsampling with an Inverse Distance Weighting (IDW) method. To do that, I calculate a 3D weights array that I use in a for loop on each point of my precursor arrays.
Each 2D slice of my weights array is used to calculate a partial array. Once all 28 of them are generated, they are summed to give one final host array.
I would like to parallelize this for loop in order to reduce my computing time. I tried, but I cannot manage to update my host arrays correctly.
Question:
How could I parallelize my main function (last section of my code)?
EDIT: Or is there a way I could "slice" my i for loop (for example, one core running from i = 0 to 5, and another core running from i = 6 to 9)?
Summary:
- 3 precursor arrays (temperatures, precipitation, snow): 10x4x7 (10 is the time dimension)
- 1 weight array (w): 28x1201x2101
- 28x3 partial arrays: 1201x2101
- 3 host arrays (temp, prec, Eprec): 1201x2101
Here is my code (runnable as it is aside from the MAIN ALGORITHM PARALLEL section; see the MAIN ALGORITHM NOT PARALLEL section at the end for the non-parallelized version):
import numpy as np
import multiprocessing as mp
import time
#%% ------ Create data ------ ###
temperatures = np.random.rand(10,4,7)*100
precipitation = np.random.rand(10,4,7)
snow = np.random.rand(10,4,7)
# Array of altitudes to "adjust" the temperatures
alt = np.random.rand(4,7)*1000
#%% ------ Functions to run in parallel ------ ###
# This function upsamples the precursor arrays and creates the partial arrays
def interpolator(i, k, mx, my):
    T = ((temperatures[i,mx,my]-272.15) + (-alt[mx, my] * -6/1000)) * w[k,:,:]
    P = (precipitation[i,mx,my]) * w[k,:,:]
    S = (snow[i,mx,my]) * w[k,:,:]
    return (T, P, S)
# We add the partial arrays together to build the host arrays
def get_results(results):
    global temp, prec, Eprec
    temp += results[0]
    prec += results[1]
    Eprec += results[2]
#%% ------ IDW Interpolation ------ ###
# We create a weight matrix that we use to upsample our temperatures, precipitation and snow matrices
# This part is not that important, it works well as it is
MX, MY = np.shape(temperatures[0])
N = 300
T = np.zeros([N*MX+1, N*MY+1])
# create NxM inverse distance weight matrices based on Gaussian interpolation
x = np.arange(0, N*MX+1)
y = np.arange(0, N*MY+1)
X, Y = np.meshgrid(x, y)
k = 0
w = np.zeros([MX*MY, N*MX+1, N*MY+1])
for mx in range(MX):
    for my in range(MY):
        # Gaussian
        add_point = np.exp(-((mx*N-X.T)**2 + (my*N-Y.T)**2)/N**2)
        w[k,:,:] = add_point
        k += 1
sum_weights = np.sum(w, axis=0)
for k in range(MX*MY):
    w[k,:,:] /= sum_weights
#%% ------ MAIN ALGORITHM PARALLEL ------ ###
if __name__ == '__main__':
    # Create an empty array to use as a template
    dummy = np.zeros((w.shape[1], w.shape[2]))
    # Start a timer
    ts = time.time()
    # Iterate over the time dimension
    for i in range(temperatures.shape[0]):
        # Initialize the host arrays
        temp = dummy.copy()
        prec = dummy.copy()
        Eprec = dummy.copy()
        # Create the pool based on my number of cores
        pool = mp.Pool(mp.cpu_count())
        # Loop through every weight slice, for every cell of the temperatures, precipitation and snow arrays
        for k in range(0, w.shape[0]):
            for mx in range(MX):
                for my in range(MY):
                    # Upsample by adding the contribution of each weight slice
                    pool.apply_async(interpolator, args=(i, k, mx, my), callback=get_results)
        pool.close()
        pool.join()
    # Print the time spent on the loop
    print("Time spent: ", time.time()-ts)
#%% ------ MAIN ALGORITHM NOT PARALLEL ------ ###
if __name__ == '__main__':
    # Create an empty array to use as a template
    dummy = np.zeros((w.shape[1], w.shape[2]))
    ts = time.time()
    for i in range(temperatures.shape[0]):
        # Create empty host arrays
        temp = dummy.copy()
        prec = dummy.copy()
        Eprec = dummy.copy()
        k = 0
        for mx in range(MX):
            for my in range(MY):
                get_results(interpolator(i, k, mx, my))
                k += 1
    print("Time spent:", time.time()-ts)
CodePudding user response:
The problem with multiprocessing is that it creates many new processes that execute the code before the main (i.e. before if __name__ == '__main__'). This causes a very slow initialization (since every process does it) and a huge amount of RAM being used for nothing. You should certainly move everything into the main section or, if possible, into functions (which generally results in faster execution and is good software engineering practice anyway, especially for parallel codes). Even with this, there is another huge problem with multiprocessing: inter-process communication is slow. One solution is a multi-threaded approach made possible by Numba or Cython (they can release the GIL, as opposed to basic CPython threads). In fact, they are often simpler to use than multiprocessing. However, you need to be more careful, since parallel accesses are unprotected and data races can appear in buggy parallel code.
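As a rough illustration of that restructuring (build_weights is a hypothetical wrapper around the weight computation from the question, not part of the original code):
import numpy as np
import multiprocessing as mp

# Hypothetical wrapper: the weight computation moves from module level
# into a function, so spawned child processes do not re-execute it.
def build_weights(temperatures, N=300):
    MX, MY = np.shape(temperatures[0])
    w = np.zeros([MX*MY, N*MX+1, N*MY+1])
    # ... Gaussian weight computation from the question goes here ...
    return w

if __name__ == '__main__':
    # All heavy work now runs only in the parent process
    temperatures = np.random.rand(10, 4, 7) * 100
    w = build_weights(temperatures)
    # ... main loop goes here ...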
In your case, the computation is mostly memory-bound. This means multiprocessing is pretty useless. In fact, parallelism is barely useful here unless you are running this code on a computing server with high memory throughput. Indeed, memory is a shared resource, and using more computing cores does not help much, since one core can almost saturate the memory bandwidth on a regular PC (while a few cores are needed on computing servers).
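To put rough numbers on that (a back-of-the-envelope estimate using the 1201x2101 float64 arrays produced by the code above): each 2D slice is about 20 MB, so one interpolation step reads one weight slice (~20 MB) and reads and writes the three host arrays (~120 MB), i.e. roughly 140 MB of memory traffic for only about 7.5 million cells x 3 multiply-adds. That is far less than 1 flop per byte, so the memory bus, not the CPU, is the bottleneck.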
The key to speeding up memory-bound codes is to avoid creating temporary arrays and to use cache-friendly algorithms. In your case, T, P and S are filled just to be read later in order to update the temp, prec and Eprec arrays. This temporary step is pretty expensive and unnecessary here (especially filling the arrays). Removing it increases the arithmetic intensity, resulting in a code that is certainly faster in sequential and that can scale better on multiple cores. This is the case on my machine.
Here is an example of code using Numba to parallelize the computation:
import numba as nb

# get_results + interpolator fused into one function
@nb.njit('void(float64[:,::1], float64[:,::1], float64[:,::1], float64[:,:,::1], int_, int_, int_, int_)', parallel=True)
def interpolate_and_get_results(temp, prec, Eprec, w, i, k, mx, my):
    # temperatures, precipitation, snow and alt are globals:
    # Numba captures them as compile-time constants
    factor1 = ((temperatures[i,mx,my]-272.15) + (-alt[mx, my] * -6/1000))
    factor2 = precipitation[i,mx,my]
    factor3 = snow[i,mx,my]
    # Accumulate directly into the host arrays: no temporary T/P/S arrays
    for ix in nb.prange(w.shape[1]):
        for iy in range(w.shape[2]):
            val = w[k, ix, iy]
            temp[ix, iy] += factor1 * val
            prec[ix, iy] += factor2 * val
            Eprec[ix, iy] += factor3 * val

# Example of usage:
interpolate_and_get_results(temp, prec, Eprec, w, i, k, mx, my)
Note that the string in nb.njit is called a signature; it specifies the types for the JIT so it can compile the function eagerly.
This code is 4.6 times faster on my 6-core machine (while it was barely faster without merging get_results and interpolator). In fact, it is 3.8 times faster in sequential, so threads do not help much, since the computation is still memory-bound. Indeed, the cost of the multiply-add is negligible compared to the memory reads/writes.
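For completeness, here is a sketch of how the fused kernel could replace the calls in the question's sequential main loop (same variable names as the question; temperatures, precipitation, snow and alt must exist as globals before the function is first compiled, since Numba captures them as constants):
if __name__ == '__main__':
    dummy = np.zeros((w.shape[1], w.shape[2]))
    ts = time.time()
    for i in range(temperatures.shape[0]):
        # Host arrays are reset for each time step and filled in place
        temp = dummy.copy()
        prec = dummy.copy()
        Eprec = dummy.copy()
        k = 0
        for mx in range(MX):
            for my in range(MY):
                # The fused kernel accumulates directly into temp/prec/Eprec
                interpolate_and_get_results(temp, prec, Eprec, w, i, k, mx, my)
                k += 1
    print("Time spent:", time.time() - ts)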