I have a dataframe df that I would like to break into smaller chunks column-wise so that I can apply a class method to each chunk. The method seems to work fine, but after around 10 iterations I keep getting

CustomizablePickler(buffer, self._reducers).dump(obj)
AttributeError: Can't pickle local object 'delayed.<locals>.delayed_function'

so I am not sure what's wrong.
What I have so far:

from typing import Tuple

import pandas as pd
import numpy as np

class MyClass:
    def __init__(self, df: pd.DataFrame):
        self.df = df
        ...

    def run(self) -> Tuple[np.ndarray, np.ndarray]:
        <operate on self.df>

if __name__ == "__main__":
    from joblib import Parallel, delayed

    df = pd.read_csv("data.csv")
    n_cpu = 6
    chunk = df.shape[1] // n_cpu
    # create chunk column indices
    lower_idx = np.array([0, chunk, 2 * chunk, 3 * chunk, 4 * chunk, 5 * chunk])
    upper_idx = lower_idx + chunk
    upper_idx[-1] = df.shape[1]
    res = Parallel(n_jobs=n_cpu, backend="multiprocessing")(
        delayed(MyClass(df.iloc[:, i:j]).run()) for i, j in zip(lower_idx, upper_idx)
    )
The dataframe df is small and easily fits in memory (only 54 MB), but the operations in run() take a long time, so I would like to parallelise them over subsets of the columns of df.

Am I doing something wrong? Is there an easier way to do this? I don't need to use the joblib package, but I would like to leave MyClass unchanged if possible.
CodePudding user response:
Use a module-level proxy function to run your class instance. (In your code, delayed(MyClass(...).run()) executes run() immediately and passes its result to delayed, so the wrapped function is never actually called with arguments; Parallel then tries to pickle the local delayed_function closure itself, which is what the traceback complains about. A plain top-level function sidesteps the problem.)
import pandas as pd
import numpy as np
import multiprocessing as mp
from typing import Tuple

class MyClass:
    def __init__(self, df: pd.DataFrame):
        self.df = df

    def run(self) -> Tuple[np.ndarray, np.ndarray]:
        return (self.df.min(1).values, self.df.max(1).values)

def proxy(df):
    c = MyClass(df)
    return c.run()

if __name__ == '__main__':
    np.random.seed(2022)
    df = pd.DataFrame(np.random.randint(1, 1000, (10, 100)))
    n_cpu = 6
    chunks = np.array_split(df, n_cpu, axis=1)
    with mp.Pool(n_cpu) as pool:
        data = pool.map(proxy, chunks)
Output: data is a list of six (row_min, row_max) array pairs, one per column chunk, with each array holding one value per row of df.
Update

You can replace the with block with:

p = mp.get_context('fork').Pool(8)
data = p.map(proxy, chunks)
p.close()
p.join()