Home > Back-end >  Ordered semaphore.aquire() in python
Ordered semaphore.aquire() in python

Time:01-04

I have multiple multiprocessing.Process() acquiring and releasing:

s = Semaphore(5)

Are the s.acquire() calls guaranteed to be fulfilled in sequence?

If not, what can I use instead so that the first requesting process gets access to the resource first?

CodePudding user response:

Per the threading.Semaphore.acquire documentation :

If the internal counter is larger than zero on entry, decrement it by one and return True immediately.
If the internal counter is zero on entry, block until awoken by a call to release(). Once awoken (and the counter is greater than 0), decrement the counter by 1 and return True. Exactly one thread will be awoken by each call to release(). The order in which threads are awoken should not be relied on.

(emphasis mine)

In other words, while you semaphore has not reached zero, calls will immediately succeed. Then, all calls will block, until one release is called. At this point, any thread could return from its acquire.

Here is a Minimal Reproducible Example :

import threading

NB_TOTAL = 20
NB_SHARED = 5

sem = threading.Semaphore(5)


def acquire_semaphore_then_do_something(thread_number: int):
    sem.acquire()
    do_something(thread_number)
    sem.release()


def do_something(number: int):
    print(number)


# create the threads
threads = [threading.Thread(target=acquire_semaphore_then_do_something, args=(thread_number,))
           for thread_number in range(NB_TOTAL)]
# start them all
for thread in threads:
    thread.start()
# wait for all of them to finish
for thread in threads:
    thread.join()

If you REALLY need threads to run EXACTLY in the order they were created, I recommend to handle things differently :

import threading

NB_TOTAL = 20
NB_SHARED = 5

THREAD_LOCKS = [threading.Lock() for _ in range(NB_TOTAL)]
LOCKS_LOCK = threading.Lock()  # lock for the locks management
NEXT_UNLOCK_INDEX = NB_SHARED


def acquire_sequentially_then_do_something(thread_number: int):
    # try to acquire the dedicated lock
    THREAD_LOCKS[thread_number].acquire()  # blocking
    # dedicated lock acquired, proceeding
    do_something(thread_number)
    # now it has finished its work, it can unlock the next thread
    global NEXT_UNLOCK_INDEX
    if NEXT_UNLOCK_INDEX < NB_TOTAL:
        with LOCKS_LOCK:
            THREAD_LOCKS[NEXT_UNLOCK_INDEX].release()
            NEXT_UNLOCK_INDEX  = 1


def do_something(number: int):
    print(number)


# lock all the locks, except the NB_SHARED first : indexes from 0 to NB_SHARED-1
for number, lock in enumerate(THREAD_LOCKS):
    if number >= NB_SHARED:
        assert lock.acquire(blocking=False)
# create the threads
threads = [threading.Thread(target=acquire_sequentially_then_do_something, args=(thread_number,))
           for thread_number in range(NB_TOTAL)]
# start them all
for thread in threads:
    thread.start()
# wait for all of them to finish
for thread in threads:
    thread.join()

correctly prints

0
1
2
[...]
17
18
19

The idea is that the NEXT_UNLOCK_INDEX will increase 1 by 1, each time unlocking the specific lock so that the corresponding thread can finally lock it and proceed. It ensures that the threads abide to the "first come, first serve" principle.

  •  Tags:  
  • Related