I've implemented a process pool using concurrent.futures.ProcessPoolExecutor, but I've noticed that when I print out pool._queue_count it keeps growing each time I submit a new item of work to the pool. Why is it doing this and is this going to be an issue?
Here is the output I'm currently logging:
2022-12-06 15:37:31,934 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:31,934 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:31,935 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:31,935 - DEBUG | Pool queue size: 329
2022-12-06 15:37:31,935 - DEBUG | Pool processes: {19113: <ForkProcess name='ForkProcess-2' pid=19113 parent=19104 started>, 19114: <ForkProcess name='ForkProcess-3' pid=19114 parent=19104 started>}
2022-12-06 15:37:31,935 - DEBUG | Pool pending work: {328: <concurrent.futures.process._WorkItem object at 0x7f247f7be2e0>}
2022-12-06 15:37:41,946 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:41,946 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:41,946 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:41,947 - DEBUG | Pool queue size: 330
2022-12-06 15:37:41,947 - DEBUG | Pool processes: {19113: <ForkProcess name='ForkProcess-2' pid=19113 parent=19104 started>, 19114: <ForkProcess name='ForkProcess-3' pid=19114 parent=19104 started>}
2022-12-06 15:37:41,947 - DEBUG | Pool pending work: {329: <concurrent.futures.process._WorkItem object at 0x7f247f7be6a0>}
Notice that the pool queue size now reports 330 - but I don't understand what that means or why it's so high. It increments the size by one each time for some reason.
I can't paste all the code as there's a fair bit, but here is a slightly condensed version, some snippets of code I didn't feel were relevant are cut out:
futures = []
with mp.Manager() as manager:
last_execution = time.perf_counter()
pool = ProcessPoolExecutor()
while True:
current_time = time.perf_counter()
if current_time - last_execution < 10 and not first_run:
time.sleep(1)
else:
last_execution = current_time
for automation_file in automation_files:
with open(automation_file, "r") as f:
automation_config = json.load(f)
automation_name = os.path.splitext(os.path.basename(automation_file))[0]
automation_log = os.path.join(log_dir, f"{automation_name}.log")
automation_type = automation_config["type"]
if automation_type == "task":
automation = pyba.AutomationTask(automation_name, automation_config, automation_log, api_1, api_2)
else:
logger.error(f"Unknown automation type in '{os.path.basename(automation_file)}', skipping")
continue
logger.debug(f"Running automation '{automation.name}' with internal automation id '{automation._id}'")
future = pool.submit(automation.run, args=(session_1, session_2, stop_app_event))
futures.append(future)
logger.debug(f"Pool queue size: {pool._queue_count}")
logger.debug(f"Pool processes: {pool._processes}")
logger.debug(f"Pool pending work: {pool._pending_work_items}")
Basically, we get a bunch of automation files, parse them, then run them in a new process using the process pool. Then we wait for a given interval (for testing here 10 seconds), and do the exact same again.
However, right now there is nothing for these automation processes to actually process as I'm in test and haven't created any test records for it... so I don't see how the queue size could grow so large over time.
Number of CPU's on my test server is 2 - so should only be two processes in the pool?
I don't think memory or CPU is an issue here:
-bash-4.2$ ps aux | head -1; ps aux | grep -iE 'python3.9|19104' | grep -v grep | sort -rnk 4
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
admin 19114 0.0 0.0 225584 15648 pts/1 S 14:42 0:00 python3.9 app.py
admin 19113 0.0 0.0 225584 15612 pts/1 S 14:42 0:00 python3.9 app.py
admin 19107 0.0 0.0 520492 15376 pts/1 Sl 14:42 0:01 python3.9 app.py
admin 19104 0.0 0.0 374080 20248 pts/1 Sl 14:42 0:02 python3.9 app.py
Last thing to mention as well is that I've implemented a graceful stop solution using signals. When I send a signal to the app to stop, it stops almost immediately gracefully - this demonstrates that it's not doing any processing despite such a large queue count. Which sort of adds to the confusion really:
2022-12-06 16:16:05,505 - DEBUG | Pool queue size: 560
2022-12-06 16:16:05,506 - DEBUG | Pool processes: {19113: <ForkProcess name='ForkProcess-2' pid=19113 parent=19104 started>, 19114: <ForkProcess name='ForkProcess-3' pid=19114 parent=19104 started>}
2022-12-06 16:16:05,506 - DEBUG | Pool pending work: {559: <concurrent.futures.process._WorkItem object at 0x7f247f738160>}
2022-12-06 16:16:12,516 - DEBUG | Received a signal to stop the app, setting the stop flag
2022-12-06 16:16:12,516 - DEBUG | Cancelling all scheduled pending work
2022-12-06 16:16:12,518 - DEBUG | Shutting down the process pool
2022-12-06 16:16:12,522 - DEBUG | Process pool shut down successfully, app stopped
CodePudding user response:
_queue_count
is just a sequential work item ID and it will never decrease.
You're not supposed to read it manually anyway (that's what the prefixing underscore in its name means!).