Using multiprocessing.Pool.map in a class

Time:11-24

from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30)
        pool.close()
        pool.join()

    def run(self, i):
        self.count += i
        return self.count

a = Acc()
a.multiprocess()
print(a.count)

I suppose the output should be 30, but it is 0. I don't know how multiprocessing.Pool.map works or how it cooperates with a class. Please explain in detail.

By the way, if I print self.count inside run, like

    def run(self, i):
        print(self.count)
        self.count += i
        return self.count

It gives

0
1
0
1
00

1
10

1
00

11

0
1
00

1001



11

0
10

10

1

More confusingly, why is the output a mix of 0s and 1s?

CodePudding user response:

multiprocessing runs run() in separate worker processes, each with its own copy of the object, which means that count is different in each of them. What you want is some form of interprocess synchronization that makes sure all the processes access "the same" count. The easiest solution, in my opinion, is to use multiprocessing.Manager, specifically its Value class, so that all the processes share a single counter, and then a Lock to make sure the variable isn't updated in parallel.

The code remains mostly the same; you just need to initialize the shared value and use it appropriately:

from multiprocessing import Pool, Manager
manager = Manager()
class Acc:
    def __init__(self):
        self.count = manager.Value("i", 0)
        self.lock = manager.Lock()
    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30)
        pool.close()
        pool.join()

    def run(self, i):
        with self.lock:
            self.count.value += i
        return self.count.value

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print(a.count.value)

CodePudding user response:

First let's make the printout a bit more orderly by adding flush=True to the print call so that each print output occupies its own line:

from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

Prints:

i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 1
a.count = 0

Analysis

Now let's analyze what is happening. The creation of a = Acc() is done by the main process. The multiprocessing pool processes execute in a different address space, so when they execute your worker function, self.run, object a must be serialized/de-serialized (pickled/unpickled) into the address space of the process that will be executing the worker function. In that new address space self.count comes across with the initial value of 0, which is printed, and then is incremented to 1 and returned. Meanwhile, in parallel, object a is being serialized/de-serialized 3 more times so that 3 other processes can do the same processing, and they, too, print 0 and return 1. But since all this incrementing happens to copies of a living in address spaces other than the main process's, the original a in the main process remains unmodified. So as the map function continues to execute and a is further copied from the main process to the processing pool, it always arrives with self.count = 0.
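The copy semantics can be seen without a pool at all: pickling and unpickling an Acc instance, which is essentially what the pool does with a for every chunk of tasks, yields an independent object. A quick sketch:

```python
import pickle

class Acc:
    def __init__(self):
        self.count = 0

a = Acc()
# This round-trip mimics what the pool does to `a` for each chunk of tasks
clone = pickle.loads(pickle.dumps(a))
clone.count += 1

print(clone.count)  # 1 -- the deserialized copy was incremented ...
print(a.count)      # 0 -- ... but the original is untouched
```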

Then the question becomes why is i = 1 instead of i = 0 sometimes being printed?

When you execute map with an iterable specifying 30 elements, as you are doing here, these 30 tasks are by default divided into "chunks" based on the chunksize argument. Since we took the default chunksize=None, the map function computes a default chunksize from the length of the iterable and the pool size:

chunksize, remainder = divmod(len(iterable), 4 * pool_size)
if remainder:
    chunksize += 1

In this case the pool size was 4, so the chunksize is computed to be 2. That means each process in the multiprocessing pool takes tasks off the task queue two at a time, and so processes two tasks against the same deserialized copy of the object; the second task of a chunk sees the increment made by the first, which is why 1 is sometimes printed.
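Plugging this example's numbers (30 tasks, pool of 4) into the computation above confirms the chunk size:

```python
# Default chunksize computation as performed by Pool.map
iterable_len = 30
pool_size = 4
chunksize, remainder = divmod(iterable_len, 4 * pool_size)  # divmod(30, 16) -> (1, 14)
if remainder:
    chunksize += 1
print(chunksize)  # 2: each worker pulls tasks two at a time
```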

If we specify a chunksize of 1, so that each process only processes the object one at a time, then we have:

from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30, chunksize=1)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

Prints:

i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
a.count = 0

And if we specify a chunksize of 30 so that a single process is processing all of the tasks against a single object:

from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30, chunksize=30)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

Prints:

i = 0
i = 1
i = 2
i = 3
i = 4
i = 5
i = 6
i = 7
i = 8
i = 9
i = 10
i = 11
i = 12
i = 13
i = 14
i = 15
i = 16
i = 17
i = 18
i = 19
i = 20
i = 21
i = 22
i = 23
i = 24
i = 25
i = 26
i = 27
i = 28
i = 29
a.count = 0

In this last case, of course, no multiprocessing occurred since a single process of the multiprocessing pool processed all the submitted tasks.

CodePudding user response:

I would follow the approach below if use of multiprocessing is mandatory. Since we want the code to run in parallel, I would not pass instance methods to map.
I would convert run into a module-level function, which takes an argument and returns it.

def run(i):
    return i

Then, in the multiprocess method, loop over the return values of pool.map and add them to self.count:

def multiprocess(self):
    pool = Pool(processes=4)
    for r_value in pool.map(run, [1]*30):
        self.count += r_value
    pool.close()
    pool.join()

which gives the output

30

Process finished with exit code 0

complete code:

from multiprocessing import Pool

def run(i):
    return i

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        for r_value in pool.map(run, [1]*30):
            self.count += r_value
        pool.close()
        pool.join()



if __name__ =='__main__':
    a = Acc()
    a.multiprocess()
    print(a.count)