Does python provide a synchronized buffer?


I'm very familiar with Python's queue.Queue. It is definitely what you want for a reliable stream between producer and consumer threads. However, sometimes the producer is faster than the consumer and you are forced to drop data, as in live video frame capture, for example, where you typically want to buffer only the last one or two frames.

Does Python provide a synchronized buffer class along these lines, similar to queue.Queue? It's not exactly obvious how to implement one correctly on top of queue.Queue.

I could, for example:

import queue

buf = queue.Queue(maxsize=3)

def produce(msg):
    if buf.full():
        buf.get(block=False)  # make space by dropping the oldest item
    buf.put(msg, block=False)

def consume():
    msg = buf.get(block=True)
    work(msg)

although I don't particularly like that produce is not an atomic operation on the queue. A consume could run between the full() and get() calls, for example, and it would probably be broken in a multi-producer scenario.

Is there an out-of-the-box solution?

CodePudding user response:

queue.Queue is already thread-safe, in that concurrent put() and get() calls cannot corrupt the queue. However, you are correct that nothing stops the queue from being modified between the full() and get() calls.

To make that check-then-act sequence atomic, you can use a lock, which lets you control thread access across multiple statements. A lock can only be held by one thread at a time, so if it is currently held, all other threads wait until it is released before they continue.

import queue
import threading
import time

buf = queue.Queue(maxsize=3)
lock = threading.Lock()

def produce(msg):
    with lock:  # acquired/released automatically, even on exceptions
        if buf.full():
            buf.get(block=False)  # make space by dropping the oldest item
        buf.put(msg, block=False)

def consume():
    msg = None
    while msg is None:
        with lock:
            try:
                msg = buf.get(block=False)
            except queue.Empty:
                pass
        if msg is None:
            time.sleep(0.01)  # buffer is empty; sleep outside the lock and retry

    work(msg)
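To see the drop-oldest behaviour, produce can be exercised single-threaded (work and the consumer thread are omitted here; this only checks what ends up in the queue):

```python
import queue
import threading

buf = queue.Queue(maxsize=3)
lock = threading.Lock()

def produce(msg):
    with lock:
        if buf.full():
            buf.get(block=False)  # drop the oldest item
        buf.put(msg, block=False)

for i in range(5):
    produce(i)

# maxsize=3, so only the three most recent items remain
print([buf.get(block=False) for _ in range(3)])  # [2, 3, 4]
```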

CodePudding user response:

There's nothing built in for this, but it's straightforward enough to build your own buffer class that wraps a Queue, provides mutual exclusion between .put() and .get() with its own lock, and uses a Condition variable to wake up would-be consumers whenever an item is added. Like so:

import queue
import threading

class SBuf:
    def __init__(self, maxsize):
        self.q = queue.Queue()
        self.maxsize = maxsize
        self.nonempty = threading.Condition()

    def get(self):
        with self.nonempty:
            while not self.q.qsize():
                self.nonempty.wait()  # sleep until a producer notifies us
            assert self.q.qsize()
            return self.q.get()

    def put(self, v):
        with self.nonempty:
            while self.q.qsize() >= self.maxsize:
                self.q.get()  # drop the oldest item to make room
            self.q.put(v)
            assert 0 < self.q.qsize() <= self.maxsize
            self.nonempty.notify_all()

BTW, I advise against trying to build this kind of logic out of raw locks. Of course it can be done, but Condition variables are very carefully designed to save you from universes of unintended race conditions. There's a learning curve for Condition variables, but one well worth climbing: they often make things easy instead of brain-busting. Indeed, Python's threading module uses them internally to implement all sorts of things.
