Home > Blockchain >  How to apply a class method to chunks of a pandas dataframe in parallel?
How to apply a class method to chunks of a pandas dataframe in parallel?

Time:02-16

I have a dataframe df that I would like to break into smaller chunks column wise so that I can apply a class method to each chunk. The method seems to work fine, but then after around 10 iterations I keep getting CustomizablePickler(buffer, self._reducers).dump(obj), AttributeError: Can't pickle local object 'delayed.<locals>.delayed_function', so I am not sure what's wrong.

What I have so far

from typing import Tuple

import pandas as pd
import numpy as np


class MyClass:
    def __init__(self, df: pd.DataFrame):
        self.df = df
    ...
    def run(self) -> Tuple[np.ndarray, np.ndarray]:
        <operate on self.df>


if __name__ == "__main__":
    from joblib import Parallel, delayed
    df = pd.read_csv("data.csv")

    n_cpu = 6
    chunk = df.shape[1] // n_cpu
    # create chunk column indicies
    lower_idx = np.array([0, chunk, 2 * chunk, 3 * chunk, 4 * chunk, 5 * chunk])
    upper_idx = lower_idx   chunk
    upper_idx[-1] = df.shape[1]
    res = Parallel(n_jobs=n_cpu, backend="multiprocessing")(
        delayed(MyClass(df.iloc[:, i: j]).run()) for i, j in zip(lower_idx, upper_idx)
    )

The dataframe df is small and can easily fit in memory (only 54 Mb), but the operations in run() take a long time, so I would like to parallelise these operations over a subset of columns of df at a time.

Am I doing something wrong? Is there an easier to way to do this? I don't need to use the joblib package, but I would like to leave MyClass unchanged if possible.

CodePudding user response:

Use a proxy function to run your class instance:

import pandas as pd
import numpy as np
import multiprocessing as mp
from typing import Tuple

class MyClass:
    def __init__(self, df: pd.DataFrame):
        self.df = df

    def run(self) -> Tuple[np.ndarray, np.ndarray]:
        return (df.min(1).values, df.max(1).values)

def proxy(df):
    c = MyClass(df)
    return c.run()

if __name__ == '__main__':
    np.random.seed(2022)
    df = pd.DataFrame(np.random.randint(1, 1000, (10, 100)))

    n_cpu = 6
    chunks = np.array_split(df, n_cpu, axis=1)

    with mp.Pool(n_cpu) as pool:
        data = pool.map(proxy, chunks)

Output:

>>> data
[(array([12, 15,  1,  4, 13,  3,  9,  2, 41, 16]),
  array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
 (array([12, 15,  1,  4, 13,  3,  9,  2, 41, 16]),
  array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
 (array([12, 15,  1,  4, 13,  3,  9,  2, 41, 16]),
  array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
 (array([12, 15,  1,  4, 13,  3,  9,  2, 41, 16]),
  array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
 (array([12, 15,  1,  4, 13,  3,  9,  2, 41, 16]),
  array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995])),
 (array([12, 15,  1,  4, 13,  3,  9,  2, 41, 16]),
  array([998, 981, 986, 991, 969, 995, 976, 996, 998, 995]))]

Update

Can you replace the with block by:

    p = mp.get_context('fork').Pool(8)
    data = p.map(proxy, chunks)
  • Related