How to do multiprocessing of image augmentations of large quantity?

Time:09-28

I'm trying to run image augmentations on images spread across a large number of folders. Doing this sequentially takes a long time, so I currently run the same script in several terminals at once, giving each one a start and end index into the folder list, as shown in the code below:

import os
import cv2
from imutils import paths  # assumed source of paths.list_images

def do_augmentations(all_folders):
    # This terminal handles the second half of the folder list
    total_count = len(all_folders)
    start = total_count // 2
    end = total_count
    for folder in all_folders[start:end]:
        allImgs = list(paths.list_images(folder))
        for img in allImgs:
            augmentations(img)
            cv2.imwrite(img)

def main():
    # os.walk yields (dirpath, dirnames, filenames); keep only the folder paths
    all_folders = [dirpath for dirpath, _, _ in os.walk(folderpath)]
    do_augmentations(all_folders)

I was wondering whether this can be done in parallel across multiple CPU cores using Python's multithreading or multiprocessing packages, instead of running sequentially or manually splitting the folder range across several terminals. I tried the multiprocessing library, but the work still runs sequentially, as before. Here is the code for that attempt:

from multiprocessing.dummy import Pool as ThreadPool

def do_augmentations(args):
    all_folders = args[0]
    bpath = args[1]
    for folder in all_folders:
        allImgs = list(paths.list_images(folder))
        count = len(allImgs)
        for img in allImgs:
            augmentations(img)
            cv2.imwrite(img)

def main():
    bpath = 'img_foldr'
    all_folders = os.walk(folderpath)
    pool = ThreadPool(4)
    pool.map(do_augmentations, [[all_folders, bpath]])

When I run this, it processes one folder at a time in a loop instead of handling many folders simultaneously. I don't understand what I'm doing wrong. Any help or suggestions would be much appreciated, thanks in advance.

Update: I tried the answer given by Jan Wilamowski, as shown below:

from itertools import chain
from multiprocessing import Pool

def do_augmentations(args):
    all_folders = args[0]
    print(len(list(all_folders)))
    bpath = args[1]
    for folder in all_folders:
        allImgs = list(paths.list_images(folder))
        count = len(allImgs)
        for img in allImgs:
            augmentations(img)
            cv2.imwrite(img)

def main():
    bpath = 'img_folder'
    all_folders = os.walk(folderpath)
    allImages = chain(paths.list_images(folder) for folder in all_folders)
    print(len(list(allImages)))
    print(allImages)
    pool = Pool(4)
    pool.map(do_augmentations, [[allImages, bpath]])

It prints the following output:

12

<itertools.chain object at 0x000d0dfdafdffdf>

0

The for loop in do_augmentations() never executes because the list is empty there, yet when I print the length outside the function I get 12, which is the number of subfolders. What am I doing wrong here? Why can't I fetch all the images in the folders and pass them to do_augmentations()?

CodePudding user response:

Have your function work on a single folder and pass the folder list to pool.map(). Also, use a process pool to avoid problems with the GIL (as pointed out by several commenters):

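import os
from multiprocessing import Pool

import cv2
from imutils import paths  # assumed source of paths.list_images

def do_augmentations(folder):
    # Each call handles exactly one folder
    for img_path in paths.list_images(folder):
        # Assumes augmentations() takes and returns an image array
        augmented = augmentations(cv2.imread(img_path))
        cv2.imwrite(img_path, augmented)

def main():
    # os.walk yields (dirpath, dirnames, filenames) tuples, and list_images()
    # already recurses, so pass only the top-level folders under folderpath
    all_folders = [entry.path for entry in os.scandir(folderpath) if entry.is_dir()]
    with Pool(4) as pool:
        pool.map(do_augmentations, all_folders)

# The guard is required when worker processes are spawned (e.g. on Windows)
if __name__ == '__main__':
    main()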

You could also break it down further and have your function work on a single image, giving more consistent performance:

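import os
from itertools import chain
from multiprocessing import Pool

import cv2
from imutils import paths  # assumed source of paths.list_images

def augment(img_path):
    # Assumes augmentations() takes and returns an image array
    augmented = augmentations(cv2.imread(img_path))
    cv2.imwrite(img_path, augmented)

def main():
    all_folders = [entry.path for entry in os.scandir(folderpath) if entry.is_dir()]
    # chain.from_iterable flattens the per-folder generators into one stream of paths
    all_images = chain.from_iterable(paths.list_images(folder) for folder in all_folders)
    with Pool(4) as pool:
        pool.map(augment, all_images)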

Note however that disk I/O can be a bottleneck so don't expect a linear performance improvement.
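
On the update in the question, a likely explanation for the 12 and the 0 is that chain() given a single generator expression yields the per-folder generators themselves (one per folder, hence 12), and that calling list() on an iterator consumes it, so nothing is left by the time it reaches the worker. The sketch below reproduces both effects with the standard library only; list_images here is a hypothetical stand-in for paths.list_images, and the folder names are made up.

```python
from itertools import chain


def list_images(folder):
    """Hypothetical stand-in for paths.list_images: yields three fake paths."""
    for i in range(3):
        yield f"{folder}/img_{i}.png"


folders = ["a", "b"]

# chain() with one argument iterates over that argument, so it yields
# the generator objects (one per folder), not the image paths.
wrong = chain(list_images(f) for f in folders)
first_pass = list(wrong)    # consumes the iterator
second_pass = list(wrong)   # nothing left to iterate

# chain.from_iterable() flattens the generators into individual paths.
# Materialising it once as a list makes it safe to measure and reuse.
right = list(chain.from_iterable(list_images(f) for f in folders))

print(len(first_pass), len(second_pass), len(right))
```

Building the list once and passing that list to pool.map() avoids both problems.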

CodePudding user response:

Let me give you my simple multiprocessing recipe for any task, not just augmentation.

This is a top down view.

import os
import multiprocessing
from workers import batch_function

# The guard is required when worker processes are spawned (e.g. on Windows)
if __name__ == '__main__':
    no_of_cpus = os.cpu_count() or 1
    all_folders = os.walk(folderpath)
    # Specific to your file structure; split_input_list expects a dict keyed by file
    input_list = get_all_the_files_from_folder(all_folders)

    mp_dict = split_input_list(process_number=no_of_cpus, d=input_list)

    with multiprocessing.Pool() as pool:
        results = pool.map(batch_function, mp_dict)  # Call image augmentations

First I would make a list of all the data that needs to be preprocessed and then split it into as many parts as the number of processes I want, using the function split_input_list.

If you don't need to return anything from the batch function, you don't need the results variable. Otherwise, results is a list with one entry per batch, which you can iterate with for res in results: regardless of what batch_function returns.
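
As a rough illustration of that flow, here is a self-contained sketch that uses plain lists instead of dicts. split_into_batches, the counting batch_function, and the fake file names are hypothetical stand-ins, not the helpers from this answer; the real worker would call the augmentation code where the placeholder is.

```python
from multiprocessing import Pool


def split_into_batches(items, n_batches):
    """Split a list into n_batches contiguous parts of nearly equal size."""
    size, extra = divmod(len(items), n_batches)
    batches, start = [], 0
    for i in range(n_batches):
        # The first `extra` batches get one additional item each
        end = start + size + (1 if i < extra else 0)
        batches.append(items[start:end])
        start = end
    return batches


def batch_function(batch):
    """Hypothetical worker: handle every path in one batch, return a count."""
    handled = 0
    for _path in batch:
        # Placeholder: the real code would run the augmentation here
        handled += 1
    return handled


def run(all_paths, n_processes=2):
    batches = split_into_batches(all_paths, n_processes)
    with Pool(n_processes) as pool:
        results = pool.map(batch_function, batches)
    # One result per batch, in the same order as the batches
    for res in results:
        print("batch handled", res, "items")
    return results


if __name__ == "__main__":
    run([f"img_{i}.png" for i in range(7)])
```

pool.map() returns the results in the same order as the input batches, so the i-th result belongs to the i-th batch.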

def split_input_list(process_number, d):
    # Split dict d into process_number smaller dicts of nearly equal size
    items = list(d.items())
    pn = process_number
    chunk = len(items) // pn
    dict_list = []
    for i in range(pn - 1):
        start = chunk * i
        finish = chunk * (i + 1)

        split_dict = dict(items[start:finish])
        print(len(split_dict))
        dict_list.append(split_dict)

    # The last dict takes everything that remains, including the remainder
    last_dict = dict(items[chunk * (pn - 1):])
    dict_list.append(last_dict)

    print(len(last_dict))
    return dict_list

Then, in a separate workers.py file, I usually keep several batch functions that each accomplish a certain task. In this case, for augmentations, I would do something similar to:

def batch_function(local_split):
    # pool.map passes one of the dicts produced by split_input_list per call

    for k in local_split:
        augmentations(k)

    ...
    #return what_you_need

Also, if you run one process per core on a CPU with 32 cores and don't have an impressive amount of RAM, expect some crashes from running out of memory.
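
One way to keep memory in check, sketched here under assumptions, is to cap the number of worker processes and stream results with imap_unordered() instead of collecting them all at once. pick_worker_count, fake_augment, and the numbers used are hypothetical; whether this actually prevents crashes depends on the image sizes and on what the augmentations allocate.

```python
import os
from multiprocessing import Pool


def pick_worker_count(requested=None, reserve=1):
    """Cap the pool size at the CPU count minus `reserve` cores (at least 1)."""
    available = max(1, (os.cpu_count() or 1) - reserve)
    if requested is None:
        return available
    return max(1, min(requested, available))


def fake_augment(path):
    """Hypothetical stand-in for the real per-image augmentation."""
    return path.upper()


def augment_streaming(image_paths, processes=None, chunksize=16):
    """Process paths with a bounded pool, consuming results as they arrive."""
    done = 0
    # maxtasksperchild makes the pool replace worker processes periodically,
    # which releases any memory a worker has accumulated.
    with Pool(processes=pick_worker_count(processes), maxtasksperchild=100) as pool:
        for _result in pool.imap_unordered(fake_augment, image_paths, chunksize=chunksize):
            done += 1
    return done


if __name__ == "__main__":
    print(augment_streaming([f"img_{i}.png" for i in range(50)], processes=2))
```

A smaller pool lowers peak memory at the cost of throughput, so the worker count is worth tuning against the memory each image needs.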
