Home > Net >  multiprocess slower with numpy array
multiprocess slower with numpy array

Time:08-18

class MyClass():
    def __init__(self, audio_file_path):
        self.audio_file_path = audio_file_path
        ***other variables

    def sliding_window_function(audio_file_path):
        y, sr = librosa.load(audio_file_path)
        timestamps = np.arange(0, len(y)) / sr
        aby = np.abs(y)
        del(y)
        stamps_in_a_second = timestamps.shape[0]/librosa.get_duration(y=aby, sr=sr) 
        del(timestamps)
        scan_window_size = int(self.wanted_window_length*stamps_in_a_second)        
        qth_amp = np.quantile(aby, self.wanted_quantile_threshold)            
        adj_qth_amp = qth_amp*scan_window_size                                    
        window_sum = sum(aby[:scan_window_size])
        wanted_time_stamps = [[i, window_sum]
                            for i in range(len(aby) - scan_window_size)
                            if (window_sum := window_sum - aby[i]   aby[scan_window_size i]) > adj_qth_amp ]
        del(aby)

    def process_audio():
        wanted_time_stamps = sliding_window_function(self.audio_file_path)
        for time_stamp in wanted_time_stamps:
            #dosomething

def main(file_path):
    myclass = MyClass(file_path)
    myclass.process_audio()

if __name__ == "__main__":

    pool = multiprocessing.Pool()
    for file in file_path:
        try:
            pool.apply_async(main, args=(file,))
        except:
            pass
    pool.close()
    pool.join()

I have multiple audio file that needs to be processed. which

  1. read the audio file with librosa(an audio library)

  2. do some numpy array computation with sliding windows to find regional highs

  3. use result from 2 to modify the audio file, and video file(which I did not include in the code because it was not relavent to the issue I'm facing)

there are two reasons why I choose multiproccessing.Pool:

  1. I believe that sliding window, and audio processing(mainly audio/video processing) is considered more of a cpu loaded work, which would benefit from multiprocessing.
  2. Pool allows me to limit workers working concurrently at once, which allows me to allocate my computation powers according to my needs.

My problem:

1.after initializing the Pool, and running the code, everything is running on for extremely long and all stuck on the sliding window function:

                        for i in range(len(aby) - scan_window_size)
                        if (window_sum := window_sum - aby[i]   aby[scan_window_size i]) > adj_qth_amp ] 

specifically

for i in range(len(aby) - scan_window_size)
if (window_sum := window_sum - aby[i]   aby[scan_window_size i]) > adj_qth_amp

my cpu usage and power usage is still up, but every process in the pool seems to hang on these two lines when I send a KeyboardInterupt(its the same thing when using for loop append instead of list comprehension)

  1. Something strange I noticed is that my memory usage starts high, but after a few hours, they drop much lower than the theoretically should be. aby should be a array that is close to 1GB but each process is only using 500MB of RAM. but my code hasn't got to del(aby)

CodePudding user response:

I'd encourage you to use numpy intrinsics where possible, they're much faster than multiprocessing and will already be multithreaded where sensible.

For example:

import numpy as np

# 100M values
x = np.random.uniform(size=100_000_000)

windowed_sum = np.convolve(x, np.ones_like(x, shape=10), 'valid')
ix, = np.where(windowed_sum > 9)

will likely be >100 times faster than doing the work as you were doing by pulling individual values out of Numpy and into Python. I'd also be tempted to do the parallelism outside of Python where possible, it tends to make things easier to reason about and debug.

See Python Running cumulative sum with a given window for other ways of calculating a running sum efficiently.

  • Related