Home > Blockchain >  The expected results were not obtained (np.frombuffer)
The expected results were not obtained (np.frombuffer)

Time:10-27

from multiprocessing import Array
from ctypes import c_double
import numpy as np
from joblib import Parallel, delayed


def f(a):
    """Negate every entry of ``a`` in place, printing each entry after it is updated."""
    for idx, entry in enumerate(a):
        a[idx] = -entry
        print(a[idx])


if __name__ == '__main__':
    # Allocate a lock-free shared ctypes array holding the doubles 0.0 .. 9.0.
    arr = Array(c_double,
                range(10),
                lock=False)
    # Wrap the ctypes buffer in a numpy array, then view it as a (10, 1) column.
    arr = np.frombuffer(arr)
    arr = arr.reshape((len(arr), 1))

    # Run f exactly once (range(1) yields a single task) on a 2-job joblib pool.
    # NOTE(review): the question reports that the array printed below still
    # holds the initial values; presumably the worker operated on a copy of
    # `arr` rather than on the shared buffer -- confirm against joblib docs.
    Parallel(n_jobs=2)(delayed(f)(arr) for j in range(1))

    # Show the parent's view of the array after the parallel call.
    print(arr[:])

I expected the values to be negated, but the printed array still contains the initial values. Can you help me? Thank you!

CodePudding user response:

Update on How to Share a numpy Array

The simplest answer if using joblib:

import numpy as np
from joblib import Parallel, delayed

def f(a):
    """Flip the sign of each element of ``a``, writing the result back in place."""
    for position, current in enumerate(a):
        a[position] = -current

if __name__ == '__main__':
    # Build a (10, 1) float column holding 0.0 .. 9.0.
    arr = np.arange(10, dtype=float).reshape(-1, 1)
    # Print the initial contents followed by one blank separator line.
    print(arr, end='\n\n')
    # A single task; require='sharedmem' asks joblib for a backend in which
    # the worker mutates this very array instead of a copy.
    tasks = (delayed(f)(arr) for _ in range(1))
    Parallel(n_jobs=2, require='sharedmem')(tasks)
    print(arr)

Prints:

[[0.]
 [1.]
 [2.]
 [3.]
 [4.]
 [5.]
 [6.]
 [7.]
 [8.]
 [9.]]

[[-0.]
 [-1.]
 [-2.]
 [-3.]
 [-4.]
 [-5.]
 [-6.]
 [-7.]
 [-8.]
 [-9.]]

A more explicit approach, which requires Python 3.8 or above, follows. This example uses joblib, which, as we saw in the previous example, has a simpler way of achieving this. However, it shows how to create the array in shared memory and pass it to a worker function. This technique could just as easily be used with a multiprocessing.Pool implementation, for example. It's rather clumsy, however:

from multiprocessing import shared_memory
import numpy as np
from joblib import Parallel, delayed

def f(shm_name, shape, dtype):
    """Negate, in place, an array that lives in an existing shared-memory block.

    Args:
        shm_name: Name of a shared-memory segment that was already created
            by another process (or earlier in this one).
        shape: Shape of the numpy array stored in the segment.
        dtype: Element dtype of the numpy array stored in the segment.

    Each element is printed after it has been negated.
    """
    # Attach to the existing segment by name; this does not create a new one.
    shared_mem = shared_memory.SharedMemory(name=shm_name)
    try:
        # View the segment's bytes as a numpy array without copying.
        a = np.ndarray(shape, dtype=dtype, buffer=shared_mem.buf)
        for i in range(len(a)):
            a[i] = -a[i]
            print(a[i])
        # Drop the view before unmapping so nothing references the buffer.
        del a
    finally:
        # Release this process's handle; the creator is responsible for
        # unlinking the segment itself.
        shared_mem.close()

