Parallelize three nested loops


Context:

I have 3 3D arrays ("precursor arrays") that I am upsampling with an Inverse Distance Weighting method. To do that, I calculate a 3D weights array that I use in a for loop on each point of my precursor arrays.

Each 2D slice of my weights array is used to calculate a partial array. Once I generate all 28 of them, they are summed to give one final host array.

I would like to parallelize this for loop in order to reduce my computing time. I tried doing it but I cannot manage to update my host arrays correctly.

Question:

How can I parallelize my main function (the last section of my code)?

EDIT: Or is there a way I could "slice" my i for loop (for example, one core running on i = 0 to 5 and another core running on i = 6 to 9)?

Summary:

  • 3 precursor arrays (temperatures, precipitations, snow): 10x4x7 (10 is a time dimension)
  • 1 weight array (w): 28x1101x2101
  • 28x3 partial arrays: 1101x2101
  • 3 host arrays (temp, prec, Eprec): 1101x2101

Here is my code (runnable as it is, aside from the MAIN ALGORITHM PARALLEL section; see the MAIN ALGORITHM NOT PARALLEL section at the end for the non-parallelized version):

import numpy as np
import multiprocessing as mp
import time



#%% ------ Create data ------ ###
temperatures = np.random.rand(10,4,7)*100
precipitation = np.random.rand(10,4,7)
snow = np.random.rand(10,4,7)

# Array of altitudes to "adjust" the temperatures
alt = np.random.rand(4,7)*1000



#%% ------ Functions to run in parallel ------ ###

# This function upsamples the precursor arrays and creates the partial arrays 
def interpolator(i, k, mx, my):
    
    T = ((temperatures[i,mx,my]-272.15) + (-alt[mx, my] * -6/1000)) * w[k,:,:]
    P = (precipitation[i,mx,my])*w[k,:,:]
    S = (snow[i,mx,my])*w[k,:,:]
    
    return(T, P, S)

# We add the partial arrays together to build the host arrays
def get_results(results):
    global temp, prec, Eprec
    temp += results[0]
    prec += results[1]
    Eprec += results[2]



#%% ------ IDW Interpolation ------ ###

# We create a weight matrix that we use to upsample our temperatures, precipitations and snow matrices
# This part is not that important, it works well as it is

MX,MY = np.shape(temperatures[0])
N = 300

T = np.zeros([N*MX+1, N*MY+1])

# create NxM inverse distance weight matrices based on Gaussian interpolation

x = np.arange(0, N*MX+1)
y = np.arange(0, N*MY+1)
X,Y = np.meshgrid(x,y)

k = 0
w = np.zeros([MX*MY, N*MX+1, N*MY+1])
for mx in range(MX):
    for my in range(MY):
        
        # Gaussian
        add_point = np.exp(-((mx*N-X.T)**2 + (my*N-Y.T)**2)/N**2)
        w[k,:,:] += add_point
        k += 1
        
sum_weights = np.sum(w, axis=0)
for k in range(MX*MY):
    w[k,:,:] /= sum_weights
    
    

#%% ------ MAIN ALGORITHM PARALLEL ------ ###

if __name__ == '__main__':
    
    # Create an empty array to use as a template
    dummy = np.zeros((w.shape[1], w.shape[2]))
    
    # Start  a timer
    ts = time.time()
    
    # Iterate over the time dimension
    for i in range(temperatures.shape[0]):
        
        # Initialize the host arrays
        temp = dummy.copy()
        prec = dummy.copy()
        Eprec = dummy.copy()
    
        # Create the pool based on my amount of cores
        pool = mp.Pool(mp.cpu_count())
        
        # Loop through every weight slice, for every cell of the temperatures, precipitations and snow arrays
        for k in range(0,w.shape[0]):
            for mx in range(MX):
                for my in range(MY):
                    
                    # Upsample the temperatures, precipitations and snow arrays by adding the contribution of each weight slice
                    pool.apply_async(interpolator, args = (i, k, mx, my), callback = get_results)
        
        pool.close()
        pool.join()
    
    # Print the time spent on the loop
    print("Time spent: ", time.time()-ts)
    
    

#%% ------ MAIN ALGORITHM NOT PARALLEL ------ ###

if __name__ == '__main__':
    
    # Create an empty array to use as a template
    dummy = np.zeros((w.shape[1], w.shape[2]))
    ts = time.time()
    
    for i in range(temperatures.shape[0]):
        
        # Create empty host arrays
        temp = dummy.copy()
        prec = dummy.copy()
        Eprec = dummy.copy()
        
        k = 0
        for mx in range(MX):
            for my in range(MY):
                
                get_results(interpolator(i, k, mx, my))
                k += 1
                
    print("Time spent:", time.time()-ts)

CodePudding user response:

The problem with multiprocessing is that it creates many new processes that execute all the code before the main section (i.e. before if __name__ == '__main__'). This causes a very slow initialization (since every process does it) and a huge amount of RAM being used for nothing. You should move everything into the main section or, if possible, into functions (which generally results in faster execution and is good software engineering practice anyway, especially for parallel code). Even with this, there is another big problem with multiprocessing: inter-process communication is slow. One solution is a multi-threaded approach made possible by Numba or Cython (both can release the GIL, unlike plain CPython threads). In fact, they are often simpler to use than multiprocessing. However, you should be more careful, since parallel accesses are unprotected and data races can appear in buggy parallel code.
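
To see the first point concretely, here is a small self-contained sketch of mine (not the asker's code, and only relevant for the default 'spawn' start method used on Windows and macOS): every worker re-runs the module-level lines, so expensive setup such as building the weight array should live behind the __main__ guard or inside a function.

import multiprocessing as mp
import os
import time

# Module-level code: with the 'spawn' start method, every worker process
# re-imports this module and re-runs these lines.
print(f"module-level setup in process {os.getpid()}")
time.sleep(1)  # stands in for expensive setup such as building the weight array

def work(x):
    return x * x

if __name__ == '__main__':
    # Only the parent process reaches this block; workers stop after the import.
    with mp.Pool(2) as pool:
        print(pool.map(work, range(8)))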

In your case, the computation is mostly memory-bound. This means multiprocessing is pretty useless. In fact, parallelism is barely useful here unless you are running this code on a computing server with high memory throughput. Indeed, memory is a shared resource, and using more computing cores does not help much since one core can almost saturate the memory bandwidth on a regular PC (while a few cores are enough on computing servers).
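
To make the memory-bound claim concrete, here is a rough back-of-the-envelope estimate of mine, based only on the shapes listed in the summary and on the assumption that the three host arrays are read and written once per weight slice; it ignores whatever stays in cache.

import numpy as np

K, H, W = 28, 1101, 2101                       # weight slices, upsampled grid
ITEM = np.dtype(np.float64).itemsize           # 8 bytes per value

weights_bytes = K * H * W * ITEM               # the whole weight array is streamed each time step
hosts_bytes = 3 * H * W * ITEM                 # temp, prec and Eprec together
traffic = weights_bytes + K * 2 * hosts_bytes  # hosts are read and written for each of the K slices

print(f"weight array size: {weights_bytes / 1e9:.2f} GB")               # ~0.52 GB
print(f"approx. memory traffic per time step: {traffic / 1e9:.2f} GB")  # ~3.6 GB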

The key to speeding up memory-bound code is to avoid creating temporary arrays and to use cache-friendly algorithms. In your case, T, P and S are filled just to be read later in order to update the temp, prec and Eprec arrays. This temporary step is pretty expensive and not necessary here (especially filling the arrays). Removing it increases the arithmetic intensity, resulting in code that is certainly faster sequentially and that can scale better on multiple cores. This is the case on my machine.

Here is an example of code using Numba so to parallelize the code:

import numba as nb

# get_results + interpolator
@nb.njit('void(float64[:,::1], float64[:,::1], float64[:,::1], float64[:,:,::1], int_, int_, int_, int_)', parallel=True)
def interpolate_and_get_results(temp, prec, Eprec, w, i, k, mx, my):
    factor1 = ((temperatures[i,mx,my]-272.15) + (-alt[mx, my] * -6/1000))
    factor2 = precipitation[i,mx,my]
    factor3 = snow[i,mx,my]
    # Parallel loop over the rows of the weight slice, accumulating in place
    # (the row index is named x so it does not shadow the time index i)
    for x in nb.prange(w.shape[1]):
        for j in range(w.shape[2]):
            val = w[k, x, j]
            temp[x, j] += factor1 * val
            prec[x, j] += factor2 * val
            Eprec[x, j] += factor3 * val

# Example of usage:
interpolate_and_get_results(temp, prec, Eprec, w, i, k, mx, my)

Note that the string passed to nb.njit is called a signature; it specifies the argument types to the JIT so that it can compile the function eagerly.

This code is 4.6 times faster on my 6-core machine (while it was barely faster without merging get_results and interpolator). In fact, it is already 3.8 times faster sequentially, so threads do not help much since the computation is still memory-bound. Indeed, the cost of the multiply-adds is negligible compared to the memory reads/writes.
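
For completeness, here is one way to drive this function from the original time loop. This driver is a sketch of mine rather than part of the answer; it assumes the rest of the question's script (the numpy/time imports and the module-level temperatures, precipitation, snow, alt, w, MX and MY) is unchanged, since Numba captures those arrays as globals.

if __name__ == '__main__':
    dummy = np.zeros((w.shape[1], w.shape[2]))
    ts = time.time()

    for i in range(temperatures.shape[0]):
        # Fresh host arrays for this time step
        temp = dummy.copy()
        prec = dummy.copy()
        Eprec = dummy.copy()

        k = 0
        for mx in range(MX):
            for my in range(MY):
                # Accumulate directly into the host arrays, no T/P/S temporaries
                interpolate_and_get_results(temp, prec, Eprec, w, i, k, mx, my)
                k += 1

    print("Time spent:", time.time() - ts)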
