Python - multiprocessing multiple folders

Time:09-22

As a newbie to programming, I'm still exploring the concepts of multiprocessing and multithreading.

I've written a small script that reads a file, copies the files it lists into multiple temporary folders, and performs the following actions on each folder:

  1. Build a label.
  2. Generate a package.
  3. Push it to Nexus.

There are ~500 folders, and they are processed sequentially. How can I use multiprocessing here to process 100 folders at a time in parallel, or even more? Also, is it possible to keep track of those processes and fail the build if even one subprocess fails?

I read multiple articles on multiprocessing, but couldn't wrap my head around it :(

Any guidance would be of great help to me, thanks.

folder1
   -- war file
   -- metadata

folder 2
   -- war file
   -- metadata
....
....

folder 500
   -- war file
   -- metadata

Code snippet

import re, shutil, os
from pathlib import Path

target = "/home/work"
file_path = target + "/file.txt"

dict = {}
count = 1

def commands_to_run_on_each_folder(filetype, tmp_folder):
    target_folder = tmp_folder + '/tmp' + str(count)

    os.system(<1st command to build the label>)
    os.system(<2nd command to build the package>)
    <multiple file manipulations, where `filetype` is used and get the required file with right extension>
    <curl command to upload it to the Nexus>

#Read the text file and assemble it in a dictionary.
with open(file_path, 'r') as f:
    lines = f.read().splitlines()
    for i, line in enumerate(lines):
        match = re.match(r".*\.war", line)
        if match:
            j = i-1 if i > 1 else 0
            for k in range(j, i):
                dict[match.string] = lines[k]
#Iterate the dictionary and copy the folder to the temporary folders.
for key, value in dict.items():
    os.mkdir(target + '/tmp' + str(count))
    shutil.copy(key, target + '/tmp' + str(count))
    shutil.copy(value, target + '/tmp' + str(count))
    commands_to_run_on_each_folder("war", target)
    count += 1

OS: Ubuntu 18.04, memory: 22 GB (container)

CodePudding user response:

This is not a good target for multiprocessing, but it is a good target for GNU parallel.

Your builds happen in the background: Python is just calling system commands. You can certainly make multiple background os.system calls in parallel from Python, but this script would be much better off running as a find | parallel pipeline.

What I would do is rewrite the script to process only one folder. Then I would do:

find /path/to/root/folder -mindepth 1 -maxdepth 1 -type d | parallel --bar -I{} python3 script.py {}

Since you are on Ubuntu, find is already installed, and GNU parallel is available from the package manager (sudo apt install parallel). Note that this is a bash command to run in the shell, not Python.
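A minimal sketch of what such a single-folder script.py could look like is given below. The function names process_folder and main, the exit codes, and the check for a .war file are assumptions made for illustration; the real label, package and upload commands from the question would replace the placeholder comment.

```python
import sys
from pathlib import Path


def process_folder(folder):
    """Hypothetical per-folder build; return 0 on success, non-zero on failure."""
    folder = Path(folder)
    if not folder.is_dir():
        print(f"not a directory: {folder}", file=sys.stderr)
        return 2
    war_files = sorted(folder.glob("*.war"))
    if not war_files:
        print(f"no .war file in {folder}", file=sys.stderr)
        return 1
    # Placeholder: the label, package and upload commands would run here.
    print(f"processing {war_files[0].name} in {folder}")
    return 0


def main(argv):
    """Expect exactly one argument: the folder to process."""
    if len(argv) != 2:
        print("usage: script.py FOLDER", file=sys.stderr)
        return 2
    return process_folder(argv[1])
```

Saved as script.py with a final line `sys.exit(main(sys.argv))`, each invocation handles one folder and reports failure through its exit status, which lets the calling shell notice a failed folder.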

Reasoning against doing this in python

  • Don't reinvent the wheel.
  • It is easily customisable: you can change the number of processes by adding --jobs N.
  • Your code just calls other processes: you're using Python as a scripting language, much like bash (which is fine), so it makes more sense to think of it as a build script for each folder.
  • You get a progress bar and other goodies for free!

On the other hand, if you do want to do this in Python, it is possible.

Note that current practice is to prefer subprocess over os.system.
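As a rough illustration of that recommendation, the sketch below wraps subprocess.run in a helper that raises when a command exits with a non-zero status, which os.system does not do on its own. The helper name run_step and the demo commands are hypothetical; the real build commands from the question are not shown.

```python
import subprocess
import sys


def run_step(cmd, cwd=None):
    """Run one command; raise subprocess.CalledProcessError on a non-zero exit."""
    completed = subprocess.run(
        cmd,                      # list form avoids shell quoting problems
        cwd=cwd,                  # optional working directory for the command
        check=True,               # non-zero exit status raises CalledProcessError
        stdout=subprocess.PIPE,   # capture output instead of inheriting it
        stderr=subprocess.PIPE,
        universal_newlines=True,  # decode output to str
    )
    return completed.stdout


if __name__ == "__main__":
    # Stand-in commands: the Python interpreter plays the role of a build tool.
    print(run_step([sys.executable, "-c", "print('label built')"]), end="")
    try:
        run_step([sys.executable, "-c", "import sys; sys.exit(3)"])
    except subprocess.CalledProcessError as error:
        print(f"step failed with exit status {error.returncode}")
```

Because a failed step raises an exception, the calling code can stop processing that folder immediately instead of running the remaining steps on a broken build.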

CodePudding user response:

Using concurrent.futures is easy. I have modified your script to become:

#!/usr/bin/env python3
import concurrent.futures
import logging
import pathlib
import re
import shutil


logging.basicConfig(
    level=logging.DEBUG,
    format="%(levelname)s:%(processName)s:%(message)s"
)


def worker(path1, path2, src, target, logger):
    logger.debug("Create dir %s", target)
    target.mkdir(exist_ok=True)

    logger.debug("Copy files")
    shutil.copy(src / path1, target / path1)
    shutil.copy(src / path2, target / path2)

    logger.debug("Additional commands to run on %s", target)
    # TODO: Add actions here
    # commands_to_run_on_each_folder(...)


def main():
    #Read the text file and assemble it in a dictionary.
    tasks = {}
    with open("file.txt", 'r') as f:
        lines = f.read().splitlines()
        for i, line in enumerate(lines):
            match = re.match(r".*\.war", line)
            if match:
                j = i-1 if i > 1 else 0
                for k in range(j, i):
                    tasks[match.string] = lines[k]

    logger = logging.getLogger()
    # src: The directory where this script is
    src = pathlib.Path(__file__).parent
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for taskid, (path2, path1) in enumerate(tasks.items(), 1):
            target = pathlib.Path(f"/tmp/dir{taskid}")

            # Calls `worker` function with parameters path1, path2, ...
            # concurrently
            executor.submit(worker, path1, path2, src, target, logger)


if __name__ == "__main__":
    main()

Here is a sample output:

DEBUG:ForkProcess-1:Create dir /tmp/dir1
DEBUG:ForkProcess-1:Copy files
DEBUG:ForkProcess-1:Additional commands to run on /tmp/dir1
DEBUG:ForkProcess-1:Create dir /tmp/dir2
DEBUG:ForkProcess-1:Copy files
DEBUG:ForkProcess-1:Additional commands to run on /tmp/dir2
DEBUG:ForkProcess-1:Create dir /tmp/dir3
DEBUG:ForkProcess-1:Copy files
DEBUG:ForkProcess-1:Additional commands to run on /tmp/dir3
DEBUG:ForkProcess-1:Create dir /tmp/dir4
DEBUG:ForkProcess-1:Copy files
DEBUG:ForkProcess-1:Additional commands to run on /tmp/dir4

Notes

  • I use logging instead of print because logging works better in a multi-process environment.
  • To silence the debug messages, change the level to logging.WARNING.
  • I use pathlib because it is more convenient than os.path.
  • Note: the submit call does not wait. Even if worker takes a long time to run, submit returns right away with a Future object.
  • Because of the with construct, the executor waits for all the concurrent tasks to finish before exiting. This is what you want.
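The question also asks how to keep track of the tasks and fail the build if any of them fails. One possible approach, sketched here under assumptions, is to keep the Future objects returned by submit and inspect each one as it completes. The names run_all and demo_worker are hypothetical, and the demo uses ThreadPoolExecutor only to stay small; ProcessPoolExecutor offers the same submit interface, and its max_workers argument controls how many tasks run at once.

```python
import concurrent.futures
import sys


def run_all(executor, worker, tasks):
    """Submit worker(*args) for each args tuple; return {args: exception} for failures."""
    # Map each future back to the arguments it was submitted with.
    futures = {executor.submit(worker, *args): args for args in tasks}
    failures = {}
    for future in concurrent.futures.as_completed(futures):
        error = future.exception()  # None when the worker finished normally
        if error is not None:
            failures[futures[future]] = error
    return failures


def demo_worker(folder):
    """Hypothetical stand-in for the per-folder work; fails for one folder."""
    if folder == "/tmp/dir2":
        raise RuntimeError("package step failed")
    return folder


if __name__ == "__main__":
    tasks = [("/tmp/dir1",), ("/tmp/dir2",), ("/tmp/dir3",)]
    # Up to 100 tasks may run at the same time.
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
        failures = run_all(executor, demo_worker, tasks)
    for args, error in failures.items():
        print(f"FAILED {args}: {error}", file=sys.stderr)
    print("exit status would be", 1 if failures else 0)
```

In a real build script the last line would be `sys.exit(1 if failures else 0)`, so that a single failed folder gives the whole run a non-zero exit status.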