I am trying to process a huge set of job-loads by calling some function on a list of arguments as:
import multiprocessing as mp
print("Number of processors: ", mp.cpu_count())
pool = mp.Pool(mp.cpu_count())
try:
results = pool.map_async(consume_one, [list-of-arguments]).get()
except:
print(e)
finally:
pool.close()
And for each call of consume_one()
, we pass one value from '[list-of-arguments]' and in this function I log the start and end time for the funcition consume_one()
.
Observed values are as:
Completed processing for ... in 0:03:34.283025
Completed processing for ... in 0:04:24.109049
Completed processing for ... in 0:04:58.464374
Completed processing for ... in 0:05:11.830404
Completed processing for ... in 0:08:32.234539
Completed processing for ... in 0:09:09.725937
Completed processing for ... in 0:09:10.968685
Completed processing for ... in 0:09:51.642501
Completed processing for ... in 0:10:58.076675
Completed processing for ... in 0:12:30.905190
Completed processing for ... in 0:14:01.051716
As we can see in the times in log that all the subsequent calls to the same function are taking longer and longer, while it is not because of argument to those calls and argument is more of less same for all of them.
My question is:
Why this might be happening?
How can I debug this?
CodePudding user response:
Q1 :
"Why this might be happening?"
A1 :
Growth of the accumulated processing inefficiencies
Q2 :
" How can I debug this?"
A2 :
Best by understanding the composition of where you lose most of the processing efficiency. First in principle, next using in practical techniques, how to reduce or better eliminate such expensive overhead hotspots at all.
First isolate individual root causes of the processing inefficiencies :
costs of Python Interpreter process spawning add-on overhead costs (on some O/S-es a full, I repeat a FULL copy - i.e. could be a few
[GB]
of moving RAM-to-RAM physical memory-I/O bottleneck on your hardware - check (a) how many I/O channels are present for CPU-motherboard-DRAM on the way from / to a physical-RAM and (b) whether so many copied process-memory-allocations did not actually turned the O/S virtual-memory management into swapping-mode, i.e. since that scope of (processes-times-replicated)-memory did not fit into your computing platform's physical-RAM footprint, the O/S virtual-memory manager started to "emulating" a such missing RAM with on-demand (see LRU-policy et al) by swapping large-chunks of physical-RAM data out, on disk (see 1E5 larger latencies and add-on costs of traffic-jamming if not straight blocking any other physical-RAM to CPU data-flow, during moving blocks of[GB]
-s to/from such a swap-space disk-storage. This cost is multiplied by 2, as the costs of moving another data, from swap-space disk-storage back into the "freed"-RAM for a next process allowed to use it (at least for some fair-amount-of-time, as the O/S virtual-memory manager thinks it being fair) is principally the same as the previous cost of moving data out (to make space for this move in). Sort of "SOKOBAN" problem, when moving all zillions of[GB]
-s through a pair of CPU to physical-RAM memory-I/O channels, waiting for a 1E5 times slower disk-storage for any next block to { read | write } in the long long waiting queue there - it so brutal, as we speak here about the resulting end-to-end computing strategy efficiency, that it is often called RAM-thrashing or swap-juggling performance anti-pattern in efficient computing science )costs of Python Interpreter process-to-process data-transfer ( SER/xfer/DES takes place for both call-signature parameters passed and also for results returned ) - this is an example, where this very performance anti-pattern could have been principally avoided, saving all add-on costs of unnecessary SER/xfer/DES data-flow of
~ 75 [TB]
(! terabytes of physical-RAM memory-I/O, CPU-pickle.dumps(), O/S-pipe-xfers, CPU-pickle.loads() and another physical-RAM memory-I/O-s -- all that wasted for~ 75 [TB]
data-flow, injected pointlessly just due to wrong use of a "slim"-SLOC list-comprehension based "external"-iterator, alike the one "hidden" to operate inside the.map_async()
-method used above )costs of your code inefficiencies in not re-using expensively prefetched data in cache-lines
costs of hardware thermal throttling the "marketing"-promoted CPU frequencies, once the cores start to do some heavy work and get warm ( throttling can on some hardware be deferred, if a job can be shifted onto another, cooler, CPU-core - yet at a cost of losing the core-local cached data, so the code has to re-fetch it again at the most expensive price in TimeDOMAIN, just as it was moved onto a cooler, but working at a higher frequency, CPU-core ). In the situation here, your code has spawned immense amount of processes ( not mentioning the O/S-process-scheduler add-on costs now ), there are no cooler CPU-cores left and your hardware resorts to thermal throttling - i.e. working on lower and lower CPU-core frequency
costs of explicitly uncontrolled garbage collections are a chapter of its own importance
For detailed argumentation on pros and cons & overhead testing templates you might like this collection of multiprocessing.Pool()
& joblib.Parallel()
related
If you are interested not only in debugging the problem, but also in solving the end-to-end process-performance, try to re-factor the code so as to principally prevent this from happening - if you are sure you want that many processes (while these are most probably memory-starved, waiting for not yet fetched data from slow & overloaded/blocked physical-RAM-I/O channels), be sure to move from a __main__
-Python Interpreter hosted "outer"-item-iterator to a block-based work. There you command the worker processes to iterate "internally" (avoiding all the awfully many repetitions of SER/xfer/DES add-on costs) over their commanded, disjunct block-of-list, partitioned by the __main__
-Python Interpreter. As the Python Interpreter process-instantiation works in the known manner (copying a state-full copy of the main-Python Interpreter - previous attempts to reduce the amount of data copied by O/S-provided "forking" have been documented to cause problems or can even lead to deadlocking situations, so a due care is to be paid here, if the code has to be robust enough, if it is going into production or providing some kind of critical intelligence), the list, per-se will be already present in the worker processes, that can iterate over their "private"-part (one of the disjunct blocks of this list
) just by commanded index-ranges, i.e. without expensively passing any other parameter to the .Pool()
-processes, but the respective index-range ~ as trivial as ( startHere, stopHere )
-tuples, that map / cover the whole list. The costs of returning results depend on what these consist of. Tools exist for doing this efficiently, be it in block-transfer between processes on list-block completed, or in compressed file-I/O storage. Details matter, as always, if performance is the goal. As the items take ~ 3 minutes in the as-is state of the consume_one()
processing, there is plenty of opportunities to speed this up, in the name of the overall processing efficiency and performance.
Block-based "internal"-iteration can keep calling the as-is consume_one()
, if that could not get faster by some performance improving tools - be it numpy
-vectorised (using internally high performance multicore, cache re-use efficient libraries) or numba
-JIT-compiled accelerated processing (as the re-use count of the JIT/LLVM-compiled code here works in your direction, towards improving the overall performance )