from multiprocessing import Array
from ctypes import c_double
import numpy as np
from joblib import Parallel, delayed
def f(a):
    """Negate every element of ``a`` in place and print each new value.

    Args:
        a: Mutable, indexable sequence supporting ``len()`` (a numpy array
            in this example).  It is modified in place; nothing is returned.
    """
    for i in range(len(a)):
        a[i] = -a[i]
        print(a[i])
if __name__ == '__main__':
    # Allocate a lock-free shared ctypes array holding 0.0 .. 9.0.
    arr = Array(c_double,
                range(10),
                lock=False)
    # Wrap the shared buffer in a numpy array and make it a column vector.
    arr = np.frombuffer(arr)
    arr = arr.reshape((len(arr), 1))
    # NOTE(review): the array printed below comes back unchanged; presumably
    # joblib's default process-based backend hands the worker a copy of
    # ``arr`` rather than the shared buffer -- confirm against joblib docs.
    Parallel(n_jobs=2)(delayed(f)(arr) for j in range(1))
    print(arr[:])
I expected the values in the array to be negated, but the array printed at the end still holds the initial values. Can you help me? Thank you!
CodePudding user response:
Update on how to share a numpy array
The simplest answer, if you are using joblib:
import numpy as np
from joblib import Parallel, delayed
def f(a):
    """Negate every element of ``a`` in place.

    Args:
        a: Mutable, indexable sequence supporting ``len()`` (a numpy array
            in this example).  It is modified in place; nothing is returned.
    """
    for i in range(len(a)):
        a[i] = -a[i]
if __name__ == '__main__':
    # Create numpy array (a column vector holding 0.0 .. 9.0):
    arr = np.array([float(i) for i in range(10)])
    arr = arr.reshape((len(arr), 1))
    print(arr)
    print()
    # require='sharedmem' asks joblib for a backend whose workers share
    # memory with the caller, so f's in-place changes are visible here.
    Parallel(n_jobs=2, require='sharedmem')(delayed(f)(arr) for j in range(1))
    print(arr)
Prints:
[[0.]
[1.]
[2.]
[3.]
[4.]
[5.]
[6.]
[7.]
[8.]
[9.]]
[[-0.]
[-1.]
[-2.]
[-3.]
[-4.]
[-5.]
[-6.]
[-7.]
[-8.]
[-9.]]
A more explicit approach that requires Python 3.8 or above follows. This example uses joblib, which, as we saw in the previous example, has a simpler way of achieving this. But it shows how to create the array in shared memory and pass it to a worker function. This technique could just as easily be used with a multiprocessing.Pool implementation, for example. It's rather clumsy, however:
from multiprocessing import shared_memory
import numpy as np
from joblib import Parallel, delayed
def f(shm_name, shape, dtype):
    """Negate, in place, the array stored in a named shared-memory block.

    Each negated element is printed as it is written.

    Args:
        shm_name: Name of an existing ``SharedMemory`` block to attach to.
        shape: Shape of the numpy array stored in that block.
        dtype: dtype of the numpy array stored in that block.
    """
    # Attach to the existing block (this does not create a new one):
    shared_mem = shared_memory.SharedMemory(name=shm_name)
    try:
        # View the shared buffer as a numpy array; no data is copied.
        a = np.ndarray(shape, dtype=dtype, buffer=shared_mem.buf)
        for i in range(len(a)):
            a[i] = -a[i]
            print(a[i])
    finally:
        # Release this process's handle on the block.  The creator is still
        # responsible for unlinking it.  ``a`` must not be used after this.
        shared_mem.close()
if __name__ == '__main__':
    # Create numpy array:
    _arr = np.array([float(i) for i in range(10)])
    shm = shared_memory.SharedMemory(create=True, size=_arr.nbytes)
    try:
        # Save generated name:
        shm_name = shm.name
        # Create numpy array backed by shared memory:
        arr = np.ndarray(_arr.shape, dtype=_arr.dtype, buffer=shm.buf)
        # Copy the original data into shared memory
        arr[:] = _arr[:]
        print(arr)
        # Only the block's name, shape and dtype are sent to the worker; the
        # worker re-attaches to the block by name.
        Parallel(n_jobs=2)(delayed(f)(shm_name, arr.shape, arr.dtype) for j in range(1))
        print()
        print(arr)
    finally:
        # Free up shared memory that is no longer needed, even if the
        # parallel call above raised.
        shm.close()
        shm.unlink()
