Parallelize numpy array generation inside a class method with multiprocessing


I'm starting to work with multiprocessing again after a very long time, and I have a question about how I can parallelize the filling of a large numpy array from a class method, using a set of computing objects that are stored in the same class.

The simplified structure looks something like this:

import multiprocessing as mp
import numpy as np

from itertools import count 
import time

class DummyCalculator:
    _ids = count(0)
    def __init__(self):
        self._id = next(self._ids)

    def compute(self, x: np.array) -> np.array:
        time.sleep(10)
        return x + self._id

class DummyKeeper:
    def __init__(self):
        self._calculators = []

    def addCalculator(self, calculator: DummyCalculator) -> None:
        self._calculators.append(calculator)

    def myfunc(self, x: np.array) -> np.array:
        # this is what I want to be handled by multiprocessing, one process for each DummyCalculator instance
        out = np.array([c.compute(x) for c in self._calculators]).flatten()
        return out


if __name__ == '__main__':

    keeper = DummyKeeper()

    keeper.addCalculator(DummyCalculator())
    keeper.addCalculator(DummyCalculator())
    keeper.addCalculator(DummyCalculator())
    keeper.addCalculator(DummyCalculator())

    x = np.zeros(100000)

    keeper.myfunc(x) # -> this should trigger the processes

Any help will be very much appreciated.

Regards.

CodePudding user response:

Well,

It turned out to be simpler than I expected. After a long loop of trial and error I came up with this.

from pathos import multiprocessing as mp
import numpy as np

from itertools import count 
import time

class DummyCalculator:
    _ids = count(0)
    def __init__(self):
        self._id = next(self._ids)

    def compute(self, x: np.array) -> np.array:
        time.sleep(10)
        return x + self._id

class DummyKeeper:
    def __init__(self):
        self._calculators = []

    def addCalculator(self, calculator: DummyCalculator) -> None:
        self._calculators.append(calculator)

    def dummyCalc(self, tup: tuple) -> tuple:
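        # Runs in a worker process: unpack (index, input array), apply the
        # matching calculator, and return the index alongside the result so
        # the caller can restore the original order.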
        i = tup[0]
        x = np.array(tup[1])
        c = self._calculators[i]
        v = c.compute(x)
        return (i, v)

    def myfunc(self, x: np.array) -> np.array:
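        # Build one (index, x) task per calculator, fan them out across the
        # pool, then reassemble the results in the original calculator order.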
        n = len(self._calculators)
        pool = mp.Pool(processes=n)
        pargs = [(i, x) for i in range(n)]
        out = pool.map(self.dummyCalc, pargs)
        # I need to preserve initial order
        out.sort(key = lambda tup: tup[0])
        out = np.array([o[1] for o in out]).flatten()
        return out


if __name__ == '__main__':

    keeper = DummyKeeper()

    c0 = DummyCalculator()
    c1 = DummyCalculator()
    c2 = DummyCalculator()
    c3 = DummyCalculator()
    keeper.addCalculator(c0)
    keeper.addCalculator(c1)
    keeper.addCalculator(c2)
    keeper.addCalculator(c3)

    x = np.zeros(100000)

    out = keeper.myfunc(x)
    print(out)

I just switched from multiprocessing to pathos for internal reasons.
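
If staying with the standard library is an option, the same pattern can also be written with concurrent.futures.ProcessPoolExecutor instead of pathos. The sketch below is only an illustration of that alternative, not part of the original answer: the module-level helper _run_calculator is a name I introduced so that only the calculator instances and the input array need to be pickled, and it assumes the calculators can be serialized with plain pickle (they can in this toy example). executor.map hands results back in submission order, so the explicit sort by index is not needed here.

from concurrent.futures import ProcessPoolExecutor
from itertools import count
import time

import numpy as np


class DummyCalculator:
    _ids = count(0)

    def __init__(self):
        self._id = next(self._ids)

    def compute(self, x: np.ndarray) -> np.ndarray:
        time.sleep(10)
        return x + self._id


def _run_calculator(calculator: DummyCalculator, x: np.ndarray) -> np.ndarray:
    # Module-level helper: only the calculator instance and the input array
    # have to be pickled and shipped to the worker process.
    return calculator.compute(x)


class DummyKeeper:
    def __init__(self):
        self._calculators = []

    def addCalculator(self, calculator: DummyCalculator) -> None:
        self._calculators.append(calculator)

    def myfunc(self, x: np.ndarray) -> np.ndarray:
        n = len(self._calculators)
        # executor.map yields results in submission order, so no re-sorting
        # by index is needed afterwards.
        with ProcessPoolExecutor(max_workers=n) as executor:
            results = list(executor.map(_run_calculator, self._calculators, [x] * n))
        return np.array(results).flatten()


if __name__ == '__main__':
    keeper = DummyKeeper()
    for _ in range(4):
        keeper.addCalculator(DummyCalculator())

    x = np.zeros(100000)
    print(keeper.myfunc(x))

For calculator objects that plain pickle cannot serialize, pathos (which relies on dill) remains the more convenient route.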
