I'm trying to use the multiprocessing module to run the same method in parallel over a list of object instances.
The closest question I've found is "apply-a-method-to-a-list-of-objects-in-parallel-using-multi-processing". However, the solution given there does not seem to work for my problem.
Here is an example of what I'm trying to achieve:
```python
class Foo:
    def __init__(self):
        self.bar = None

    def put_bar(self):
        self.bar = 1.0


if __name__ == "__main__":
    instances = [Foo() for _ in range(100)]
    for instance in instances:
        instance.put_bar()
    # correctly prints 1.0
    print(instances[0].bar)
```
However, when I try to parallelize this with the multiprocessing module, the attribute `bar` is left unchanged:
```python
import os
from multiprocessing import Pool


class Foo:
    def __init__(self):
        self.bar = None

    def put_bar(self):
        self.bar = 1.0


def worker(instance):
    return instance.put_bar()


if __name__ == "__main__":
    instances = [Foo() for _ in range(100)]
    with Pool(os.cpu_count()) as pool:
        pool.map(worker, (instance for instance in instances))
    # prints None
    print(instances[0].bar)
```
Any help figuring out where I went wrong is highly appreciated.
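A small diagnostic sketch of what goes wrong, using a hypothetical helper `check` and a pool of 2 workers chosen only for illustration: the worker reports the value of `bar` on the object it received, and the main process inspects its own list afterwards. Assuming the standard `Pool.map` behaviour of pickling arguments and return values between processes, the worker's copy is updated but the originals are not.

```python
from multiprocessing import Pool


class Foo:
    def __init__(self):
        self.bar = None

    def put_bar(self):
        self.bar = 1.0


def worker(instance):
    # `instance` is an unpickled copy living in the worker process.
    instance.put_bar()
    # The copy *was* updated; report what the worker sees.
    return instance.bar


def check():
    instances = [Foo() for _ in range(4)]
    with Pool(2) as pool:
        seen_in_workers = pool.map(worker, instances)
    # The originals in the main process were never touched.
    return seen_in_workers, [f.bar for f in instances]


if __name__ == "__main__":
    print(check())  # ([1.0, 1.0, 1.0, 1.0], [None, None, None, None])
```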
Answer:
Your version prints `None` because each task submitted to a pool is pickled and sent to a worker process, so `put_bar` runs on a copy of the instance in that process; the `Foo` objects in the main process are never modified.

You can create managed objects from your `Foo` class, just as the `multiprocessing.managers.SyncManager` instance created by a call to `multiprocessing.Manager()` can create certain managed objects such as a `list` or `dict`. What is returned is a special proxy object that is shareable among processes. When a method is called on such a proxy, the method's name and its arguments are sent via a pipe or socket to a process created by the manager, and the method is invoked on the actual object residing in the manager's address space. In effect, you are making something similar to a remote method call. This is clearly much slower than operating on the object directly, but if you have to, you have to. Your code example, which is a bit too artificial, doesn't leave many alternatives.
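For comparison, a minimal sketch of the built-in case mentioned above, assuming the standard `multiprocessing.Manager()` API: the list lives in the manager's process, and each worker receives a `ListProxy` whose `append` call is forwarded there. The helper names `record` and `collect` are hypothetical.

```python
from multiprocessing import Manager, Pool


def record(shared, value):
    # `shared` is a ListProxy; append() is executed in the manager's process.
    shared.append(value * value)


def collect(n=10):
    with Manager() as manager:
        shared = manager.list()
        with Pool(4) as pool:
            pool.starmap(record, [(shared, i) for i in range(n)])
        # Copy the data out as an ordinary list while the manager is still running.
        return sorted(shared[:])


if __name__ == "__main__":
    print(collect())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Slicing the proxy (`shared[:]`) fetches a plain list in one remote call rather than one call per element; sorting is needed because the workers may append in any order.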
So that there is an alternative to a managed object, I will modify your example slightly: `Foo.put_bar` now takes an argument, and your worker function `worker` determines the value to pass to `put_bar` based on some calculation. That way, the value to be used as the argument to `Foo.put_bar` is returned to the main process, which does all the actual updating of the instances:
Example Without Using a Managed Object
```python
import os
from multiprocessing import Pool


class Foo:
    def __init__(self):
        self.bar = None

    def put_bar(self, value):
        self.bar = value


def worker(instance):
    # Code to compute a result omitted.
    # We will for demo purposes always use 1.0:
    return 1.0


if __name__ == "__main__":
    instances = [Foo() for _ in range(100)]
    with Pool(os.cpu_count()) as pool:
        # (instance for instance in instances) instead of instances below
        # doesn't accomplish anything:
        for idx, result in enumerate(pool.map(worker, instances)):
            instances[idx].put_bar(result)
    # prints 1.0
    print(instances[0].bar)
```
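A variant of the same idea, offered as an alternative sketch rather than as part of the approach above: because the worker already holds its own copy of the instance, it can call the original, argument-free `put_bar` on that copy and return the whole object. `Pool.map` pickles the updated copies back in their original order, and the main process replaces its list with them. The helper `run` is hypothetical.

```python
import os
from multiprocessing import Pool


class Foo:
    def __init__(self):
        self.bar = None

    def put_bar(self):
        self.bar = 1.0


def worker(instance):
    # Mutate the worker's copy, then send the whole object back.
    instance.put_bar()
    return instance


def run(n=100):
    instances = [Foo() for _ in range(n)]
    with Pool(os.cpu_count()) as pool:
        # map() preserves order, so the returned copies line up with the inputs.
        instances = pool.map(worker, instances)
    return instances


if __name__ == "__main__":
    print(run()[0].bar)  # 1.0
```

The trade-off is object identity: the list now holds new objects, so any other references to the old `Foo` instances still see `bar` as `None`, and sending whole objects back costs more than returning a single value when the objects are large.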
Example Using a Managed Object
```python
import os
from multiprocessing import Pool
from multiprocessing.managers import NamespaceProxy, BaseManager


class Foo:
    def __init__(self):
        self.bar = None

    def put_bar(self, value):
        self.bar = value


def worker(instance):
    # Code to compute a result omitted.
    # We will for demo purposes always use 1.0:
    return instance.put_bar(1.0)


# If we did not need to expose attributes such as bar, then we could
# let Python automatically generate a proxy that would expose just the
# methods. But here we do need to access directly the `bar` attribute.
# The alternative would be for Foo to define method get_bar that returns
# self.bar.
class FooProxy(NamespaceProxy):
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'put_bar', 'bar')

    def put_bar(self, value):
        return self._callmethod('put_bar', args=(value,))


class FooManager(BaseManager):
    pass


if __name__ == "__main__":
    FooManager.register('Foo', Foo, FooProxy)
    with FooManager() as manager:
        instances = [manager.Foo() for _ in range(100)]
        with Pool(os.cpu_count()) as pool:
            # (instance for instance in instances) instead of instances below
            # doesn't accomplish anything:
            pool.map(worker, instances)
        # We must do all access to the proxy while the manager process
        # is still running, i.e. before this block is exited:
        # prints 1.0
        print(instances[0].bar)
```
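To get a rough feel for the per-call cost of a proxy, a timing sketch that assumes the default auto-generated proxy (no `FooProxy`) and compares it with calling the method on an ordinary local object. `time_calls` is a hypothetical helper; absolute numbers depend on the machine and start method, so none are quoted here.

```python
import time
from multiprocessing.managers import BaseManager


class Foo:
    def __init__(self):
        self.bar = None

    def put_bar(self, value):
        self.bar = value


class FooManager(BaseManager):
    pass


FooManager.register('Foo', Foo)


def time_calls(n=1000):
    # Direct method calls on an object in this process.
    local = Foo()
    start = time.perf_counter()
    for _ in range(n):
        local.put_bar(1.0)
    local_seconds = time.perf_counter() - start

    # The same calls through a proxy: each one is a round trip to the manager.
    with FooManager() as manager:
        proxy = manager.Foo()
        start = time.perf_counter()
        for _ in range(n):
            proxy.put_bar(1.0)
        proxy_seconds = time.perf_counter() - start
    return local_seconds, proxy_seconds


if __name__ == "__main__":
    local_s, proxy_s = time_calls()
    print(f"direct: {local_s:.6f}s  proxy: {proxy_s:.6f}s")
```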
Example Using a Managed Object Without a Special Proxy
Here we do not need to access attributes directly on a managed object because we have defined method `get_bar`:
```python
import os
from multiprocessing import Pool
from multiprocessing.managers import BaseManager


class Foo:
    def __init__(self):
        self._bar = None

    def put_bar(self, value):
        self._bar = value

    def get_bar(self):
        return self._bar


def worker(instance):
    # Code to compute a result omitted.
    # We will for demo purposes always use 1.0:
    return instance.put_bar(1.0)


class FooManager(BaseManager):
    pass


if __name__ == "__main__":
    # No proxy type given, so an automatically generated proxy
    # exposing Foo's public methods is used:
    FooManager.register('Foo', Foo)
    with FooManager() as manager:
        instances = [manager.Foo() for _ in range(100)]
        with Pool(os.cpu_count()) as pool:
            # (instance for instance in instances) instead of instances below
            # doesn't accomplish anything:
            pool.map(worker, instances)
        # We must do all access to the proxy while the manager process
        # is still running, i.e. before this block is exited:
        # prints 1.0
        print(instances[0].get_bar())
```
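A quick sketch of why `get_bar` is needed here: when no proxy type is passed to `register`, the auto-generated proxy exposes only the object's public methods, so plain attribute access on the proxy fails while the method call goes through. `demo` is a hypothetical helper.

```python
from multiprocessing.managers import BaseManager


class Foo:
    def __init__(self):
        self._bar = None

    def put_bar(self, value):
        self._bar = value

    def get_bar(self):
        return self._bar


class FooManager(BaseManager):
    pass


FooManager.register('Foo', Foo)


def demo():
    with FooManager() as manager:
        foo = manager.Foo()
        foo.put_bar(1.0)
        try:
            # The proxy has no such attribute; it is not forwarded to the manager.
            _ = foo._bar
            direct = "attribute reachable"
        except AttributeError:
            direct = "AttributeError"
        # Method calls are forwarded, so the value comes back.
        return direct, foo.get_bar()


if __name__ == "__main__":
    print(demo())  # ('AttributeError', 1.0)
```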