Implementing BlockingQueue using Semaphores only

Time:07-23

I'm having trouble with my code design and can't find a solution. I am implementing a BlockingQueue using only semaphores (TaggedSemaphores, to be precise), without loops, condition variables, mutexes, or if-else statements. My current code looks like this:


template <typename T>
class BlockingQueue {
  using Token = typename TaggedSemaphore<T>::Token;
  using Guard = typename TaggedSemaphore<T>::Guard;
 public:
  explicit BlockingQueue(size_t capacity):
        empty_ (capacity), taken_ (0), mutex_(1), put_mutex_(1), take_mutex_(1)
  {
  }

  // Inserts the specified element into this queue,
  // waiting if necessary for space to become available.
  void Put(T value) {
    Guard put_guard (put_mutex_);
    Token tok (std::move (empty_.Acquire()));
    Guard guard (mutex_);
    buffer_.push_back(std::move(value));

    taken_.Release(std::move(tok));
  }

  // Retrieves and removes the head of this queue,
  // waiting if necessary until an element becomes available
  T Take() {
    Guard take_guard (take_mutex_);
    Token tok (std::move (taken_.Acquire()));
    Guard guard (mutex_);
    T ret_value = std::move(buffer_.front());
    buffer_.pop_front();

    empty_.Release(std::move(tok));
    return std::move(ret_value);
  }

  private:
  TaggedSemaphore<T> empty_, taken_, mutex_, put_mutex_, take_mutex_;
  std::deque<T> buffer_;
};

The TaggedSemaphore class is a simple wrapper around Semaphore that returns a Token from Acquire and invalidates the Token in Release. Guard is an RAII wrapper around a Token: it accepts a TaggedSemaphore instance in its constructor, stores a valid Token, and releases it in the destructor.
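The post does not include TaggedSemaphore itself. For readers who want to follow along, here is a minimal sketch of what such a wrapper could look like, based only on the description above. Everything in it is an assumption: the underlying Semaphore is a stand-in built on std::mutex and std::condition_variable, and the names IsValid and Invalidate are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <utility>

// Stand-in counting semaphore (assumed; the post's Semaphore is not shown).
class Semaphore {
 public:
  explicit Semaphore(std::size_t count) : count_(count) {}

  // Blocks until a permit is available, then takes it.
  void Acquire() {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [this] { return count_ > 0; });
    --count_;
  }

  // Returns one permit and wakes one waiter, if any.
  void Release() {
    {
      std::lock_guard<std::mutex> lock(m_);
      ++count_;
    }
    cv_.notify_one();
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::size_t count_;
};

template <typename Tag>
class TaggedSemaphore {
 public:
  // Move-only proof that one permit was acquired.
  class Token {
   public:
    Token(Token&& other) noexcept : valid_(other.valid_) {
      other.valid_ = false;
    }
    bool IsValid() const { return valid_; }  // hypothetical helper

   private:
    friend class TaggedSemaphore<Tag>;
    Token() : valid_(true) {}
    void Invalidate() { valid_ = false; }
    bool valid_;
  };

  // RAII holder: acquires in the constructor, releases in the destructor.
  class Guard {
   public:
    explicit Guard(TaggedSemaphore& sem) : sem_(sem), token_(sem.Acquire()) {}
    ~Guard() { sem_.Release(std::move(token_)); }
    Guard(const Guard&) = delete;
    Guard& operator=(const Guard&) = delete;

   private:
    TaggedSemaphore& sem_;
    Token token_;
  };

  explicit TaggedSemaphore(std::size_t count) : sem_(count) {}

  Token Acquire() {
    sem_.Acquire();
    return Token{};
  }

  // Consumes the token and returns one permit to this semaphore.
  void Release(Token&& token) {
    assert(token.IsValid());
    token.Invalidate();
    sem_.Release();
  }

 private:
  Semaphore sem_;
};
```

In this sketch a Token is not bound to the semaphore that issued it, which matches how the queue code hands a Token acquired from one semaphore to Release on another.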

The issue is that if two threads are inserting and retrieving values at the same time, they both modify empty_ and taken_, so inside the critical section the threads may be working with stale data. I am pretty sure that this will cause bugs. However, I can't lock mutex_ first, because that would deadlock when the queue is either empty or full (which is why I lock it in the third line of each function's body). Is there a way to get around this problem?

CodePudding user response:

This queue works fine. put_mutex_ and take_mutex_ are unnecessary, though, and should be removed.

You say that simultaneous threads might be "working with the wrong data", but nobody ever gets the value of empty_ or taken_, so they are not working with this data at all.

Code like this is difficult to write, and even more difficult to read closely enough to verify its correctness. It is vitally important to use comments and names to make that easier.

Here is the same code, without the unnecessary mutexes, but with some different names and comments. I think this will make it a lot easier to understand why it's correct.

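#include <cstddef>
#include <deque>
#include <utility>
// TaggedSemaphore is assumed to be declared elsewhere; its header is not shown here.

template <typename T>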
class BlockingQueue {
  using Token = typename TaggedSemaphore<T>::Token;
  using Guard = typename TaggedSemaphore<T>::Guard;
 private:
  // Number of elements that can be removed without blocking
  // Always <= queue size
  TaggedSemaphore<T> avail_;
  // Number of elements that can be added without blocking;
  // Always <= queue free space
  TaggedSemaphore<T> free_;
  // Mutex to guard access to buffer_
  TaggedSemaphore<T> mutex_;

  std::deque<T> buffer_;

 public:
  explicit BlockingQueue(size_t capacity):
        avail_ (0), free_ (capacity), mutex_(1)
  {
  }

  // Inserts the specified element into this queue,
  // waiting if necessary for space to become available.
  void Put(T value) {
    // there must be space
    Token tok (std::move (free_.Acquire()));

    Guard guard (mutex_);
    buffer_.push_back(std::move(value));

    // and now the item is available
    avail_.Release(std::move(tok));
  }

  // Retrieves and removes the head of this queue,
  // waiting if necessary until an element becomes available
  T Take() {
    // there must be an item available
    Token tok (std::move (avail_.Acquire()));

    Guard guard (mutex_);
    T ret_value = std::move(buffer_.front());
    buffer_.pop_front();

    // and now there is another free slot
    free_.Release(std::move(tok));
    // returning the local by name lets the compiler move or elide it
    return ret_value;
  }

};
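One way to gain some confidence that the extra put_mutex_ and take_mutex_ are not needed is to hammer the queue from several producers and consumers at once. The sketch below is only illustrative: it swaps TaggedSemaphore for a plain stand-in Semaphore without tokens (built on std::mutex and std::condition_variable), and StressTest is a hypothetical helper, not part of the original code.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Stand-in counting semaphore (assumed; tokens are omitted for brevity).
class Semaphore {
 public:
  explicit Semaphore(std::size_t count) : count_(count) {}
  void Acquire() {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [this] { return count_ > 0; });
    --count_;
  }
  void Release() {
    {
      std::lock_guard<std::mutex> lock(m_);
      ++count_;
    }
    cv_.notify_one();
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::size_t count_;
};

// Same three-semaphore structure as the queue above.
template <typename T>
class BlockingQueue {
 public:
  explicit BlockingQueue(std::size_t capacity)
      : avail_(0), free_(capacity), mutex_(1) {}

  void Put(T value) {
    free_.Acquire();   // wait for a free slot
    mutex_.Acquire();  // guard buffer_
    buffer_.push_back(std::move(value));
    mutex_.Release();
    avail_.Release();  // one more item can be taken
  }

  T Take() {
    avail_.Acquire();  // wait for an item
    mutex_.Acquire();  // guard buffer_
    T value = std::move(buffer_.front());
    buffer_.pop_front();
    mutex_.Release();
    free_.Release();   // one more slot is free
    return value;
  }

 private:
  Semaphore avail_, free_, mutex_;
  std::deque<T> buffer_;
};

// Hypothetical stress test. Each producer puts 1..items_per_producer and the
// consumers split the total evenly, so producers * items_per_producer must be
// divisible by consumers. Returns the sum of everything consumed.
long long StressTest(int producers, int consumers, int items_per_producer,
                     std::size_t capacity) {
  BlockingQueue<int> queue(capacity);
  std::vector<long long> partial(consumers, 0);
  std::vector<std::thread> threads;
  const int per_consumer = producers * items_per_producer / consumers;

  for (int p = 0; p < producers; ++p) {
    threads.emplace_back([&queue, items_per_producer] {
      for (int i = 1; i <= items_per_producer; ++i) queue.Put(i);
    });
  }
  for (int c = 0; c < consumers; ++c) {
    threads.emplace_back([&queue, &partial, c, per_consumer] {
      // Each consumer writes only to its own slot of partial.
      for (int i = 0; i < per_consumer; ++i) partial[c] += queue.Take();
    });
  }
  for (auto& t : threads) t.join();

  long long sum = 0;
  for (long long s : partial) sum += s;
  return sum;
}
```

If no item is lost or duplicated, StressTest(4, 2, 1000, 8) should return 4 * (1000 * 1001 / 2). A passing run is evidence rather than proof; the argument for correctness is still the one given in the comments on avail_, free_ and mutex_.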