I am trying to accomplish two things with an apply_async
(https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult) call:
(i) Call a class method
(ii) Pass an object as a parameter
I have the following baseline code so far:
import multiprocessing as mp

class myClass():
    def __init__(self, id):
        self.id = id
        self.val = 1.0
        self.pool = None

    def callback(self, obj):
        self.val = obj.val

def foo(new_val): # foo is outside myClass
    print ('foo passed with', new_val)
    c1.val = new_val
    return c1

if __name__ == '__main__':
    c1 = myClass('c1')
    c1.pool = mp.Pool(processes=1)
    c1.pool.apply_async(foo, args=(2.0, ), callback=c1.callback).wait()
    c1.pool.close()
    c1.pool.join()
    print ('c1.val:', c1.val) # should display 'c1 val: 2.0'
Output:
foo passed with 2.0
c1.val: 2.0
When I try to accomplish (i) with the code below, I don't get the same output as above.
class myClass():
    def __init__(self, id):
        self.id = id
        self.val = 1.0
        self.pool = None

    def callback(self, obj):
        self.val = obj.val

    def foo(self, new_val): # foo is inside myClass
        print ('foo passed with', new_val)
        self.val = new_val
        return self

if __name__ == '__main__':
    c1 = myClass('c1')
    c1.pool = mp.Pool(processes=1)
    c1.pool.apply_async(c1.foo, args=(2.0, ), callback=c1.callback).wait()
    c1.pool.close()
    c1.pool.join()
    print ('c1.val:', c1.val) # should display 'c1 val: 2.0'
Output:
c1.val: 1.0
Similarly, when I try to accomplish (ii), foo is again not called.
class myClass():
    def __init__(self, id):
        self.id = id
        self.val = 1.0
        self.pool = None

    def callback(self, obj):
        self.val = obj.val

def foo(obj, new_val): # foo is outside myClass
    print ('foo passed with', new_val)
    obj.val = new_val
    return obj

if __name__ == '__main__':
    c1 = myClass('c1')
    c1.pool = mp.Pool(processes=1)
    c1.pool.apply_async(foo, args=(c1, 2.0, ), callback=c1.callback).wait()
    c1.pool.close()
    c1.pool.join()
    print ('c1.val:', c1.val) # should display 'c1 val: 2.0'
Output:
c1.val: 1.0
Any idea what needs to be changed in the code above to accomplish (i) and (ii)?
CodePudding user response:
The call raised an exception instead of completing successfully. You can check that with the multiprocessing.pool.AsyncResult.successful method:
import multiprocessing as mp

class myClass():
    def __init__(self, id):
        self.id = id
        self.val = 1.0
        self.pool = None

    def callback(self, obj):
        self.val = obj.val

    def foo(self, new_val):
        print ('foo passed with', new_val)
        self.val = new_val
        return self

if __name__ == '__main__':
    c1 = myClass('c1')
    c1.pool = mp.Pool(processes=1)
    async_result = c1.pool.apply_async(c1.foo, args=(2.0, ), callback=c1.callback)
    async_result.wait()
    print(async_result.successful()) # this is printing False!!!
    c1.pool.close()
    c1.pool.join()
    print ('c1.val:', c1.val)
Now you can define an error_callback to see what's going on:
...
async_result = c1.pool.apply_async(c1.foo, args=(2.0, ), callback=c1.callback, error_callback=lambda x: print(x))
...
This is the error printed by the error_callback:
pool objects cannot be passed between processes or pickled
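On this SO question you can find more info on why this is happening. The problem is that the multiprocessing code has to pickle everything it sends to the sub-processes it has started. In Python 3 a bound method such as c1.foo can be pickled, but pickling it also pickles the instance it is bound to. Here c1 holds the pool in self.pool, and a Pool object refuses to be pickled, which is exactly what the error message says. Case (ii) fails for the same reason: c1, and with it c1.pool, is passed as an argument and has to be pickled.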
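One possible workaround, sketched here under the assumption that the pool stored on the instance is the only attribute that fails to pickle (as the error message suggests), is to leave the pool out of the pickled state with __getstate__. The class is renamed MyClass and the roundtrip helper is hypothetical; it only mimics what multiprocessing does to a task before sending it to a worker. Keeping the pool in a plain variable instead of on the object would likely work as well.

```python
import multiprocessing as mp
import pickle


class MyClass:
    def __init__(self, id):
        self.id = id
        self.val = 1.0
        self.pool = None

    def __getstate__(self):
        # Copy the instance dict and blank out the pool so the copy sent
        # to the worker process carries no Pool object.
        state = self.__dict__.copy()
        state['pool'] = None
        return state

    def callback(self, obj):
        # Runs in the parent process; obj is the copy returned by the worker.
        self.val = obj.val

    def foo(self, new_val):
        # Runs in the worker process on an unpickled copy of the instance.
        print('foo passed with', new_val)
        self.val = new_val
        return self


def roundtrip(bound_method):
    # Hypothetical helper: pickle and unpickle a bound method, roughly what
    # happens to the task when it is sent to a worker.
    return pickle.loads(pickle.dumps(bound_method))


if __name__ == '__main__':
    c1 = MyClass('c1')
    c1.pool = mp.Pool(processes=1)
    async_result = c1.pool.apply_async(c1.foo, args=(2.0,), callback=c1.callback)
    async_result.wait()
    print(async_result.successful())
    c1.pool.close()
    c1.pool.join()
    print('c1.val:', c1.val)
```

Note that foo still modifies a copy of c1 in the worker, not c1 itself; the parent only sees the new value because callback copies it back from the returned object.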