Home > other >  How to parallel the following code using Multiprocessing in Python
How to parallel the following code using Multiprocessing in Python

Time:12-29

I have a function generate(file_path) which returns an integer index and a numpy array. The simplified of generate function is as follows:

def generate(file_path):
  temp = np.load(file_path)
  #get index from the string file_path
  idx = int(file_path.split["_"][0])
  #do some mathematical operation on temp
  result = operate(temp)
  return idx, result

I need to glob through a directory and collect the results of generate(file_path) into a hdf5 file. My serialization code is as follows:

for path in glob.glob(directory):
    idx, result = generate(path)

    hdf5_file["results"][idx,:] = result
    
hdf5_file.close()

I hope to write a multi-thread or multi-process code to speed up the above code. How could I modify it? Pretty thanks!

My try is to modify my generate function and to modify my "main" as follows:

def generate(file_path):
    temp = np.load(file_path)
    #get index from the string file_path
    idx = int(file_path.split["_"][0])
    #do some mathematical operation on temp
    result = operate(temp)
      
    hdf5_path = "./result.hdf5"
    hdf5_file = h5py.File(hdf5_path, 'w')
    hdf5_file["results"][idx,:] = result

    hdf5_file.close()

if __name__ == '__main__':
    ##construct hdf5 file
    hdf5_path = "./output.hdf5"
    hdf5_file = h5py.File(hdf5_path, 'w')
    hdf5_file.create_dataset("results", [2000,15000], np.uint8)

    hdf5_file.close()

    path_ = "./compute/*"
    p = Pool(mp.cpu_count())
    p.map(generate, glob.glob(path_))
    hdf5_file.close()
   
    print("finished")

However, it does not work. It will throw error

KeyError: "Unable to open object (object 'results' doesn't exist)"

CodePudding user response:

I detected some errors in initialising the dataset after examining your code;

You produced the hdf5 file with the path ""./result.hdf5" inside the generate function.

However, I think you neglected to create a "results" dataset beneath that file, as that is what is causing the Object Does Not Exist issue.

Kindly reply if you still face the same issue with error message

CodePudding user response:

You can use a thread or process pool to execute multiple function calls concurrently. Here is an example which uses a process pool:

from concurrent.futures import ProcessPoolExecutor
from time import sleep


def generate(file_path: str) -> int:
    sleep(1.0)
    return file_path.split("_")[1]


def main():
    file_paths = ["path_1", "path_2", "path_3"]
    
    with ProcessPoolExecutor() as pool:
        results = pool.map(generate, file_paths)
        
        for result in results:
            # Write to the HDF5 file
            print(result)
    

if __name__ == "__main__":
    main()

Note that you should not write to the same HDF5 file concurrently, i.e. the file writing should not append in the generate function.

  • Related