I have to run about 200-300 python scripts daily having different arguments, for example:
python scripts/foo.py -a bla -b blabla ..
python scripts/foo.py -a lol -b lolol ..
....
Let's say I already have the arguments for every script in a list, and I would like to execute them concurrently so that the CPU is always busy. How can I do so?
My current solution:
script for running multiple processes:
import subprocess

workers = 15
# Run the jobs in fixed-size batches of `workers` commands.
for i in range(0, len(jobs), workers):
    job_string = ""
    for j in range(i, min(i + workers, len(jobs))):
        job_string += jobs[j] + " & "
    if len(job_string) == 0:
        continue
    print(job_string)
    val = subprocess.check_call("./scripts/parallelProcessing.sh '%s'" % job_string, shell=True)
scripts/parallelProcessing.sh (used in the above script):
echo $1
echo "running scripts in parallel"
eval $1
wait
echo "done processing"
Drawback:
I am executing K processes in a batch, then another K, and so on. But CPU utilization is much lower than it could be, because the number of running processes in a batch keeps decreasing until eventually only one is running. As a result, the total time taken to complete all the processes is significant.
One simple solution is to ensure that K processes are always running, i.e., as soon as one process completes, a new one is scheduled. But I am not sure how to implement such a solution.
Expectations:
As the task is not very latency-sensitive, I am looking for a simple solution that keeps the CPU mostly busy.
Note: Any two of those processes can execute simultaneously without any concurrency issues. The host where these processes run has Python 2.
CodePudding user response:
This is a technique I developed for calling many external programs using subprocess.Popen. In this example, I'm calling convert to make JPEG images from DICOM files.
In short: it uses manageprocs to keep checking a list of running subprocesses. If one has finished, it is removed and a new one is started, as long as unprocessed files remain. After that, the remaining processes are watched until they have all finished.
from datetime import datetime
from functools import partial
import argparse
import logging
import os
import subprocess as sp
import sys
import time
def main():
    """
    Entry point for dicom2jpg.
    """
    args = setup()
    if not args.fn:
        logging.error("no files to process")
        sys.exit(1)
    if args.quality != 80:
        logging.info(f"quality set to {args.quality}")
    if args.level:
        logging.info("applying level correction.")
    start_partial = partial(start_conversion, quality=args.quality, level=args.level)

    starttime = str(datetime.now())[:-7]
    logging.info(f"started at {starttime}.")
    # List of subprocesses
    procs = []
    # Do not launch more processes concurrently than your CPU has cores.
    # That will only lead to the processes fighting over CPU resources.
    maxprocs = os.cpu_count()
    # Launch and manage subprocesses for all files.
    for path in args.fn:
        while len(procs) == maxprocs:
            manageprocs(procs)
        procs.append(start_partial(path))
    # Wait for all subprocesses to finish.
    while len(procs) > 0:
        manageprocs(procs)
    endtime = str(datetime.now())[:-7]
    logging.info(f"completed at {endtime}.")
def start_conversion(filename, quality, level):
    """
    Convert a DICOM file to a JPEG file.
    Removing the blank areas from the Philips detector.

    Arguments:
        filename: name of the file to convert.
        quality: JPEG quality to apply
        level: Boolean to indicate whether level adjustment should be done.

    Returns:
        Tuple of (input filename, output filename, subprocess.Popen)
    """
    outname = filename.strip() + ".jpg"
    size = "1574x2048"
    args = [
        "convert",
        filename,
        "-units",
        "PixelsPerInch",
        "-density",
        "300",
        "-depth",
        "8",
        "-crop",
        size + "+232+0",
        "-page",
        size + "+0+0",
        "-auto-gamma",
        "-quality",
        str(quality),
    ]
    if level:
        args += ["-level", "-35%,70%,0.5"]
    args.append(outname)
    proc = sp.Popen(args, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
    return (filename, outname, proc)
def manageprocs(proclist):
    """Check a list of subprocesses for processes that have ended and
    remove them from the list.

    Arguments:
        proclist: List of tuples. The last item in the tuple must be
            a subprocess.Popen object.
    """
    # Iterate over a copy, since items are removed from proclist in the loop.
    for item in list(proclist):
        filename, outname, proc = item
        if proc.poll() is not None:
            logging.info(f"conversion of “{filename}” to “{outname}” finished.")
            proclist.remove(item)
    # since manageprocs is called from a loop, keep CPU usage down.
    time.sleep(0.05)


if __name__ == "__main__":
    main()
I've left out setup(); it's using argparse to deal with command-line arguments.
Here the thing to be processed is just a list of file names. But it could also be (in your case) a list of tuples of script names and arguments.
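As a minimal sketch of that adaptation (not a drop-in solution), the same polling idea can be reduced to one function that takes a list of argument lists. The name run_all and its parameters maxprocs and poll_interval are made up for this illustration. Since the question mentions Python 2, the sketch sticks to calls that exist in both Python 2 and 3; the code above relies on f-strings, os.cpu_count() and subprocess.DEVNULL, which need Python 3.

```python
import subprocess
import time


def run_all(commands, maxprocs=4, poll_interval=0.05):
    """Run every command, keeping at most `maxprocs` running at once.

    commands: list of argument lists, as accepted by subprocess.Popen.
    Returns a list of (command, returncode) tuples in completion order.
    """
    pending = list(commands)
    running = []    # (command, Popen) pairs that have been started
    finished = []   # (command, returncode) pairs
    while pending or running:
        # Top up the pool while there is a free slot and work left.
        while pending and len(running) < maxprocs:
            cmd = pending.pop(0)
            running.append((cmd, subprocess.Popen(cmd)))
        # Sort the started processes into "still running" and "finished".
        still_running = []
        for cmd, proc in running:
            if proc.poll() is None:
                still_running.append((cmd, proc))
            else:
                finished.append((cmd, proc.returncode))
        running = still_running
        # Called in a loop, so sleep briefly to keep CPU usage down.
        if running:
            time.sleep(poll_interval)
    return finished
```

For the question's case, commands would be something like [["python", "scripts/foo.py", "-a", "bla", "-b", "blabla"], ["python", "scripts/foo.py", "-a", "lol", "-b", "lolol"]], and maxprocs the K you want to keep running. As soon as one script exits, the next pending one is started, so the pool stays full until the job list runs out.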