if __name__ == '__main__':
    # Create the source numpy array:
    _arr = np.array([float(i) for i in range(10)])
    # Allocate a shared-memory segment large enough to hold a copy of it.
    shm = shared_memory.SharedMemory(create=True, size=_arr.nbytes)
    try:
        # Save the generated name; workers attach to the segment through it.
        shm_name = shm.name
        # Create a numpy array backed by the shared memory:
        arr = np.ndarray(_arr.shape, dtype=_arr.dtype, buffer=shm.buf)
        # Copy the original data into shared memory
        arr[:] = _arr[:]
        print(arr)
        # Pass only the name, shape and dtype; the worker rebuilds its own
        # view onto the same memory.
        Parallel(n_jobs=2)(delayed(f)(shm_name, arr.shape, arr.dtype) for j in range(1))
        print()
        print(arr)
        # The view must not be used once the segment is unmapped below.
        del arr
    finally:
        # Free the shared memory even if the parallel call raised; otherwise
        # the named segment would outlive this script.
        shm.close()
        shm.unlink()

Prints:

[0. 1. 2. 3. 4. 5. 6. 7. 8. 9.]
-0.0
-1.0
-2.0
-3.0
-4.0
-5.0
-6.0
-7.0
-8.0
-9.0

[-0. -1. -2. -3. -4. -5. -6. -7. -8. -9.]

If you are running Python 3.7 or lower, then you need to create your numpy array backed by a shared multiprocessing.Array, which cannot be passed as an argument to a worker function using joblib and can only be passed as a global. This is not a problem if you are doing your multiprocessing using, for example, the multiprocessing.Pool class with a pool initializer or a single Process passing the array as an argument:

Using a multiprocessing.Pool

from multiprocessing import Array, Pool
from ctypes import c_double
import numpy as np

def pool_init(shared_array):
    """Pool initializer: publish a numpy view of ``shared_array`` as the global ``arr``.

    The view is a column with one row per element of ``shared_array``.
    """
    global arr
    n_rows = len(shared_array)
    # Wrap the shared buffer and shape it as an (n_rows, 1) column in one step.
    arr = np.frombuffer(shared_array).reshape((n_rows, 1))

def f():
    """Negate, in place, every row of the module-level ``arr``."""
    for row_index, row in enumerate(arr):
        arr[row_index] = -row

if __name__ == '__main__':
    # Create shared memory array:
    shared_array = Array(c_double,
                         range(10),
                         lock=False)
    # Create a numpy array from shared memory array:
    arr = np.frombuffer(shared_array)
    arr = arr.reshape((len(shared_array), 1))
    # print np array:
    print(arr)

    print()

    # Pass the shared memory array and not the numpy array. Using the pool as
    # a context manager guarantees its worker processes are shut down once
    # the (blocking) apply call has returned, even if it raises.
    with Pool(2, initializer=pool_init, initargs=(shared_array,)) as pool:
        pool.apply(f)

    print(arr)

Prints:

[[0.]
 [1.]
 [2.]
 [3.]
 [4.]
 [5.]
 [6.]
 [7.]
 [8.]
 [9.]]

[[-0.]
 [-1.]
 [-2.]
 [-3.]
 [-4.]
 [-5.]
 [-6.]
 [-7.]
 [-8.]
 [-9.]]

Using a multiprocessing.Process

from multiprocessing import Array, Process
from ctypes import c_double
import numpy as np


def f(shared_array):
    """Negate every element of ``shared_array`` in place through a numpy view."""
    # Re-create a numpy column view over the shared buffer (one row per element).
    column = np.frombuffer(shared_array).reshape((len(shared_array), 1))

    # Writing through the view updates the underlying shared array.
    for row_index, row in enumerate(column):
        column[row_index] = -row

if __name__ == '__main__':
    # Lock-free shared ctypes buffer holding the doubles 0.0 .. 9.0.
    shared_array = Array(c_double, range(10), lock=False)
    # Parent-side numpy column view over that same buffer.
    arr = np.frombuffer(shared_array).reshape((len(shared_array), 1))
    # Show the initial contents followed by one blank separator line.
    print(arr, end='\n\n')

    # Hand the child the shared ctypes array itself, not the numpy wrapper.
    worker = Process(target=f, args=(shared_array,))
    worker.start()
    worker.join()

    print(arr[:])

Prints:

[[0.]
 [1.]
 [2.]
 [3.]
 [4.]
 [5.]
 [6.]
 [7.]
 [8.]
 [9.]]

[[-0.]
 [-1.]
 [-2.]
 [-3.]
 [-4.]
 [-5.]
 [-6.]
 [-7.]
 [-8.]
 [-9.]]
  • Related