multiprocessing: instances unaffected when iterating over them

I'm trying to use the multiprocessing module to run the same method in parallel over a list of object instances.

The closest question I've found is "apply-a-method-to-a-list-of-objects-in-parallel-using-multi-processing", but the solution given there doesn't seem to work for my problem.

Here is an example of what I'm trying to achieve:

class Foo:
    
    def __init__(self):
        self.bar = None
        
    def put_bar(self):
        self.bar = 1.0


if __name__ == "__main__":
    
    instances = [Foo() for _ in range(100)]
    
    for instance in instances:
        instance.put_bar()
       
    # correctly prints 1.0
    print(instances[0].bar)

However, when I try to parallelize this with the multiprocessing module, the attribute bar remains unaffected:

import os
from multiprocessing import Pool


class Foo:

    def __init__(self):
        self.bar = None

    def put_bar(self):
        self.bar = 1.0


def worker(instance):
    return instance.put_bar()


if __name__ == "__main__":

    instances = [Foo() for _ in range(100)]

    with Pool(os.cpu_count()) as pool:
        pool.map(worker, (instance for instance in instances))

    # prints None
    print(instances[0].bar)

Any help figuring out where the wrong step(s) are would be highly appreciated.

CodePudding user response:

The reason your original code has no effect is that Pool.map pickles each Foo instance and sends a copy of it to a worker process; put_bar then modifies that copy, so the instances in the main process are never updated.

One way around this is to create managed objects from your Foo class, just as the multiprocessing.managers.SyncManager instance created by a call to multiprocessing.Manager() can create certain managed objects such as a list or a dict. What is returned is a special proxy object that is shareable among processes. When a method is called on such a proxy, the name of the method and its arguments are sent via a pipe or socket to a process created by the manager, and the specified method is invoked on the actual object residing in the manager's address space. In effect, you are making something similar to a remote method call. This is clearly much slower than operating on the object directly, but if you have to, you have to. Your coded example, which is just a bit too artificial, doesn't leave many alternatives.
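
Before getting to the Foo-specific solutions, here is a minimal sketch of how such proxies behave in general (my own illustration, not part of the question), using a standard manager-created list: the proxy can be passed to pool workers, and mutations made through it in the workers are visible back in the main process:

from multiprocessing import Pool, Manager


def append_one(shared_list):
    # The proxy forwards this append call over a connection to the
    # manager's process, where the real list lives:
    shared_list.append(1.0)


if __name__ == "__main__":
    with Manager() as manager:
        # shared_list is a proxy to a list living in the manager process:
        shared_list = manager.list()
        with Pool(4) as pool:
            # Pass the same proxy to each of the 10 tasks:
            pool.map(append_one, [shared_list] * 10)
        # prints 10: all appends done in the workers are visible here
        print(len(shared_list))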

Therefore, I will modify your example slightly so that Foo.put_bar takes an argument, and the worker function will determine what value to pass to put_bar based on some calculation. That way, the value to be used as the argument to Foo.put_bar is returned to the main process, which does all of the actual updating of the instances:

Example Without Using a Managed Object with a Special Proxy

import os
from multiprocessing import Pool


class Foo:

    def __init__(self):
        self.bar = None

    def put_bar(self, value):
        self.bar = value


def worker(instance):
    # Code to compute a result omitted.
    # We will for demo purposes always use 1.0:
    return 1.0


if __name__ == "__main__":

    instances = [Foo() for _ in range(100)]

    with Pool(os.cpu_count()) as pool:
        # (instance for instance in instances) instead of instances below
        # doesn't accomplish anything:
        for idx, result in enumerate(pool.map(worker, instances)):
            instances[idx].put_bar(result)

    # prints 1.0
    print(instances[0].bar)
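
A possible variation on this first approach (my own sketch, not part of the original answer): if you prefer to consume results as they complete, you can pair each instance with its index via enumerate and use Pool.imap_unordered, so each result can still be matched to the right instance even when it arrives out of order. This assumes the same Foo class as above:

def indexed_worker(args):
    idx, instance = args
    # Code to compute a result omitted; we always return 1.0 for demo purposes:
    return idx, 1.0

and then, inside the if __name__ == "__main__": block:

    with Pool(os.cpu_count()) as pool:
        for idx, result in pool.imap_unordered(indexed_worker, enumerate(instances)):
            instances[idx].put_bar(result)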

Example Using a Managed Object

import os
from multiprocessing import Pool
from multiprocessing.managers import NamespaceProxy, BaseManager

class Foo:

    def __init__(self):
        self.bar = None

    def put_bar(self, value):
        self.bar = value


def worker(instance):
    # Code to compute a result omitted.
    # We will for demo purposes always use 1.0:
    return instance.put_bar(1.0)


# If we did not need to expose attributes such as bar, then we could
# let Python automatically generate a proxy that would expose just the
# methods. But here we do need to access the `bar` attribute directly.
# The alternative would be for Foo to define a method get_bar that
# returns self.bar.
class FooProxy(NamespaceProxy):
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'put_bar', 'bar')

    def put_bar(self, value):
        return self._callmethod('put_bar', args=(value,))

class FooManager(BaseManager):
    pass

if __name__ == "__main__":

    FooManager.register('Foo', Foo, FooProxy)
    with FooManager() as manager:
        instances = [manager.Foo() for _ in range(100)]

        with Pool(os.cpu_count()) as pool:
            # (instance for instance in instances) instead of instances below
            # doesn't accomplish anything:
            pool.map(worker, instances)
        # We must do all access to the proxy while the manager process
        # is still running, i.e. before this block is exited:
        # prints 1.0
        print(instances[0].bar)

Example Using a Managed Object Without a Special Proxy

Here we do not need to access attributes directly on a managed object because we have defined method get_bar:

import os
from multiprocessing import Pool
from multiprocessing.managers import NamespaceProxy, BaseManager

class Foo:

    def __init__(self):
        self._bar = None

    def put_bar(self, value):
        self._bar = value

    def get_bar(self):
        return self._bar


def worker(instance):
    # Code to compute a result omitted.
    # We will for demo purposes always use 1.0:
    return instance.put_bar(1.0)

class FooManager(BaseManager):
    pass

if __name__ == "__main__":

    FooManager.register('Foo', Foo)
    with FooManager() as manager:
        instances = [manager.Foo() for _ in range(100)]

        with Pool(os.cpu_count()) as pool:
            # (instance for instance in instances) instead of instances below
            # doesn't accomplish anything:
            pool.map(worker, instances)
        # We must do all access to the proxy while the manager process
        # is still running, i.e. before this block is exited:
        # prints 1.0
        print(instances[0].get_bar())