Home > OS >  Multiprocessing pool map for a BIG array computation go very slow than expected
Multiprocessing pool map for a BIG array computation go very slow than expected

Time:06-20

I've experienced some difficulties when using multiprocessing Pool in python3. I want to do BIG array calculation by using pool.map. Basically, I've a 3D array which I need to do computation for 10 times and it generates 10 output files sequentially. This task can be done 3 times i,e, in the output we get 3*10=30 output files(*.txt). To do this, I've prepared the following script for small array calculation (a sample problem). However, when I use this script for a BIG array calculation or array come out from a series of files, then this piece of code (maybe pool) capture the memory, and it does not save any .txt file at the destination directory. There is no error message when I run the file with command mpirun python3 sample_prob_func.py Can anybody suggest what is the problem in the sample script and how to write code to get rid of stuck? I've not received any error message, but don't know where the problem occurs. Any help is appreciated. Thanks!

import numpy as np
import multiprocessing as mp
from scipy import signal
import matplotlib.pyplot as plt
import contextlib
import os, glob, re
import random
import cmath, math
import time
import pdb

#File Storing path
save_results_to = 'File saving path'

arr_x = [0, 8.49, 0.0, -8.49, -12.0, -8.49, -0.0, 8.49, 12.0]
arr_y = [0, 8.49, 12.0, 8.49, 0.0, -8.49, -12.0, -8.49, -0.0]
N=len(arr_x)

np.random.seed(12345)
total_rows = 5000
arr = np.reshape(np.random.rand(total_rows*N),(total_rows, N))
arr1 = np.reshape(np.random.rand(total_rows*N),(total_rows, N))
arr2 = np.reshape(np.random.rand(total_rows*N),(total_rows, N))

# Finding cross spectral density (CSD)
def my_func1(data):
    # Do something here
    return  array1


t0 = time.time()
my_data1 = my_func1(arr)
my_data2 = my_func1(arr1)
my_data3 = my_func1(arr2)

print('Time required {} seconds to execute CSD--For loop'.format(time.time()-t0))
mydata_list  = [my_data1,my_data3,my_data3]


def my_func2(data2):
    # Do something here
    return from_data2



start_freq = 100
stop_freq  = 110
freq_range= np.around(np.linspace(start_freq,stop_freq,11)/10, decimals=2)
no_of_freq = len(freq_range)

list_arr =[]

def my_func3(csd):
    list_csd=[]
    for fr_count in range(start_freq, stop_freq):
        csd_single = csd[:,:, fr_count]
        list_csd.append(csd_single)
    print('Shape of list is :', np.array(list_csd).shape)
    return list_csd

def parallel_function(BIG_list_data):
    with contextlib.closing(mp.Pool(processes=10)) as pool:
       dft= pool.map(my_func2, BIG_list_data)
       pool.close()
       pool.join()
    data_arr = np.array(dft)
    print('shape of data :', data_arr.shape)
    return data_arr

count_day = 1
count_hour =0
for count in range(3):
    count_hour  =1
    list_arr = my_func3(mydata_list[count])  # Load Numpy files
    print('Array shape is :', np.array(arr).shape)
    t0 = time.time()
    data_dft = parallel_function(list_arr)
    print('The hour number={} data is processing... '.format(count_hour))
    print('Time in parallel:', time.time() - t0)
    for i in range(no_of_freq-1): # (11-1=10)
        jj = freq_range[i]
        #print('The hour_number {} and frequency number {} data is processing... '.format(count_hour, jj))
        dft_1hr_complx = data_dft[i,:,:]
        np.savetxt(save_results_to   f'csd_Day_{count_day}_Hour_{count_hour}_f_{jj}_hz.txt',  dft_1hr_complx.view(float))

CodePudding user response:

As @JérômeRichard suggested,to aware your job scheduler you need to define the number of processors will engage to perform this task. So, the following command could help you: ncpus = int(os.getenv('SLURM_CPUS_PER_TASK', 1))

You need to use this line inside your python script. Also, inside the parallel_function use with contextlib.closing(mp.Pool(ncpus=10)) as pool: instead of with contextlib.closing(mp.Pool(processes=10)) as pool:. Thanks

  • Related