Using Multiprocessing to speed up program that performs row-wise operations on a fairly large database


The logic of my program is very simple: there's a fairly large database (currently a CSV, 12000x5000, which will be 12000x50000 in the future) and a single 12000x1 row, and the program calculates the correlation between that row and each row in the database (there's some more logic in the function to speed things up a bit, but that's the gist of it).

I want to use multiprocessing to speed the program up, but doing it the regular way

[pool.apply_async(func, args=(single_row, df.loc[i].astype('float64'))) for i in df.index]

actually resulted in a 30% slowdown, which I assume happened due to the overhead created by passing two 12000-long rows/arrays to a function 5000 times.

I could make the df and the row global variables, but I'm working on Windows, so from what I understand, this would make every single process I spawn create the df from scratch, which is definitely not going to speed things up.

I'm completely new to mp, so before everything else, I tried the seemingly obvious thing - nesting the correlation func inside the larger function so that it has access to the df. Then the call ended up being just

[pool.apply_async(func, args=(i,)) for i in df.index]

and the calculations sped up by almost 3x, but then I ran into a pickling issue when getting the results, and after some googling it seems like nested functions are a no-go in mp.

So my question is: is there any way to have all processes share the large df, or maybe another solution to speed up this specific problem using multiprocessing? Maybe some way around the pickling issue with nested functions? The fact that this is a really simple problem and that I already got a 3x speed-up makes me sure it's possible, but I can't think of a way to get the results without passing large arguments to the function every time.

I also tried some things using the shared_memory module, but so far to no avail.

I have also tried using multiprocessing.Manager's Namespace

from multiprocessing import Manager

mgr = Manager()
ns = mgr.Namespace()
ns.main_df = main_df

but this seems to create a copy in every process and drastically slows the program down.

CodePudding user response:

You can use shared memory to avoid the slow inter-process communication. You can find a pretty good example in the post Use numpy array in shared memory for multiprocessing.
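If you want a feel for what that looks like before reading the linked post, here is a minimal sketch (Python 3.8+, not the code from that post), assuming the data fits in a float64 NumPy array; the names corr_worker and main_array are just placeholders:

import numpy as np
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory


def corr_worker(shm_name, shape, row_idx, single_row):
    # Attach to the existing shared block by name; nothing is copied here.
    shm = SharedMemory(name=shm_name)
    data = np.ndarray(shape, dtype=np.float64, buffer=shm.buf)
    result = np.corrcoef(data[row_idx], single_row)[0, 1]
    shm.close()                                 # detach without freeing the block
    return result


if __name__ == "__main__":
    main_array = np.random.rand(5000, 12000)    # stand-in for the CSV data
    single_row = np.random.rand(12000)

    shm = SharedMemory(create=True, size=main_array.nbytes)
    shared = np.ndarray(main_array.shape, dtype=np.float64, buffer=shm.buf)
    shared[:] = main_array                      # one-time copy into shared memory

    with Pool() as pool:
        jobs = [pool.apply_async(corr_worker,
                                 (shm.name, main_array.shape, i, single_row))
                for i in range(main_array.shape[0])]
        correlations = [job.get() for job in jobs]

    shm.close()
    shm.unlink()                                # free the shared block

In a real run you would probably also put single_row into shared memory (or send it once per worker through a Pool initializer) so it is not pickled for every one of the 5000 tasks.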

Alternatively, if func only makes use of computationally intensive NumPy functions on native types, then you can use multiple threads, because NumPy releases the global interpreter lock (GIL) for many functions (the GIL is what makes multithreading nearly useless for computationally intensive code in CPython). Multithreaded code does not suffer from the inter-process communication overhead (see the sketch below).

Alternatively, if func is relatively simple and only uses NumPy, you can even try Numba or Cython to make it faster and to release the GIL at a coarser granularity (which improves scalability). Numba/Cython are also good at making parallel code scale better by removing the need to create (many/huge) temporary arrays. Indeed, typical NumPy code creates a lot of temporary arrays that are expensive to fill and that do not scale because the RAM throughput saturates with only a few cores. Not to mention that temporary arrays also require more memory (a 12000x50000 float64 array already takes 4.5 GiB of RAM). Optimized correlations are generally quite memory-bound (especially when computed in parallel), so this is an important factor to consider when writing parallel code.
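As a rough sketch of the thread-based variant (assuming func really does boil down to NumPy calls on a plain float64 array; ThreadPoolExecutor is from the standard library):

import numpy as np
from concurrent.futures import ThreadPoolExecutor


def corr_with(data, single_row, i):
    # Pure NumPy work; NumPy releases the GIL in most of its heavy C loops,
    # so several of these calls can actually run in parallel on threads.
    return np.corrcoef(data[i], single_row)[0, 1]


if __name__ == "__main__":
    data = np.random.rand(5000, 12000)       # stand-in for df.to_numpy(dtype=np.float64)
    single_row = np.random.rand(12000)

    with ThreadPoolExecutor() as ex:
        correlations = list(ex.map(lambda i: corr_with(data, single_row, i),
                                   range(data.shape[0])))

Since threads share the process's memory, nothing is pickled or copied, so the inter-process communication overhead described above disappears entirely.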

CodePudding user response:

There seems to be confusion here as to what Managers do. As per your comments, it is likely that you are trying to do the following, in this order:

  1. Creating a namespace in the main process, and adding your dataframe to it
  2. Starting a worker process, with the namespace as an argument
  3. Retrieving the dataframe from the namespace and assigning it to a variable inside the worker

If this is what you are doing, then the dataframe will be copied per process, and there will be extra time wasted to transfer it from one process to another.

Why does this happen?

Managers are not made to create shared memory in the way you envision it; rather, they could be more aptly described as being used to create synchronized objects. What actually happens when you start a manager is that it spawns another process, called the server process. Any objects made inside the manager are then actually stored in that server process, accessible through proxies.

In the case of Namespace, which is a near-empty class, the proxy used is the undocumented NamespaceProxy. This proxy simply allows different processes to access and edit the namespace of an object. The way it does this is by communicating with the server process on your behalf, sending the name of the method that must be run along with any arguments. The method is then run with those arguments on the actual object stored in the server process, and the return value is sent back, which the proxy forwards to the worker. So when you run something like ns.main_df = main_df, what actually happens is this:

  1. Proxy receives the requested call to set an attribute of the proxied object
  2. Proxy sends the method name __setattr__ with the arguments ("main_df", main_df) to the server process
  3. Server receives the instructions and runs the method with the arguments on the proxied object stored
  4. Server returns the value of the method that was just run back to the proxy
  5. Proxy forwards the return value back to you

So since the dunder __setattr__ does not return anything, there is not much overhead. However, when you access the value of main_df from the child process, the proxy tells the server to execute the dunder method __getattr__ instead. Since this method does have a return value, you are essentially passing the value of the requested attribute from the server to the proxy, and back to you. Hence, when you run main_df = ns.main_df, you are not requesting a handle to the dataframe in some shared memory, but rather the entire dataframe itself!

Add to this the fact that the only way to communicate with the server is to pickle and unpickle the messages and return values sent, and you can see why your implementation is slow and memory-expensive.

Running this code should make it all much clearer:

import os
from multiprocessing import Process, Manager
from pandas import DataFrame


class MyDataFrame(DataFrame):
    def __getstate__(self):
        print(f'dataframe being pickled in pid {os.getpid()}')
        return super().__getstate__()

    def __setstate__(self, state):
        print(f'dataframe being unpickled in pid {os.getpid()}')
        print()
        return super().__setstate__(state)


def worker(ns):
    print(f'Attempting to get dataframe in worker pid {os.getpid()} ')
    main_df = ns.main_df


if __name__ == "__main__":

    df_dic = {"product_id": ['1', '2', '3'],
              "product_url": ['me', 'moo', 'maa'],
              "cost": [1, 2, 3]}

    mgr = Manager()
    ns = mgr.Namespace()

    print(f'creating namespace for dataframe in pid {os.getpid()}')
    ns.main_df = MyDataFrame(df_dic)

    p = Process(target=worker, args=(ns, ))
    p.start()
    p.join()

Over here, MyDataFrame is simply a subclass of pandas.DataFrame with one difference: it lets you know when it's being pickled/unpickled. The rest of the code attempts to duplicate a possible implementation of yours (df_dic is just a small stand-in for your real data).

Output

creating namespace for dataframe in pid 69100
dataframe being pickled in pid 69100
dataframe being unpickled in pid 124720

Attempting to get dataframe in worker pid 72948 
dataframe being pickled in pid 124720
dataframe being unpickled in pid 72948

Over here, the main process's pid is 69100 (since we created the namespace in the main process) and the server process's pid is 124720 (since that's where the dataframe was unpickled when we first stored it in the namespace). Can you now see how the entire dataframe was pickled and then unpickled from the server process to the worker process when attempting to access it?

So what's the solution?

You need to use a different proxy. Remember when I said that the server process only sends back the return value? Well, if you try to access the dataframe through NamespaceProxy, the return value will always be the entire dataframe, no working around it. What you can do instead is create a proxy of the entire DataFrame class imported from pandas, rather than the Namespace class offered by multiprocessing. That way, the object stored in the server process will be the DataFrame instance itself, and when you try to access its attributes, the proxy will return the value of those attributes rather than the object itself.

However, we would need a different sort of proxy for this to work, since NamespaceProxy only shares the namespace of the proxied object. We would like access to its methods as well, no? To achieve this, we create our own proxy subclassing NamespaceProxy and override the dunder __getattr__ to work for method calls as well (based on this, with the difference that it's picklable). We can now just start our manager with our proxy:

from multiprocessing import Process
from multiprocessing.managers import NamespaceProxy, BaseManager
from pandas import DataFrame
import types
import os


class MyDataFrame(DataFrame):
    def __getstate__(self):
        print(f'dataframe being pickled in pid {os.getpid()}')
        return super().__getstate__()

    def __setstate__(self, state):
        print(f'dataframe being unpickled in pid {os.getpid()}')
        print()
        return super().__setstate__(state)


class A:
    def __init__(self, name, method):
        self.name = name
        self.method = method

    def get(self, *args, **kwargs):
        return self.method(self.name, args, kwargs)


class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
    functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
    pickable and can its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            return A(name, self._callmethod).get
        return result

def worker(df):
    print()
    print(f'Attempting to get dataframe in worker pid {os.getpid()} ')
    print(df.index)


if __name__ == "__main__":
    df_dic = {"product_id": ['1', '2', '3'],
              "product_url": ['me', 'moo', 'maa'],
              "cost": [1, 2, 3]}
    print(f'creating manager for dataframe in pid {os.getpid()}')

    BaseManager.register('DataFrame', MyDataFrame, ObjProxy, exposed=tuple(dir(ObjProxy)))
    manager = BaseManager()
    manager.start()

    df = manager.DataFrame(df_dic)
    p = Process(target=worker, args=(df, ))
    p.start()
    p.join()

Output

creating manager for dataframe in pid 45320

Attempting to get dataframe in worker pid 83856 
RangeIndex(start=0, stop=3, step=1)

As you can see, our dataframe wasn't even pickled once! This is because when we created our dataframe, only df_dic was pickled; the dataframe itself was built in the server process.

Is there a problem?

There is, in fact, one problem: our proxy class does not copy the dunder methods of the original DataFrame class. This means that, for example, trying to subscript the instance like below would result in an error:

# TypeError: 'ObjProxy' object is not subscriptable
print(df[["product_id", "product_url"]])

This is because our ObjProxy class did not copy the DataFrame class's __getitem__ method, and changing ObjProxy.__getattr__ does not intercept calls to these special dunder methods. Obviously, you can manually add all these dunder methods to ObjProxy yourself, but that's pretty tedious. Instead (using this answer), we can find all attributes of DataFrame (through populate_obj_attributes in the code below) and create our proxy class dynamically:

from multiprocessing import Process
from multiprocessing.managers import NamespaceProxy, BaseManager
from pandas import DataFrame
import inspect
import os


class MyDataFrame(DataFrame):
    def __getstate__(self):
        print(f'dataframe being pickled in pid {os.getpid()}')
        return super().__getstate__()

    def __setstate__(self, state):
        print(f'dataframe being unpickled in pid {os.getpid()}')
        print()
        return super().__setstate__(state)


class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
    functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
    pickable and can its state can be shared among different processes. """

    @classmethod
    def populate_obj_attributes(cls, real_cls):
        DISALLOWED = set(dir(cls))
        ALLOWED = ['__sizeof__', '__eq__', '__ne__', '__le__', '__repr__', '__dict__', '__lt__',
                   '__gt__']
        DISALLOWED.add('__class__')
        new_dict = {}
        for (attr, value) in inspect.getmembers(real_cls, callable):
            if attr not in DISALLOWED or attr in ALLOWED:
                new_dict[attr] = proxy_wrap(attr)
        return new_dict


def proxy_wrap(attr):
    """ This method creates function that calls the proxified object's method."""
    def f(self, *args, **kwargs):

        # _callmethod is the method that proxies provided by multiprocessing use to call methods in the proxified object
        return self._callmethod(attr, args, kwargs)

    return f


def worker(df):
    print(f'Attempting to get dataframe in worker pid {os.getpid()} ')
    print(df[["product_id", "product_url"]])


# Create a class during runtime
new_dict = ObjProxy.populate_obj_attributes(MyDataFrame)
DataFrameObjProxy = type("DataFrameObjProxy", (ObjProxy,), new_dict)

if __name__ == "__main__":
    df_dic = {"product_id": ['1', '2', '3'],
              "product_url": ['me', 'moo', 'maa'],
              "cost": [1, 2, 3]}
    print(f'creating namespace for dataframe in pid {os.getpid()}')

    BaseManager.register('DataFrame', MyDataFrame, DataFrameObjProxy, exposed=tuple(dir(DataFrameObjProxy)))
    manager = BaseManager()
    manager.start()

    df = manager.DataFrame(df_dic)
    p = Process(target=worker, args=(df, ))
    p.start()
    p.join() 

The reason we created our DataFrameObjProxy dynamically like that, rather than using something like setattr, is that when we pass it to our manager as our proxy, only the class and module name get pickled, not the class attributes! So adding attributes to the class in our main process would not change the proxy used with the manager.

Output

creating namespace for dataframe in pid 105180
Attempting to get dataframe in worker pid 84696 

  product_id product_url
0          1          me
1          2         moo
2          3         maa

Now you can use the proxy more or less like an actual DataFrame object, without actually passing the dataframe around to the child processes!
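To connect this back to the original correlation problem, here is a hypothetical sketch of how the proxy could be wired into a Pool. It reuses MyDataFrame, DataFrameObjProxy and the BaseManager registration from the final snippet above (drop it in place of that snippet's worker and __main__ block); corr_worker, the random stand-in data and the use of DataFrame.xs are my own assumptions, not code from this answer:

import numpy as np
from multiprocessing import Pool


def corr_worker(df_proxy, i, single_row):
    # DataFrame.xs is a plain (callable) method, so DataFrameObjProxy forwards the
    # call to the server process and only the requested row is pickled back.
    row = df_proxy.xs(i)
    return np.corrcoef(row.to_numpy(dtype=np.float64), single_row)[0, 1]


if __name__ == "__main__":
    # MyDataFrame, DataFrameObjProxy and BaseManager come from the snippet above.
    BaseManager.register('DataFrame', MyDataFrame, DataFrameObjProxy,
                         exposed=tuple(dir(DataFrameObjProxy)))
    manager = BaseManager()
    manager.start()

    data = np.random.rand(100, 50)             # stand-in for the 12000x5000 CSV
    single_row = np.random.rand(50)
    df = manager.DataFrame(data)               # built once, inside the server process

    with Pool() as pool:
        jobs = [pool.apply_async(corr_worker, (df, i, single_row))
                for i in range(data.shape[0])]
        correlations = [job.get() for job in jobs]
    print(correlations[:5])

Each task now transfers only one row (plus the small single_row vector) instead of the whole dataframe.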
