How do you get multiprocessing Pool to not spin up new processes but also not terminate currently ru

Time:09-30

I am using the multiprocessing.Pool class in Python 2.7. I have a large number of jobs that may only run during a certain time window each day. Each job takes some time, and I would like at most n jobs to run in parallel at any one time.

Pool limits the number of parallel jobs nicely, but I run into trouble when winding the jobs down. At the end of my window I want the currently running jobs to finish, and I want no new jobs to start. I have tried Pool.close(), which does let the running processes finish as desired, but from experimentation it seems that jobs which were queued but had not yet started are still run after the Pool is closed.

The other option, Pool.terminate(), aggressively kills even the running jobs, which goes against the desired behaviour.

Function            Allows running jobs to finish   Prevents new jobs from starting
.terminate()        No                              Yes
.close()            Yes                             No
Desired behaviour   Yes                             Yes

CodePudding user response:

First, you should not be using Python 2.7: it reached end of life in January 2020.

You should use a ProcessPoolExecutor from the concurrent.futures standard library module and call its .shutdown() method with cancel_futures=True (available since Python 3.9). The executor then lets jobs that have already started finish and cancels any pending work.

from concurrent.futures import CancelledError, ProcessPoolExecutor

parallel_jobs = 4  # The pool size
executor = ProcessPoolExecutor(parallel_jobs)

future_1 = executor.submit(work_1, argument_1)
...
future_n = executor.submit(work_n, argument_n)

...
# At some point when the time window ends and you need to stop jobs:
executor.shutdown(cancel_futures=True)

# Some futures, such as future_n, may have been cancelled by now; you can check:
if future_n.cancelled():
    print("job n has been cancelled")

# Or you can try to get the result while checking for CancelledError:

try:
    result_n = future_n.result()
except CancelledError:
    print("job n hasn't finished in the given time window")
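If you are stuck on a Python 3 release older than 3.9, where shutdown() does not accept cancel_futures, a rough equivalent is to cancel each future yourself before shutting down. The sketch below assumes that approach; cancel_pending and slow_double are hypothetical names, and a ThreadPoolExecutor stands in for the process pool to keep the example short.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(x):
    """Hypothetical stand-in for a real job."""
    time.sleep(0.1)
    return 2 * x


def cancel_pending(executor, futures):
    """Cancel every future that has not started, then wait for the rest.

    Future.cancel() returns False for a job that is already running or
    done, so those jobs are left alone.
    """
    cancelled = [f for f in futures if f.cancel()]
    executor.shutdown(wait=True)  # blocks until the running jobs finish
    return cancelled


if __name__ == "__main__":
    executor = ThreadPoolExecutor(max_workers=2)
    futures = [executor.submit(slow_double, i) for i in range(10)]
    cancelled = cancel_pending(executor, futures)
    done = [f.result() for f in futures if not f.cancelled()]
    print(len(done), "finished,", len(cancelled), "cancelled")
```

How many jobs finish depends on timing, so the printed counts can vary from run to run.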

Here is an example of cancellation:

from concurrent.futures import ThreadPoolExecutor
from time import sleep

# The job to execute concurrently
def foo(i: int) -> str:
    sleep(0.2)
    print(i)
    return f"{i}"

e = ThreadPoolExecutor(4)

# Jobs are scheduled concurrently, this call does not block
futures = [e.submit(foo, i) for i in range(100)]

# Shutdown during execution and cancel pending jobs
e.shutdown(cancel_futures=True)

# Gather completed results
results = [f.result() for f in futures if not f.cancelled()]
print(results)

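If you run this code, you'll see that not all of the 100 scheduled jobs complete: only those that had already started when the executor was shut down do. The demo uses a ThreadPoolExecutor for brevity; ProcessPoolExecutor.shutdown() accepts the same cancel_futures flag.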
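To tie this to a time window, one option is to wait on the futures with a timeout and only then shut down. This is a minimal sketch, not a definitive implementation: stop_at_deadline and slow_square are hypothetical names, the deadline is assumed to be a time.monotonic() value, and a thread pool is used so that the example runs quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def slow_square(x):
    """Hypothetical stand-in for a real job."""
    time.sleep(0.1)
    return x * x


def stop_at_deadline(executor, futures, deadline):
    """Let jobs run until `deadline`, then cancel whatever has not started.

    `deadline` is a time.monotonic() value (an assumption of this sketch).
    Returns (finished, cancelled) lists of futures. Requires Python 3.9+.
    """
    remaining = deadline - time.monotonic()
    if remaining > 0:
        # Returns early if every job completes before the window closes.
        wait(futures, timeout=remaining)
    # Running jobs are allowed to finish; pending ones are cancelled.
    executor.shutdown(wait=True, cancel_futures=True)
    finished = [f for f in futures if not f.cancelled()]
    cancelled = [f for f in futures if f.cancelled()]
    return finished, cancelled


if __name__ == "__main__":
    pool = ThreadPoolExecutor(max_workers=2)
    jobs = [pool.submit(slow_square, i) for i in range(20)]
    finished, cancelled = stop_at_deadline(pool, jobs, time.monotonic() + 0.25)
    print(len(finished), "finished,", len(cancelled), "cancelled")
```

Using wait() with a timeout rather than a plain sleep means the helper returns as soon as all jobs are done, instead of always idling until the end of the window.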
