I want a shared counter across the workers of a multiprocessing.Pool,
and I use the following code to print how many inputs from the list are still being processed:
import multiprocessing

running = multiprocessing.Value('i', 0)

def f(x):
    global running
    global lock
    # ... code ...
    with lock:
        running.value -= 1
        print(f"Still running: {running.value}\n", end='', flush=True)
    return x

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    rangeval = range(100)
    running.value = len(rangeval)
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    result = pool.map(f, iterable=rangeval)
This works well on macOS and Linux, but when I run it on Windows it raises an error:
File "C:\...\...\...\...\main.py", line 11, in f
with lock:
NameError: name 'lock' is not defined
When I put lock = multiprocessing.Lock() outside the if __name__ == '__main__' block, above the function f, it produces weird output like the following:
Still running: -1
Still running: -2
Still running: -3
Still running: -4
Still running: -1
Still running: -2
Still running: -3
Still running: -4
How can this be solved in Windows?
CodePudding user response:
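You can make this fail on macOS and Linux too by calling multiprocessing.set_start_method("spawn", force=True). (The default on those OSes is likely to be fork, which gives each worker a copy of the parent's globals. Windows only supports spawn, which starts a fresh interpreter and re-imports the module, so anything assigned under if __name__ == '__main__' never exists in the workers. A module-level Value is likewise recreated in every worker, which is why each one counts down from 0 on its own.)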
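To check which start method a given machine actually uses, a minimal sketch like the following may help. The helper name start_method_report is made up for illustration; get_start_method and get_all_start_methods are the standard multiprocessing calls.

```python
import multiprocessing


def start_method_report():
    """Return (default start method, all methods available on this platform)."""
    default = multiprocessing.get_start_method()
    available = multiprocessing.get_all_start_methods()
    return default, available


if __name__ == "__main__":
    default, available = start_method_report()
    # On Windows only "spawn" is available, so it is always the default there.
    print(f"default: {default}, available: {available}")
```

If the default reported is "spawn", the original code fails the same way it does on Windows.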
You don't need a separate lock; Value objects have a lock of their own.
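As a small illustration of that built-in lock, here is a sketch that runs in a single process for simplicity. The helper names decrement and has_builtin_lock are hypothetical; get_lock() and the lock parameter are part of multiprocessing.Value.

```python
import multiprocessing


def decrement(counter):
    """Decrement a shared counter while holding the Value's own lock."""
    # counter.value -= 1 is a read followed by a write, so it must be guarded.
    with counter.get_lock():
        counter.value -= 1
        return counter.value


def has_builtin_lock(value):
    """True if the object is the synchronized wrapper that carries a lock."""
    return hasattr(value, "get_lock")


if __name__ == "__main__":
    counter = multiprocessing.Value("i", 3)          # lock=True is the default
    raw = multiprocessing.Value("i", 3, lock=False)  # plain ctypes object, no lock
    print(decrement(counter))
    print(has_builtin_lock(counter), has_builtin_lock(raw))
```

A Value created with lock=False has no get_lock() method, so code that relies on it would raise AttributeError.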
You'll need to jump through some hoops to get the shared-memory value into the worker processes: pass it to the pool's initializer, which runs once in each worker at startup and stores it in a global. (Inspired by this answer.)
import multiprocessing

# multiprocessing.set_start_method("spawn", force=True)

running: multiprocessing.Value  # assigned in initproc

def f(x):
    with running.get_lock():
        running.value -= 1
        print(f"Still running: {running.value}\n", end="", flush=True)
    return x

def initproc(r):
    global running
    running = r

def main():
    running = multiprocessing.Value("i", 0)
    rangeval = range(10)
    running.value = len(rangeval)
    with multiprocessing.Pool(
        processes=multiprocessing.cpu_count(), initializer=initproc, initargs=(running,)
    ) as pool:
        pool.map(f, iterable=rangeval)

if __name__ == "__main__":
    main()
CodePudding user response:
The multiprocessing.Value object needs to be passed to the subprocesses. Here's a trivial example that may help you to understand its use:
from multiprocessing import Process, Value

def p1(v):
    with v.get_lock():
        v.value += 1  # increment under the Value's own lock

def p2(v):
    with v.get_lock():
        v.value -= 1  # decrement under the same lock

if __name__ == '__main__':
    v = Value('i', 0)
    plist = []
    for _ in range(10):
        for p in [p1, p2]:
            _p = Process(target=p, args=[v])
            plist.append(_p)
            _p.start()
    for p in plist:
        p.join()
    assert v.value == 0
Here we create a Value object initialised to zero. We have two functions that will run as subprocesses: p1 increments the value and p2 decrements it, each while holding the Value's lock so the read-modify-write cannot interleave with another process. We run 10 (effectively) concurrent instances each of p1 and p2. In other words, the value is incremented 10 times and decremented 10 times, ending up at zero.