I'm clearly doing something wrong with `multiprocessing` but I'm not sure what -- I'd expect to see some speed up on this task, but the time spent running this test function in the forked process is 2 orders of magnitude more than the time it takes in the main process. This is a non-trivial task, so I don't think it's a case where the workload was too small to benefit from multiprocessing, as in this question and basically all the other SO questions about `multiprocessing`. And I know there is overhead from starting new processes, but my function returns the time spent doing the actual computation, which I would think would take place after that forking overhead is done.
I've looked at a bunch of docs and examples, tried versions of this code with `map`, `map_async`, `apply`, and `apply_async` instead of `imap_unordered`, but I get comparable results in all cases. I'm thoroughly mystified at this point... any help understanding what I'm doing wrong would be great, as would a revised code snippet that provides an example of how to obtain a performance boost by parallelizing this task. Thanks!
import time

t_start = time.time()

from multiprocessing import Pool

from numpy.random import rand, randint
import numpy.linalg as la
import numpy as np


def f(sp):
    (M, i) = sp
    t0 = time.time()
    # Build a symmetric positive-definite matrix, then invert it.
    M = (M @ M.T) / 1000 + np.eye(M.shape[0])
    M_inv = la.inv(M)
    t_elapsed = time.time() - t0  # time spent in the computation only
    return i, M.shape[0], la.det(M_inv), t_elapsed


randmat = lambda m: rand(m, m)

N = 20
n_m = 1500
specs = list(zip([randmat(n_m) for _ in range(N)], range(N)))

# Serial baseline
t0 = time.time()
for result in [f(sp) for sp in specs]:
    print(result)
print(f"\n--- serial time: {time.time()-t0}; total elapsed: {time.time()-t_start }\n")

# Same tasks through a process pool
t0 = time.time()
with Pool(processes=10) as pool:
    multiple_results = pool.imap_unordered(f, specs)
    for result in multiple_results:
        print(result)
print(f"\n--- parallel time: {time.time()-t0}\n")
Output:
(0, 1500, 2.613708465497732e-76, 0.17858004570007324)
(1, 1500, 2.3314319199405457e-76, 0.18518280982971191)
(2, 1500, 2.4510533542449015e-76, 0.18424344062805176)
...(snip)...
(17, 1500, 2.0972534465354807e-76, 0.18465876579284668)
(18, 1500, 2.4890185099760677e-76, 0.18526124954223633)
(19, 1500, 3.0716539033944427e-76, 0.17455506324768066)
--- serial time: 5.365333557128906; total elapsed: 5.747828006744385
(0, 1500, 2.613708465497732e-76, 9.31627368927002)
(1, 1500, 2.3314319199405457e-76, 9.709473848342896)
(5, 1500, 2.6716145027956763e-76, 10.101540327072144)
...(snip)...
(19, 1500, 3.0716539033944427e-76, 10.48097825050354)
(18, 1500, 2.4890185099760677e-76, 10.82164478302002)
(17, 1500, 2.0972534465354807e-76, 10.97563886642456)
--- parallel time: 40.98197340965271
(System info: Mint 20.1, AMD Ryzen 5 2600 (6 core, 12 threads), Python 3.8)
UPDATE
I believe @Paul's answer below is probably correct. Looking into this more, I've come up with an even more pathological test case to address @Charles Duffy's concern about serialization being costly: in the case below, I only send a tiny 100-element numpy vector to each process, and the inner time of each function call goes from ~0.03 secs to around 100 seconds!! More than three orders of magnitude worse! Bonkers! I can only imagine that there is some kind of disastrous contention over CPU access happening in the background that `multiprocessing` is not able to deal with.
...But this also seems to be a problem specifically having to do with the interaction between `multiprocessing` and `numpy`, because I tried out `ray`, and I get the kind of performance boost I'd expect from parallelization.
new results tl;dr
- serial time: 4.73s
- `ray` time: 0.112s
- `multiprocessing` time: 217.48s (!!!)
Code v2
import time

t_start = time.time()

import multiprocessing as mp

from numpy.random import randn
import numpy.linalg as la
import numpy as np
import ray

num_vecs = 20
vec_size = 100
inputs = [(randn(vec_size, 1), i, t_start) for i in range(num_vecs)]


def f(input):
    (v, i, t_start) = input
    t0 = time.time()
    det_sum = 0
    M = (v @ v.T) + np.diag(v[:, 0])
    for _ in range(50):
        M = M @ (M.T)
        M = M @ (la.inv(M + np.eye(M.shape[0])) / 2)
        det_sum += la.det(M)
    t_inner = time.time() - t0  # time spent in the computation only
    t_since_start = time.time() - t_start
    return i, det_sum, t_inner, t_since_start


def print_result(r):
    print(
        f"id: {r[0]:2}, det_sum: {r[1]:.3e}, inner time: {r[2]:.4f}, time since start: {r[3]:.4f}"
    )


# Serial baseline
t0 = time.time()
for result in [f(sp) for sp in inputs]:
    print_result(result)
print(f"\n--- serial time: {time.time()-t0}; total elapsed: {time.time()-t_start }\n")

# Same tasks through ray
ray.init(num_cpus=10)
g = ray.remote(f)

t0 = time.time()
results = ray.get([g.remote(s) for s in inputs])
for result in results:
    print_result(result)
print(f"\n--- parallel time: {time.time()-t0}\n")

# Same tasks through a multiprocessing pool
t0 = time.time()
with mp.Pool(processes=10) as pool:
    multiple_results = pool.imap_unordered(f, inputs)
    for result in multiple_results:
        print_result(result)
print(f"\n--- parallel time: {time.time()-t0}\n")
Output v2
id: 0, det_sum: 1.427e-133, inner time: 2.8998, time since start: 3.1620
id: 1, det_sum: 3.294e-118, inner time: 0.3816, time since start: 3.5436
id: 2, det_sum: 2.729e-114, inner time: 0.0569, time since start: 3.6005
...(snip)...
id: 17, det_sum: 2.372e-104, inner time: 0.0344, time since start: 4.8887
id: 18, det_sum: 3.523e-116, inner time: 0.0509, time since start: 4.9396
id: 19, det_sum: 9.242e-101, inner time: 0.0549, time since start: 4.9945
--- serial time: 4.734628677368164; total elapsed: 4.996868848800659
id: 0, det_sum: 1.427e-133, inner time: 0.0436, time since start: 6.1446
id: 1, det_sum: 3.294e-118, inner time: 0.0465, time since start: 6.1541
id: 2, det_sum: 2.729e-114, inner time: 0.0436, time since start: 6.1517
...(snip)...
id: 17, det_sum: 2.372e-104, inner time: 0.0438, time since start: 6.2027
id: 18, det_sum: 3.523e-116, inner time: 0.0394, time since start: 6.1995
id: 19, det_sum: 9.242e-101, inner time: 0.0413, time since start: 6.2032
--- parallel time: 0.1118767261505127
id: 0, det_sum: 1.427e-133, inner time: 101.0206, time since start: 107.2395
id: 2, det_sum: 2.729e-114, inner time: 102.6551, time since start: 108.8744
id: 5, det_sum: 2.063e-111, inner time: 104.2321, time since start: 110.4516
...(snip)...
id: 18, det_sum: 3.523e-116, inner time: 102.0273, time since start: 223.5556
id: 16, det_sum: 5.887e-99, inner time: 102.9106, time since start: 223.5907
id: 19, det_sum: 9.242e-101, inner time: 101.1289, time since start: 223.6742
--- parallel time: 217.47953820228577
Answer:
I believe it's likely that your `numpy` is already taking advantage of your multicore architecture in the single-process model. For example, from here:
But many architectures now have a BLAS that also takes advantage of a multicore machine. If your numpy/scipy is compiled using one of these, then dot() will be computed in parallel (if this is faster) without you doing anything. Similarly for other matrix operations, like inversion, singular value decomposition, determinant, and so on.
You can check for that:
>>> import numpy as np
>>> np.show_config()
But also, as a simple test: if you bump up the size of your matrix and just run it directly, do you see multiple cores being used by `numpy`? For example, watch `top` when running:
>>> n_m = 20000
>>> M = np.random.rand(n_m, n_m)
>>> M = (M @ M.T) / 1000 + np.eye(M.shape[0])
This is likely slow enough that you can see if it is already using multiple cores when just in one process.
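If watching `top` is inconvenient, a rough programmatic version of the same check is to compare CPU time against wall-clock time for the operation. This is only a sketch: `cpu_to_wall_ratio` and `matrix_workload` are hypothetical helper names, and `n_m=2000` is an arbitrary smaller size chosen so that it finishes quickly. `time.process_time()` counts CPU time across all threads of the process, so a ratio well above 1 would suggest that the BLAS behind `numpy` is already multithreaded.

```python
import time

import numpy as np


def cpu_to_wall_ratio(fn):
    """Run fn() once and return (CPU seconds) / (wall-clock seconds).

    time.process_time() sums CPU time over all threads of this process,
    so a ratio well above 1 suggests the work ran on several cores.
    """
    wall0 = time.perf_counter()
    cpu0 = time.process_time()
    fn()
    cpu = time.process_time() - cpu0
    wall = time.perf_counter() - wall0
    return cpu / wall


def matrix_workload(n_m=2000):
    # Same expression as in the test above, on a smaller matrix
    # (n_m=2000 is an arbitrary choice for this sketch).
    M = np.random.rand(n_m, n_m)
    M = (M @ M.T) / 1000 + np.eye(M.shape[0])
    return M


if __name__ == "__main__":
    ratio = cpu_to_wall_ratio(matrix_workload)
    print(f"CPU time / wall time: {ratio:.1f}")
```

A ratio close to 1 would point to a single-threaded BLAS. Note that generating the random matrix is itself single-threaded, so it pulls the ratio down a little; a larger `n_m` makes the matrix product dominate.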
As you can imagine, if it is already doing that, then splitting the work across processes adds overhead, and each worker's multithreaded BLAS calls end up competing for the same cores, so it is slower.
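One common way to test this theory is to cap the BLAS thread pool at one thread per worker, so that the processes stop competing for the same cores. The sketch below assumes your `numpy` is linked against a BLAS that honors `OMP_NUM_THREADS`, `OPENBLAS_NUM_THREADS`, or `MKL_NUM_THREADS` (which one applies depends on the build; `np.show_config()` should tell you), and that the variables are set before `numpy` is first imported. `run_parallel` and `processes=6` (one worker per physical core on a 6-core machine) are illustrative choices, not something taken from the question.

```python
import os

# Assumption: the BLAS/OpenMP backend reads these when it is first loaded,
# so they have to be set before numpy is imported.
for var in ("OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS"):
    os.environ[var] = "1"

import time
from multiprocessing import Pool

import numpy as np
import numpy.linalg as la


def f(sp):
    # Same computation as the question's f, timed from inside the worker.
    M, i = sp
    t0 = time.time()
    M = (M @ M.T) / 1000 + np.eye(M.shape[0])
    M_inv = la.inv(M)
    t_elapsed = time.time() - t0
    return i, M.shape[0], la.det(M_inv), t_elapsed


def run_parallel(specs, processes):
    # Hypothetical helper: run f over specs in a pool, collect all results.
    with Pool(processes=processes) as pool:
        return list(pool.imap_unordered(f, specs))


if __name__ == "__main__":
    N, n_m = 20, 1500
    rng = np.random.default_rng(0)
    specs = [(rng.random((n_m, n_m)), i) for i in range(N)]

    t0 = time.time()
    for result in run_parallel(specs, processes=6):
        print(result)
    print(f"\n--- parallel time, 1 BLAS thread per worker: {time.time()-t0}\n")
```

If the inner times reported by the workers drop back to roughly the single-process values, that would support the oversubscription explanation. Whether the pool then beats the plain multithreaded serial loop is something to measure on your machine, since each task also pays for pickling its matrix to the worker.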