Correctness of modified consumer/producer

Time:09-25

I am creating a Sound class to play notes and would like feedback on the correctness and conciseness of my design. This class differs from the typical consumer/producer in two ways:

  1. The consumer should respond to events, such as a request to shut down the thread, and otherwise continue forever. The typical consumer/producer exits when the queue is empty. Moreover, a thread blocked in queue.get cannot respond to any other notification.

  2. Each set of notes submitted by the producer should overwrite any unprocessed notes remaining on the queue.

Originally I had the consumer process one note at a time using the queue module. I found continually acquiring and releasing the lock without any competition to be inefficient, and, as noted above, queue.get prevents waiting on additional events. So instead of building on that, I rewrote it as:

import threading


# get_notes, gen_samples, grouper, stream, size and some_condition are
# helpers/state not shown in the question.
queue = []                     # holds at most one pending set of notes
condition = threading.Condition()
interrupt = threading.Event()  # new notes arrived: abandon current playback
stop = threading.Event()       # producer finished: consumer should exit

def producer():
    while some_condition:
        ns = get_notes()  # [(float, float)]
        with condition:
            queue.clear()          # overwrite any unprocessed notes
            queue.append(ns)
            interrupt.set()
            condition.notify()
    with condition:
        stop.set()
        condition.notify()
    thread.join()                  # wait for the consumer thread to exit


def consumer():
    while not stop.is_set():
        with condition:
            while not (queue or stop.is_set()):
                condition.wait()
            if stop.is_set():
                break
            interrupt.clear()
            ns = queue.pop()
        ss = gen_samples(ns)  # iterator/fast
        for b in grouper(ss, size // 2):  # integer chunk size
            if interrupt.is_set() or stop.is_set():
                break
            stream.write(b)


thread = threading.Thread(target=consumer)
thread.start()
producer()

My questions are as follows:

  1. Is this thread-safe? I want to specifically point out my use of is_set without locks or synchronization (in the for-loop).

  2. Can the events be replaced with boolean variables? I believe so, as conflicting writes in both threads (data races) are guarded by the condition variable. There is a race condition between setting and checking events, but I do not believe it affects program flow.

  3. Is there a more efficient approach/algorithm utilizing different synchronization primitives from the threading module?

Edit: Found and fixed a possible deadlock described in “Why does Python threading.Condition() notify() require a lock?”

Answer:

In CPython, analyzing thread safety can take the Global Interpreter Lock (GIL) into account: no two threads execute Python bytecode simultaneously. Assignments to variables or object attributes are effectively atomic (there are no half-assigned variables), and changes become visible to other threads effectively immediately.

This means that your use of Event.is_set() is already equivalent to using plain booleans. An Event is a bool guarded by a Condition. The is_set() method checks the boolean directly. The set() method acquires the Condition, sets the boolean, and notifies all waiting threads. The wait() method blocks until the boolean is set by set(). The clear() method acquires the Condition and unsets the boolean. Since you never wait() on any Event, and setting the boolean is atomic, the Condition inside the Event is effectively unused.
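A simplified model of threading.Event along the lines just described may make this concrete. SimpleEvent is a hypothetical name, and this is a sketch loosely following CPython's implementation rather than its exact source. Note that is_set() is a plain attribute read with no lock, which is exactly what the question's for-loop relies on.

```python
import threading


class SimpleEvent:
    """Hypothetical, simplified model of threading.Event: a bool guarded by a Condition."""

    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._flag = False

    def is_set(self):
        # Plain read of the bool; no lock is taken.
        return self._flag

    def set(self):
        with self._cond:
            self._flag = True
            self._cond.notify_all()  # wake every thread blocked in wait()

    def clear(self):
        with self._cond:
            self._flag = False

    def wait(self, timeout=None):
        with self._cond:
            # wait_for re-checks the flag after every wakeup and returns
            # its final value (False if the timeout expired first).
            return self._cond.wait_for(lambda: self._flag, timeout)
```

If no thread ever calls wait(), the Condition only serializes set() and clear(), so replacing the Event with a bare attribute changes little under the GIL.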

Replacing the events with booleans might get rid of a couple of locks, but it isn't a big efficiency win. A Condition is still an abstraction over a lock, and the built-in queue.Queue is built from the same primitives (a single Lock shared by several Condition objects). Thus, I would assume that the built-in queue is no less performant than your solution, even for a single consumer.
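For comparison, here is a minimal sketch of the question's two requirements on top of queue.Queue. It assumes a single producer; STOP, submit_notes, consume and play_chunk are hypothetical names, play_chunk stands in for stream.write, and each queued item is treated as a ready-made sequence of sample chunks. Interruption is approximated by checking q.empty() between chunks; that check is only a hint, but a stale answer merely delays the interrupt by one chunk.

```python
import queue

STOP = object()  # hypothetical sentinel: tells the consumer to exit


def submit_notes(q, item):
    """Producer side: replace any unprocessed item with `item`.

    Assumes a single producer, so the queue never holds more than one item.
    """
    try:
        while True:
            q.get_nowait()  # discard stale notes the consumer has not taken
    except queue.Empty:
        pass
    q.put(item)


def consume(q, play_chunk):
    """Consumer side: play each item chunk by chunk until STOP arrives."""
    while True:
        item = q.get()  # blocks until notes or STOP arrive
        if item is STOP:
            return
        for chunk in item:
            # A non-empty queue means newer notes (or STOP) are waiting,
            # so abandon the rest of the current item.
            if not q.empty():
                break
            play_chunk(chunk)
```

In the question's setting, consume would run in the consumer thread, and the producer would call submit_notes(q, notes) for each new set of notes and submit_notes(q, STOP) before joining the thread.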

Your main issue with the built-in queue is that “continually acquiring and releasing the lock without any competition [is] inefficient”. This is wrong on two counts:

  1. Due to Python's GIL, there is little competition in either case.
  2. Acquiring uncontested locks is very efficient.
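Point 2 is easy to check on your own machine. Here is a rough micro-benchmark with timeit; results depend on hardware and Python version, so no figure is claimed here.

```python
import threading
import timeit

lock = threading.Lock()


def acquire_release():
    # One uncontended acquire/release pair, which is what `with lock:` does.
    with lock:
        pass


N = 200_000
seconds = timeit.timeit(acquire_release, number=N)
# The figure includes Python function-call overhead, so it overstates the
# cost of the lock operations themselves.
print(f"~{seconds / N * 1e9:.0f} ns per uncontended acquire/release")
```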

So while your solution is probably sufficiently correct (I can see no opportunity for deadlock), it is unlikely to be particularly efficient. (There are just some small mistakes, like using stop instead of stop.is_set(), and some syntax errors.)

If you are seeing poor performance with Python threads, that's probably because of CPython, not because of the Queue type. As I mentioned, only one thread can run at a time due to the GIL. If multiple threads want to run, they must be scheduled by the operating system and acquire the GIL. A waiting thread waits 5 ms (the default switch interval, reported by sys.getswitchinterval()) before asking the running thread to give up the GIL, in a manner quite similar to your interrupt flag. Only then can it do useful work, such as acquiring a lock for a critical section that must not be interrupted by other threads.
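The switch interval mentioned above can be inspected and tuned through the sys module. This is a small sketch; lowering the interval makes hand-offs more frequent, and whether that helps an audio consumer is an assumption you would have to measure.

```python
import sys

# How long a waiting thread lets the running thread keep the GIL before
# requesting a switch (CPython's default is 0.005 s).
print(f"switch interval: {sys.getswitchinterval()} s")

# Lowering it makes hand-offs between threads more frequent; whether that
# helps or hurts depends on the workload, so measure before keeping it.
previous = sys.getswitchinterval()
sys.setswitchinterval(0.001)
current = sys.getswitchinterval()
sys.setswitchinterval(previous)  # restore the original setting
```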

Possibly, the solution could be to avoid CPython's threads.

  • If you have multiple CPU-bound tasks, you must use multiple processes. CPython's threads will not run in parallel. However, communication between processes is more expensive.
  • Consider whether you can combine the producer and consumer directly, possibly using features such as generators.
  • To juggle multiple tasks in the same thread more easily, consider using async/await, with event loops provided by the asyncio module. This is just as fast as Python's threads, with the caveat that tasks don't pre-empt (interrupt) each other. But this can be an advantage: since a task can only be suspended at an await, you don't need most locks, and it is easier to reason about the correctness of the code. The downside is that async/await might have even higher latency than threads.
  • Python has a concept of “executors” (concurrent.futures.ThreadPoolExecutor and concurrent.futures.ProcessPoolExecutor) that make it easy and efficient to run tasks in separate threads (for I/O-bound tasks) or separate processes (for CPU-bound tasks).
  • For communicating between multiple processes, use the types from the multiprocessing module (e.g. Queue, Connection, or Value).
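The generator suggestion can be sketched as a single-threaded pull model. This assumes get_notes can be called on demand rather than being driven by a real-time source; note_batches and play_all are hypothetical names, keep_going stands in for some_condition, and write for stream.write.

```python
def note_batches(get_notes, keep_going):
    """Producer as a generator: each next() call fetches one batch of notes."""
    while keep_going():
        yield get_notes()


def play_all(batches, gen_samples, write):
    """Consumer: pulls batches on demand, so no queue, lock or event is needed."""
    for ns in batches:
        for block in gen_samples(ns):
            write(block)
```

Under that assumption, play_all(note_batches(get_notes, lambda: some_condition), gen_samples, stream.write) could replace both threads. The trade-off is that notes are only fetched between batches, so this fits only if nothing needs to be produced while sound is playing.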
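To show how the question's design might map onto asyncio, here is a minimal single-threaded sketch. Player, submit, stop, run and write_block are hypothetical names; write_block stands in for an awaitable version of stream.write, and each item passed to submit is treated as a ready-made sequence of sample blocks (a stand-in for grouper(gen_samples(ns), ...)). Because tasks only switch at an await, the overwrite and interrupt logic needs no lock.

```python
import asyncio


class Player:
    """Hypothetical asyncio version of the question's consumer."""

    def __init__(self, write_block):
        self._write_block = write_block
        self._pending = None           # latest unprocessed notes (overwritten)
        self._stopped = False
        self._wakeup = asyncio.Event() # set by submit() and stop()

    def submit(self, notes):
        # Overwrite any unprocessed notes. No lock is needed: tasks only
        # switch at an await, and this method contains none.
        self._pending = notes
        self._wakeup.set()

    def stop(self):
        self._stopped = True
        self._wakeup.set()

    async def run(self):
        while True:
            await self._wakeup.wait()
            self._wakeup.clear()
            if self._stopped:
                return
            notes, self._pending = self._pending, None
            for block in notes:
                await self._write_block(block)  # other tasks may run here
                if self._wakeup.is_set():       # new notes or stop: interrupt
                    break
```

Create the Player inside a running event loop (for example, inside the coroutine passed to asyncio.run), start player.run() with asyncio.create_task, and call submit() and stop() from other tasks. A real audio stream write is usually blocking, so it would need to be wrapped, e.g. with asyncio.to_thread (Python 3.9+), which brings back a thread for the I/O itself.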