I'm reviewing some code and noticed a possible redundancy:
def tasker(val):
    ...  # do stuff

def multiprocessor(func, vals):
    chunks = np.array_split(vals, os.cpu_count())
    with multiprocessing.Pool() as pool:
        pool.map(partial(func, vals), chunksize=chunks)

if __name__ == '__main__':
    values = foobar
    p = multiprocessing.Process(target=multiprocessor(tasker, values))
    p.start()
    p.close()
    p.join()
Just for a sanity check: isn't running multiprocessing.Process on the function that wraps multiprocessing.Pool redundant? There's no need to functionalize the multiprocessing.Pool to begin with, correct? Is there any advantage to running it like this?
CodePudding user response:
As it happens, the Process call never actually does anything useful; target=multiprocessor(tasker, values) runs multiprocessor in the main process, then passes its return value (None, since it has no explicit return) as the target for the Process.

So yes, definitionally, this is completely pointless; you make the Pool in the parent process, run it to completion, then create a no-op Process, launch it, it does nothing, and when the useless Process exits, the main process continues. Unless there is some benefit to creating such a no-op process, the code would do the same thing if the guarded block were just:
if __name__ == '__main__':
    values = foobar
    multiprocessor(tasker, values)
If the Process had been created correctly, with:

p = multiprocessing.Process(target=multiprocessor, args=(tasker, values))

and the code were more complex, there might be some benefit to this: if the Process needed to be killable (you could kill it easily for whatever reason, e.g. because some deadline had passed), or it would allocate huge amounts of memory that must be completely returned to the OS (not merely released to the user-mode free pool for reuse), or you were trying to avoid any mutations of the main process's globals (if the Process's target mutated them, the changes would only be seen in that child process and any processes forked after the change; the parent would not see them changed).
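For the "killable" case, a minimal sketch of what that correctly constructed Process might look like (the 10-second deadline here is purely illustrative):

if __name__ == '__main__':
    values = foobar
    p = multiprocessing.Process(target=multiprocessor, args=(tasker, values))
    p.start()
    p.join(timeout=10)   # Wait up to 10 seconds for the work to finish
    if p.is_alive():     # Deadline passed; forcibly kill the child process
        p.terminate()
        p.join()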
As written, none of these conditions seem to apply (aside from maybe memory growth issues, especially due to the use of partial, which has issues when used as the mapper function with Pool's various map-like methods), but without knowing the contents of tasker (more specifically, what it returns, which Pool.map will collect and dispose of, consuming memory that isn't strictly needed only to free it in bulk at the end), I can't be sure.
An aside:
I'll note that your code, as written, makes no sense:
def multiprocessor(func, vals):
    chunks = np.array_split(vals, os.cpu_count())
    with multiprocessing.Pool() as pool:
        pool.map(partial(func, vals), chunksize=chunks)
That call doesn't provide an iterable to pool.map, and it passes chunks (a list of numpy sub-arrays) as the chunksize, which should be an int.
The additional comments below assume it was actually implemented as:
def multiprocessor(func, vals):
    chunks = np.array_split(vals, os.cpu_count())
    with multiprocessing.Pool() as pool:
        pool.map(func, chunks, chunksize=1)
or:
def multiprocessor(func, vals):
    chunk_size = -(-len(vals) // os.cpu_count())  # Trick to get ceiling division out of the floor division operator
    with multiprocessing.Pool() as pool:
        pool.map(func, vals, chunksize=chunk_size)
Having said that, the possible memory issue from Pool.map storing all the results when they're clearly discarded can be ameliorated by using Pool.imap_unordered instead, and just forcing the resulting iterator to run to completion efficiently. For example, you could replace pool.map(func, chunks, chunksize=1) with consume(pool.imap_unordered(func, chunks)), and pool.map(func, vals, chunksize=chunk_size) with consume(pool.imap_unordered(func, vals, chunksize=chunk_size)) (where consume is the itertools recipe of the same name).
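For reference, the consume recipe from the itertools documentation boils down to this (ignoring its optional argument for consuming only the first n items):

from collections import deque

def consume(iterator):
    # Feed the entire iterator into a zero-length deque,
    # discarding every item at C speed
    deque(iterator, maxlen=0)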
In both cases, rather than allocating a list for all the results and storing each result in it as the workers complete tasks (allocating more and more stuff you don't need), imap_unordered produces each result as it's returned, and consume immediately grabs each result and throws it away (memory must be allocated for each result, but it's immediately released, so the peak memory consumption for the process, and therefore the size the heap grows to, is kept minimal).
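Putting that together, and reusing the consume helper above (plus the np, os and multiprocessing imports already implied by your original code), the first variant would become roughly:

def multiprocessor(func, vals):
    chunks = np.array_split(vals, os.cpu_count())
    with multiprocessing.Pool() as pool:
        # Each chunk's result is handed back as the worker finishes and is
        # immediately discarded, so no list of unneeded results accumulates
        consume(pool.imap_unordered(func, chunks))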