Home > Software design >  Shared counter with multiprocessing Lock is not working in Windows
Shared counter with multiprocessing Lock is not working in Windows

Time:10-08

I wanted to have a shared counter in multiprocessing.Pool and I use the following code to print the varying input list:

import multiprocessing

running = multiprocessing.Value('i', 0)

def f(x):
    global running
    global lock

    # ... code ...

    with lock:
        running.value -= 1
        print(f"Still running: {running.value}\n", end='', flush=True)

    return x

if __name__ == '__main__':
    lock = multiprocessing.Lock()

    rangeval = range(100)
    running.value = len(rangeval)

    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    result = pool.map(f, iterable=rangeval)

This works well in Mac and Linux. But when I run it in Windows it produces an error:

  File "C:\...\...\...\...\main.py", line 11, in f
    with lock:
       NameError: name 'lock' is not defined

When I put lock = multiprocessing.Lock() outside the if __name__ == '__main__' on top of the function f, it produces a weird output like the following:

Still running: -1
Still running: -2
Still running: -3
Still running: -4
Still running: -1
Still running: -2
Still running: -3
Still running: -4

How can this be solved in Windows?

CodePudding user response:

This can be made not work on macOS and Linux by calling

multiprocessing.set_start_method("spawn", force=True)

(the default is likely to be fork on those OSes.)

You don't need a separate lock; Values have a lock of their own.

You'll need to jump through some hoops to correctly move the shared-memory value into the subprocesses when they initialize. (Inspired by this answer.)

import multiprocessing

# multiprocessing.set_start_method("spawn", force=True)

running: multiprocessing.Value  # assigned in initproc


def f(x):
    with running.get_lock():
        running.value -= 1
        print(f"Still running: {running.value}\n", end="", flush=True)

    return x


def initproc(r):
    global running
    running = r


def main():
    running = multiprocessing.Value("i", 0)
    rangeval = range(10)
    running.value = len(rangeval)

    with multiprocessing.Pool(
        processes=multiprocessing.cpu_count(), initializer=initproc, initargs=(running,)
    ) as pool:
        pool.map(f, iterable=rangeval)


if __name__ == "__main__":
    main()

CodePudding user response:

The multiprocessing.Value object needs to be passed to the subprocesses. Here's a trivial example that may help you to understand its use:

from multiprocessing import Process, Value


def p1(v):
    with v.get_lock():
        v.value  = 1


def p2(v):
    with v.get_lock():
        v.value -= 1


if __name__ == '__main__':
    v = Value('i', 0)
    plist = []
    for _ in range(10):
        for p in [p1, p2]:
            _p = Process(target=p, args=[v])
            plist.append(_p)
            _p.start()
    for p in plist:
        p.join()
    assert v.value == 0

Here we create a Value object initialised to zero. We have two functions that will run as subprocesses. p1 will increment the value and p2 will decrement it. We run 10 (effectively) concurrent instances each of p1 and p2. In other words, the value will be incremented 10 times and decremented 10 times ending up as zero

  • Related