Home > OS >  Why is this multiprocessing code slower than serial? How would you do this better?
Why is this multiprocessing code slower than serial? How would you do this better?

Time:09-24

I'm clearly doing something wrong with multiprocessing but I'm not sure what -- I'd expect to see some speed up on this task, but the time spent running this test function in the forked process is 2 orders of magnitude more than the time it takes in the main process. This is a non-trivial task, so I don't think it's a case where the workload was too small to benefit from multiprocessing, as in this question and basically all the other SO questions about multiprocessing. And I know there is overhead from starting new processes, but my function returns the time spent doing the actual computation, which I would think would take place after that forking overhead is done.

I've looked at a bunch of docs and examples, tried versions of this code with map, map_async, apply, and apply_async instead of imap_unordered, but I get comparable results in all cases. I'm thoroughly mystified at this point... any help understanding what I'm doing wrong would be great, as would a revised code snippet that provides an example of how to obtain a performance boost by parallelizing this task. Thanks!

import time

t_start = time.time()
from multiprocessing import Pool
from numpy.random import rand, randint
import numpy.linalg as la
import numpy as np


def f(sp):
    (M, i) = sp
    t0 = time.time()
    M = (M @ M.T) / 1000   np.eye(M.shape[0])
    M_inv = la.inv(M)
    t_elapsed = time.time() - t0
    return i, M.shape[0], la.det(M_inv), t_elapsed


randmat = lambda m: rand(m, m)

N = 20
n_m = 1500
specs = list(zip([randmat(n_m) for _ in range(N)], range(N)))

t0 = time.time()
for result in [f(sp) for sp in specs]:
    print(result)

print(f"\n--- serial time: {time.time()-t0}; total elapsed: {time.time()-t_start }\n")

t0 = time.time()
with Pool(processes=10) as pool:
    multiple_results = pool.imap_unordered(f, specs)
    for result in multiple_results:
        print(result)

print(f"\n--- parallel time: {time.time()-t0}\n")

Output:

(0, 1500, 2.613708465497732e-76, 0.17858004570007324)
(1, 1500, 2.3314319199405457e-76, 0.18518280982971191)
(2, 1500, 2.4510533542449015e-76, 0.18424344062805176)
...(snip)...
(17, 1500, 2.0972534465354807e-76, 0.18465876579284668)
(18, 1500, 2.4890185099760677e-76, 0.18526124954223633)
(19, 1500, 3.0716539033944427e-76, 0.17455506324768066)

--- serial time: 5.365333557128906; total elapsed: 5.747828006744385

(0, 1500, 2.613708465497732e-76, 9.31627368927002)
(1, 1500, 2.3314319199405457e-76, 9.709473848342896)
(5, 1500, 2.6716145027956763e-76, 10.101540327072144)
...(snip)...
(19, 1500, 3.0716539033944427e-76, 10.48097825050354)
(18, 1500, 2.4890185099760677e-76, 10.82164478302002)
(17, 1500, 2.0972534465354807e-76, 10.97563886642456)

--- parallel time: 40.98197340965271

(System info: Mint 20.1, AMD Ryzen 5 2600 (6 core, 12 threads), Python 3.8)

UPDATE

I believe @Paul's answer below is probably correct. Looking into this more, I've come up with an even more pathological test case to address @Charles Duffy's concern about serialization being costly-- in the case below, I only send a tiny 100 element numpy vector to each process, and the inner time of each function call goes from ~0.03 secs to around 100 seconds!! More than three orders of magnitude worse! Bonkers! I can only imagine that there is some kind of disastrous contention over CPU access happening in the background that multiprocessing is not able to deal with.

...But this also seems to be a problem specifically having to do with the interaction between multiprocessing and numpy because I tried out ray, and I get the kind of performance boost I'd expect from parallelization.

new results tl;dr

  • serial time: 4.73s
  • ray time: 0.112s
  • multiprocessing time: 217.48s (!!!)

Code v2

import time

t_start = time.time()

import multiprocessing as mp
from numpy.random import randn
import numpy.linalg as la
import numpy as np
import ray


num_vecs = 20
vec_size = 100
inputs = [(randn(vec_size, 1), i, t_start) for i in range(num_vecs)]


