from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30)
        pool.close()
        pool.join()

    def run(self, i):
        self.count += i
        return self.count

a = Acc()
a.multiprocess()
print(a.count)
I expected the output to be 30, but it is 0. I don't understand how multiprocessing.Pool.map
works and how it cooperates with a class. Please explain in detail.
By the way, if I print self.count inside run, like

def run(self, i):
    print(self.count)
    self.count += i
    return self.count
It gives
0
1
0
1
00
1
10
1
00
11
0
1
00
1001
11
0
10
10
1
Even more confusing: why is the output a mix of 0s and 1s?
CodePudding user response:
multiprocessing creates a separate process for each call of run(),
which means that count
is different in each of them. What you want is some sort of interprocess synchronization that makes sure you access "the same" count in all the processes. The easiest solution, in my opinion, is to use multiprocessing.Manager,
and specifically its Value
class, to make all the processes share the same counter, then use a Lock
to make sure the variable isn't updated in parallel.
The code remains mostly the same; you just need to initialize the shared value and use it appropriately:
from multiprocessing import Pool, Manager

manager = Manager()

class Acc:
    def __init__(self):
        self.count = manager.Value("i", 0)
        self.lock = manager.Lock()

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30)
        pool.close()
        pool.join()

    def run(self, i):
        with self.lock:
            self.count.value += i
        return self.count.value

a = Acc()
a.multiprocess()
print(a.count.value)
CodePudding user response:
First let's have the printout be a bit more orderly by adding flush=True
to the print statement so that each print output occupies its own line:
from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)
Prints:
i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 1
a.count = 0
Analysis
Now let's analyze what is happening. The creation of a = Acc()
is done by the main process. The multiprocessing pool processes execute in a different address space, so when they execute your worker function, self.run,
object a
must be serialized/deserialized into the address space of the process that will be executing the worker function. In that new address space, self.count
comes across with its initial value of 0, which is printed, and is then incremented to 1 and returned. Meanwhile, in parallel, object a
is being serialized/deserialized 3 more times so that 3 other processes can do the same processing, and they, too, will print 0 and return the value 1. But since all this incrementing occurs on the copies of a
that exist in address spaces other than the main process's, the original a
in the main process remains unmodified. So as the map
function continues to execute and a
is further copied from the main process to the pool processes, it always arrives with self.count = 0.
Then the question becomes: why is i = 1
instead of i = 0
sometimes being printed?
When you execute map
with an iterable specifying 30 elements, as you are doing here, these 30 tasks are by default divided into "chunks" based on the chunksize argument that you provide. Since we took the default chunksize=None, the map
function computes a default chunksize
value based on the length of the iterable and the pool size:
chunksize, remainder = divmod(len(iterable), 4 * pool_size)
if remainder:
    chunksize += 1
In this case the pool size was 4, and so the chunksize
would have been computed to be 2. That means that each process in the multiprocessing pool takes tasks off the task queue two at a time, and therefore processes two tasks against the same deserialized copy of the object: the first task in the chunk prints 0 and increments the copy's count to 1, and the second task then prints 1.
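The computation above can be packaged as a small helper to check the arithmetic (default_chunksize is a hypothetical name; the real logic lives inside Pool.map's implementation):

```python
def default_chunksize(n_tasks, pool_size):
    # approximates how Pool.map picks a chunksize when none is given
    chunksize, remainder = divmod(n_tasks, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

# divmod(30, 16) = (1, 14), and the nonzero remainder bumps it to 2,
# so each worker grabs tasks two at a time
print(default_chunksize(30, 4))  # -> 2
```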
If we specify a chunksize of 1, so that each task is shipped (and the object copied) one at a time, then we have:
from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30, chunksize=1)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

Prints:
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
a.count = 0
And if we specify a chunksize of 30 so that a single process is processing all of the tasks against a single object:
from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30, chunksize=30)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)
Prints:
i = 0
i = 1
i = 2
i = 3
i = 4
i = 5
i = 6
i = 7
i = 8
i = 9
i = 10
i = 11
i = 12
i = 13
i = 14
i = 15
i = 16
i = 17
i = 18
i = 19
i = 20
i = 21
i = 22
i = 23
i = 24
i = 25
i = 26
i = 27
i = 28
i = 29
a.count = 0
In this last case, of course, no parallelism occurred, since a single process of the multiprocessing pool processed all the submitted tasks.
CodePudding user response:
I would follow the approach below if the use of multiprocessing is mandatory.
Since we want to run the code in parallel, I would not pass instance methods to map.
I would convert run
from a method to a plain function, which takes an argument and returns it:
def run(i):
    return i
Then, in the multiprocess
method, I loop over the return values of pool.map
and add them to self.count:
def multiprocess(self):
    pool = Pool(processes=4)
    for r_value in pool.map(run, [1]*30):
        self.count += r_value
    pool.close()
    pool.join()
which gives the output:
30
Process finished with exit code 0
Complete code:

from multiprocessing import Pool

def run(i):
    return i

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        for r_value in pool.map(run, [1]*30):
            self.count += r_value
        pool.close()
        pool.join()

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print(a.count)