Python multiprocessing: calling methods and passing objects in asynchronous calls

I am trying to accomplish two things with an apply_async (https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult) call:

(i) Call a class method

(ii) Pass an object as param

I have the following baseline code so far:

import multiprocessing as mp

class myClass():
  def __init__(self, id):
    self.id = id
    self.val = 1.0
    self.pool = None

  def callback(self, obj):
    self.val = obj.val

def foo(new_val):  # foo is outside myClass
    print ('foo passed with', new_val)
    c1.val = new_val
    return c1

if __name__ == '__main__':
  c1 = myClass('c1')
  c1.pool = mp.Pool(processes=1)
  c1.pool.apply_async(foo, args=(2.0, ), callback=c1.callback).wait()  
  c1.pool.close()
  c1.pool.join()
  print ('c1.val:', c1.val)  # should display 'c1 val: 2.0'

Output:

foo passed with 2.0
c1.val: 2.0

When I try to accomplish (i) with the code below, I don't get the same output as above.

import multiprocessing as mp

class myClass():
  def __init__(self, id):
    self.id = id
    self.val = 1.0
    self.pool = None

  def callback(self, obj):
    self.val = obj.val

  def foo(self, new_val):  # foo is inside myClass
      print ('foo passed with', new_val)
      self.val = new_val
      return self

if __name__ == '__main__':
  c1 = myClass('c1')
  c1.pool = mp.Pool(processes=1)
  c1.pool.apply_async(c1.foo, args=(2.0, ), callback=c1.callback).wait()  
  c1.pool.close()
  c1.pool.join()
  print ('c1.val:', c1.val)  # should display 'c1 val: 2.0'

Output:

c1.val: 1.0

Similarly, when I try to accomplish (ii), foo is again not called.

import multiprocessing as mp

class myClass():
  def __init__(self, id):
    self.id = id
    self.val = 1.0
    self.pool = None

  def callback(self, obj):
    self.val = obj.val

def foo(obj, new_val):  # foo is outside myClass
    print ('foo passed with', new_val)
    obj.val = new_val
    return obj

if __name__ == '__main__':
  c1 = myClass('c1')
  c1.pool = mp.Pool(processes=1)
  c1.pool.apply_async(foo, args=(c1, 2.0, ), callback=c1.callback).wait()  
  c1.pool.close()
  c1.pool.join()
  print ('c1.val:', c1.val)  # should display 'c1 val: 2.0'

Output:

c1.val: 1.0

Any idea what needs to be changed in the code above to accomplish (i) and (ii)?

CodePudding user response:

The call did not complete successfully; it raised an exception. You can check that with the multiprocessing.pool.AsyncResult.successful method:

import multiprocessing as mp


class myClass():
    def __init__(self, id):
        self.id = id
        self.val = 1.0
        self.pool = None

    def callback(self, obj):
        self.val = obj.val

    def foo(self, new_val):
        print ('foo passed with', new_val)
        self.val = new_val
        return self

if __name__ == '__main__':
    c1 = myClass('c1')
    c1.pool = mp.Pool(processes=1)
    async_result = c1.pool.apply_async(c1.foo, args=(2.0, ), callback=c1.callback)
    async_result.wait()
    print(async_result.successful())  # this is printing False!!!
    c1.pool.close()
    c1.pool.join()
    print ('c1.val:', c1.val)

Now you can define an error_callback to see what's going on:

...
async_result = c1.pool.apply_async(c1.foo, args=(2.0, ), callback=c1.callback, error_callback=lambda x: print(x))
...

This is the error printed by the error callback:

pool objects cannot be passed between processes or pickled

On this SO question you can find more info on why this is happening. The problem is that multiprocessing has to pickle everything it sends to its worker processes. In (i) the bound method c1.foo drags the whole c1 instance along, and in (ii) c1 is pickled directly as an argument; either way, the instance carries the Pool in self.pool, and Pool objects cannot be pickled.
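
A minimal fix, then, is to keep the Pool off the object that gets pickled. The sketch below is one possible variation, not from the original post: the pool lives in a local variable instead of in self.pool, so the instance only carries id and val and pickles cleanly, which makes both the bound-method call in (i) and passing c1 as an argument in (ii) work:

import multiprocessing as mp


class myClass():
    def __init__(self, id):
        self.id = id
        self.val = 1.0
        # note: no self.pool attribute, so instances stay picklable

    def callback(self, obj):
        # runs in the parent process with the worker's modified copy of the object
        self.val = obj.val

    def foo(self, new_val):  # (i): foo is a method of myClass
        print('foo passed with', new_val)
        self.val = new_val
        return self  # the modified copy is pickled back and handed to callback

if __name__ == '__main__':
    c1 = myClass('c1')
    pool = mp.Pool(processes=1)  # local variable, not stored on c1
    # Python 3 pickles the bound method c1.foo by pickling c1 plus the method name,
    # which works now that c1 no longer holds a Pool
    pool.apply_async(c1.foo, args=(2.0, ), callback=c1.callback).wait()
    pool.close()
    pool.join()
    print('c1.val:', c1.val)  # prints 'c1.val: 2.0'

If you would rather keep self.pool for convenience, another option is to give myClass a __getstate__ (and, if needed, a matching __setstate__) that drops the pool attribute before pickling; the important part is simply that the Pool never ends up inside the pickled payload.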
