I'm getting back into multiprocessing after a long break, and I have a question about how to parallelize filling a large NumPy array from a class method, using a set of computing objects that are stored in the same class.
The simplified structure looks something like this:
import multiprocessing as mp
import numpy as np
from itertools import count
import time
class DummyCalculator:
    """Toy worker that simulates a slow computation.

    Each instance takes the next integer from a class-level counter as its
    id, and ``compute`` adds that id to its input.
    """

    # Class attribute shared by all instances: each construction draws the
    # next value, so ids are sequential in creation order starting at 0.
    _ids = count(0)

    def __init__(self, delay: float = 10):
        """Create a calculator.

        Args:
            delay: Seconds ``compute`` sleeps to simulate expensive work.
                Defaults to 10, the value that used to be hard-coded.
        """
        self._id = next(self._ids)
        self._delay = delay

    def compute(self, x: np.ndarray) -> np.ndarray:
        """Sleep for ``delay`` seconds, then return ``x + id``."""
        time.sleep(self._delay)
        return x + self._id
class DummyKeeper:
    """Container that owns a list of calculators and combines their output."""

    def __init__(self):
        self._calculators = []

    def addCalculator(self, calculator: "DummyCalculator") -> None:
        """Append *calculator*; ``myfunc`` calls calculators in insertion order."""
        self._calculators.append(calculator)

    def myfunc(self, x: np.ndarray) -> np.ndarray:
        """Run every calculator on *x* and return the combined result.

        The calculators run one after another in the current process; this
        loop is the part to distribute, one process per calculator.

        Args:
            x: Input passed unchanged to each calculator's ``compute``.

        Returns:
            The calculators' outputs, in insertion order, packed with
            ``np.array`` and flattened to 1-D.
        """
        out = np.array([c.compute(x) for c in self._calculators]).flatten()
        # Previously the result was computed and then discarded.
        return out
if __name__ == '__main__':
    keeper = DummyKeeper()
    # Register four calculators; they receive ids 0..3 in creation order.
    for _ in range(4):
        keeper.addCalculator(DummyCalculator())
    x = np.zeros(100000)
    # myfunc requires the input array; the previous call omitted it and
    # raised TypeError.
    keeper.myfunc(x)  # -> this should trigger the processes
Any help will be very much appreciated.
Regards.
CodePudding user response:
Well, it turned out to be simpler than I expected. After a long round of trial and error, I came up with this:
from pathos.multiprocessing import ProcessingPool as Pool
from pathos import multiprocessing as mp
from functools import partial
import numpy as np
from itertools import count
import time
class DummyCalculator:
    """Toy worker that simulates a slow computation.

    Each instance takes the next integer from a class-level counter as its
    id, and ``compute`` adds that id to its input.
    """

    # Class attribute shared by all instances: each construction draws the
    # next value, so ids are sequential in creation order starting at 0.
    # Ids are assigned in whichever process constructs the object.
    _ids = count(0)

    def __init__(self, delay: float = 10):
        """Create a calculator.

        Args:
            delay: Seconds ``compute`` sleeps to simulate expensive work.
                Defaults to 10, the value that used to be hard-coded.
        """
        self._id = next(self._ids)
        self._delay = delay

    def compute(self, x: np.ndarray) -> np.ndarray:
        """Sleep for ``delay`` seconds, then return ``x + id``."""
        time.sleep(self._delay)
        return x + self._id
class DummyKeeper:
    """Owns a list of calculators and evaluates them in a process pool."""

    def __init__(self):
        self._calculators = []

    def addCalculator(self, calculator: "DummyCalculator") -> None:
        """Append *calculator*; ``myfunc`` returns results in insertion order."""
        self._calculators.append(calculator)

    def dummyCalc(self, tup: tuple) -> tuple:
        """Pool task: run calculator ``tup[0]`` on the data ``tup[1]``.

        ``myfunc`` hands the bound method to the pool, so ``self`` (including
        its calculators) must be serializable by the pool implementation.

        Args:
            tup: ``(index, data)``; *data* is converted with ``np.array``.

        Returns:
            ``(index, result)`` so each result can be matched to its calculator.
        """
        i = tup[0]
        x = np.array(tup[1])
        c = self._calculators[i]
        v = c.compute(x)
        return (i, v)

    def myfunc(self, x: np.ndarray, processes=None) -> np.ndarray:
        """Evaluate every registered calculator on *x* in parallel.

        Args:
            x: Input passed to each calculator's ``compute``.
            processes: Number of worker processes. When ``None`` (the
                default), one process is started per registered calculator.

        Returns:
            The per-calculator outputs, in insertion order, packed with
            ``np.array`` and flattened to 1-D. Returns an empty array when no
            calculators are registered.
        """
        n_calc = len(self._calculators)
        if n_calc == 0:
            # A pool with zero processes is invalid, and there is no work.
            return np.array([])
        # One task per registered calculator (previously hard-coded to 4,
        # which broke or ignored calculators whenever the count differed).
        pargs = [(i, x) for i in range(n_calc)]
        # The context manager shuts the pool down after map() returns, so
        # worker processes are not leaked on every call.
        with mp.Pool(processes=processes or n_calc) as pool:
            out = pool.map(self.dummyCalc, pargs)
        # Pool.map returns results in input order, so the calculator order is
        # already preserved and no re-sorting by index is needed.
        return np.array([v for _, v in out]).flatten()
if __name__ == '__main__':
    keeper = DummyKeeper()
    # Create all four calculators up front (ids 0..3), then register them
    # in that same order so results come back in id order.
    c0, c1, c2, c3 = (DummyCalculator() for _ in range(4))
    for calc in (c0, c1, c2, c3):
        keeper.addCalculator(calc)
    x = np.zeros(100000)
    out = keeper.myfunc(x)
    print(out)
I switched from the standard `multiprocessing` module to `pathos` for internal reasons.