Acquire a multiprocessing.Lock in a with statement if non-blocking or with timeout

Time:02-28

With a normal multiprocessing.Lock (or threading.Lock) you can simplify the following code:

lock = multiprocessing.Lock()
lock.acquire()

try:
    ...
finally:
    lock.release()

into:

with lock:
    ...

However, can I still use a context manager when I want to pass some arguments to lock.acquire(...), such as block= or timeout=? For example, I have code like this:

lock_success = lock.acquire(block=False)

if not lock_success:
    return

try:
    ...
finally:
    lock.release()

I don't see a way to pass this argument to the context manager (since it's in the acquire call and not the constructor).

(The idea is that the with-block would get skipped if the lock cannot be acquired.)

The same question applies to threading.Lock, which provides a similar API (its acquire() takes blocking= rather than block=).

CodePudding user response:

My suspicion is that this is not possible because of how context managers are designed in Python. What happens with a context manager (CM) is:

with CM:
    ...
  • The CM's __enter__ method is called
  • The block inside of with is run
  • The CM's __exit__ method is called

Thus __enter__ has no supported way to "skip" the body of the with-block (short of raising an exception, which then propagates to the caller), and I guess this explains the design of the Lock class. There are some dark-magic ways around that, but they should be avoided.
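The sequence above can be observed with a small tracing context manager. This is only an illustrative sketch: the class name Traced and its events list are made up for the demonstration and are not part of any library. It shows that the value returned by __enter__ is merely bound by "as" and does not decide whether the body runs.

```python
class Traced:
    """Hypothetical context manager that records the order of calls."""

    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append("enter")
        # The return value is only bound to the "as" target;
        # returning False here does NOT skip the with-block.
        return False

    def __exit__(self, exc_type, exc, tb):
        self.events.append("exit")
        return False  # do not suppress exceptions


cm = Traced()
with cm as result:
    cm.events.append("body")

print(result)     # False
print(cm.events)  # ['enter', 'body', 'exit']
```

So the only thing the caller can do with a "failed" __enter__ result is to check it inside the block.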

CodePudding user response:

TL;DR: you cannot use the built-in lock context manager for this, but it can still be done fairly cleanly.

It could almost work, because Lock.__enter__() returns the value returned by its call to acquire(), which is a boolean indicating whether the lock was obtained:

l = Lock()
with l as success:
    if success:
        print("run some code")
    else:
        print("skip the code")

Frustratingly, however, it is not possible to pass arguments to that internal call to acquire(); the arguments are hardcoded. I would recommend writing your own context manager to solve this, as it is quite straightforward:

from multiprocessing import Lock
from contextlib import contextmanager

@contextmanager
def get_lock(lock, block=True, timeout=None):
    held = lock.acquire(block=block, timeout=timeout)
    try:
        yield held
    finally:
        if held:
            lock.release()

#### example usage ####

l = Lock()
# the lock is free, so it is acquired and success == True
with get_lock(l) as success:
    if success:
        print("run some code")
    else:
        print("skip the code")

# hold the lock so that the next attempt cannot succeed
l.acquire()
# the lock won't be acquired, and the with-block runs once the 3-second timeout expires
# use the value of success to determine what to do
with get_lock(l, block=True, timeout=3) as success:
    if success:
        print("run some code")
    else:
        print("skip the code")

l.release()
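For threading.Lock the same pattern should carry over, with one difference to watch for: its acquire() takes blocking= instead of block=, and uses timeout=-1 (not None) to mean "wait indefinitely". The following is a minimal sketch under that assumption; the helper name get_thread_lock is hypothetical and chosen only for this example.

```python
import threading
from contextlib import contextmanager


@contextmanager
def get_thread_lock(lock, blocking=True, timeout=-1):
    """Hypothetical helper: try to acquire a threading.Lock and yield the outcome."""
    held = lock.acquire(blocking=blocking, timeout=timeout)
    try:
        yield held
    finally:
        # Release only if this helper actually obtained the lock.
        if held:
            lock.release()


lock = threading.Lock()
results = []

with get_thread_lock(lock, blocking=False) as success:
    results.append(success)  # the lock was free, so this attempt succeeds
    with get_thread_lock(lock, blocking=False) as nested:
        results.append(nested)  # already held, so the non-blocking attempt fails

results.append(lock.locked())  # released again after the outer block
print(results)  # [True, False, False]
```

Note that threading.Lock raises ValueError if a timeout other than -1 is combined with blocking=False, so pass one or the other.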