Prints:
[0. 1. 2. 3. 4. 5. 6. 7. 8. 9.]
-0.0
-1.0
-2.0
-3.0
-4.0
-5.0
-6.0
-7.0
-8.0
-9.0
[-0. -1. -2. -3. -4. -5. -6. -7. -8. -9.]
If you are running Python 3.7 or lower, then you need to create your numpy array backed by a shared multiprocessing.Array, which cannot be passed as an argument to a worker function using joblib and can only be passed as a global. This is not a problem if you are doing your multiprocessing using, for example, the multiprocessing.Pool class with a pool initializer, or a single Process to which the array is passed as an argument:
Using a multiprocessing.Pool
from multiprocessing import Array, Pool
from ctypes import c_double
import numpy as np
def pool_init(shared_array):
    """Pool initializer: expose the shared array as the global ``arr``.

    Args:
        shared_array: The shared ``multiprocessing.Array`` passed through
            the pool's ``initargs``.
    """
    global arr
    # Re-create a numpy array from the shared memory array:
    arr = np.frombuffer(shared_array)
    arr = arr.reshape((len(shared_array), 1))
def f():
    """Negate, in place, every element of the module-level ``arr``.

    ``arr`` is the global that ``pool_init`` sets up in each worker process.
    """
    # Negate each element:
    for i in range(len(arr)):
        arr[i] = -arr[i]
if __name__ == '__main__':
    # Create shared memory array:
    shared_array = Array(c_double,
                         range(10),
                         lock=False)
    # Create a numpy array from shared memory array:
    arr = np.frombuffer(shared_array)
    arr = arr.reshape((len(shared_array), 1))
    # print np array:
    print(arr)
    print()
    # Pass the shared memory array and not the numpy array.  The context
    # manager shuts the pool's worker processes down on exit;
    # pool.apply() blocks until f has finished, so no work is lost.
    with Pool(2, initializer=pool_init, initargs=(shared_array,)) as pool:
        pool.apply(f)
    print(arr)
Prints:
[[0.]
[1.]
[2.]
[3.]
[4.]
[5.]
[6.]
[7.]
[8.]
[9.]]
[[-0.]
[-1.]
[-2.]
[-3.]
[-4.]
[-5.]
[-6.]
[-7.]
[-8.]
[-9.]]
Using a multiprocessing.Process
from multiprocessing import Array, Process
from ctypes import c_double
import numpy as np
def f(shared_array):
    """Negate, in place, every element of a shared array.

    Args:
        shared_array: The shared ``multiprocessing.Array`` to modify.  It is
            viewed through numpy, so the writes land in the shared buffer.
    """
    # Re-create a numpy array from the shared memory array:
    arr = np.frombuffer(shared_array)
    arr = arr.reshape((len(shared_array), 1))
    # Negate each element:
    for i in range(len(arr)):
        arr[i] = -arr[i]
if __name__ == '__main__':
    # Create shared memory array:
    shared_array = Array(c_double,
                         range(10),
                         lock=False)
    # Create a numpy array from shared memory array:
    arr = np.frombuffer(shared_array)
    arr = arr.reshape((len(shared_array), 1))
    # print np array:
    print(arr)
    print()
    # Pass the shared memory array and not the numpy array:
    p = Process(target=f, args=(shared_array,))
    p.start()
    # Wait for the child to finish before reading the result.
    p.join()
    print(arr[:])
Prints:
[[0.]
[1.]
[2.]
[3.]
[4.]
[5.]
[6.]
[7.]
[8.]
[9.]]
[[-0.]
[-1.]
[-2.]
[-3.]
[-4.]
[-5.]
[-6.]
[-7.]
[-8.]
[-9.]]