def f(input):
    (v, i, t_start) = input
    t0 = time.time()
    det_sum = 0
    M = (v @ v.T)   np.diag(v[:, 0])
    for _ in range(50):
        M = M @ (M.T)
        M = M @ (la.inv(M   np.eye(M.shape[0])) / 2)
        det_sum  = la.det(M)
    t_inner = time.time() - t0
    t_since_start = time.time() - t_start
    return i, det_sum, t_inner, t_since_start


def print_result(r):
    print(
        f"id: {r[0]:2}, det_sum: {r[1]:.3e}, inner time: {r[2]:.4f}, time since start: {r[3]:.4f}"
    )


t0 = time.time()
for result in [f(sp) for sp in inputs]:
    print_result(result)
print(f"\n--- serial time: {time.time()-t0}; total elapsed: {time.time()-t_start }\n")

ray.init(num_cpus=10)
g = ray.remote(f)
t0 = time.time()
results = ray.get([g.remote(s) for s in inputs])
for result in results:
    print(result)
print(f"\n--- parallel time: {time.time()-t0}\n")

t0 = time.time()
with mp.Pool(processes=10) as pool:
    multiple_results = pool.imap_unordered(f, inputs)
    for result in multiple_results:
        print(result)
print(f"\n--- parallel time: {time.time()-t0}\n")

Output v2

id:  0, det_sum: 1.427e-133, inner time: 2.8998, time since start: 3.1620
id:  1, det_sum: 3.294e-118, inner time: 0.3816, time since start: 3.5436
id:  2, det_sum: 2.729e-114, inner time: 0.0569, time since start: 3.6005
...(snip)...
id: 17, det_sum: 2.372e-104, inner time: 0.0344, time since start: 4.8887
id: 18, det_sum: 3.523e-116, inner time: 0.0509, time since start: 4.9396
id: 19, det_sum: 9.242e-101, inner time: 0.0549, time since start: 4.9945

--- serial time: 4.734628677368164; total elapsed: 4.996868848800659

id:  0, det_sum: 1.427e-133, inner time: 0.0436, time since start: 6.1446
id:  1, det_sum: 3.294e-118, inner time: 0.0465, time since start: 6.1541
id:  2, det_sum: 2.729e-114, inner time: 0.0436, time since start: 6.1517
...(snip)...
id: 17, det_sum: 2.372e-104, inner time: 0.0438, time since start: 6.2027
id: 18, det_sum: 3.523e-116, inner time: 0.0394, time since start: 6.1995
id: 19, det_sum: 9.242e-101, inner time: 0.0413, time since start: 6.2032

--- parallel time: 0.1118767261505127

id:  0, det_sum: 1.427e-133, inner time: 101.0206, time since start: 107.2395
id:  2, det_sum: 2.729e-114, inner time: 102.6551, time since start: 108.8744
id:  5, det_sum: 2.063e-111, inner time: 104.2321, time since start: 110.4516
...(snip)...
id: 18, det_sum: 3.523e-116, inner time: 102.0273, time since start: 223.5556
id: 16, det_sum: 5.887e-99, inner time: 102.9106, time since start: 223.5907
id: 19, det_sum: 9.242e-101, inner time: 101.1289, time since start: 223.6742

--- parallel time: 217.47953820228577

CodePudding user response:

I believe it's likely that your numpy is already taking advantage of your multicore architecture in the single process model. For example, from here:

But many architectures now have a BLAS that also takes advantage of a multicore machine. If your numpy/scipy is compiled using one of these, then dot() will be computed in parallel (if this is faster) without you doing anything. Similarly for other matrix operations, like inversion, singular value decomposition, determinant, and so on.

You can check for that:

>>> import numpy as np
>>> np.show_config()

But also, as a simple test, if you bump the size of your matrix and just run it directly do your see your multiple cores being used by numpy? For example, watch top when running:

>>> n_m = 20000
>>> M = np.random.rand(n_m, n_m)
>>> M = (M @ M.T) / 1000   np.eye(M.shape[0])

This is likely slow enough that you can see if it is already using multiple cores when just in one process.

As you can imagine, if it is already doing that then splitting it up into different processes purely adds overhead and therefore is slower.

  • Related