I am trying to get myself up to date on C++11, and I'm writing a thread-safe wrapper around std::queue called SafeQueue. There are two conditions to potentially block on: the queue being full and the queue being empty. I am using std::condition_variable for both. Unfortunately, on Linux, the notify_all() call on my empty condition is segfaulting. It works fine on a Mac with clang. It's segfaulting in the enqueue() method:
#ifndef mqueue_hpp
#define mqueue_hpp
#include <queue>
#include <mutex>
#include <condition_variable>
//////////////////////////////////////////////////////////////////////
// SafeQueue - A thread-safe templated queue.                       //
//////////////////////////////////////////////////////////////////////
template<class T>
class SafeQueue
{
public:
    // Instantiate a new queue. 0 maxsize means unlimited.
    SafeQueue(unsigned int maxsize = 0);
    ~SafeQueue(void);
    // Enqueue a new T. If enqueue would cause it to exceed maxsize,
    // block until there is room on the queue.
    void enqueue(const T& item);
    // Dequeue a new T and return it. If the queue is empty, wait on it
    // until it is not empty.
    T& dequeue(void);
    // Return size of the queue.
    size_t size(void);
    // Return the maxsize of the queue.
    size_t maxsize(void) const;
private:
    std::mutex m_mutex;
    std::condition_variable m_empty;
    std::condition_variable m_full;
    std::queue<T> m_queue;
    size_t m_maxsize;
};
template<class T>
SafeQueue<T>::SafeQueue(unsigned int maxsize) : m_maxsize(maxsize) { }

template<class T>
SafeQueue<T>::~SafeQueue() { }

template<class T>
void SafeQueue<T>::enqueue(const T& item) {
    // Synchronize.
    if ((m_maxsize != 0) && (size() == m_maxsize)) {
        // Queue full. Can't push more on. Block until there's room.
        std::unique_lock<std::mutex> lock(m_mutex);
        m_full.wait(lock);
    }
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        // Add to m_queue and notify the reader if it's waiting.
        m_queue.push(item);
    }
    m_empty.notify_all();
}

template<class T>
T& SafeQueue<T>::dequeue(void) {
    // Synchronize. No unlock needed due to unique lock.
    if (size() == 0) {
        // Wait until something is put on it.
        std::unique_lock<std::mutex> lock(m_mutex);
        m_empty.wait(lock);
    }
    std::lock_guard<std::mutex> lock(m_mutex);
    // Pull the item off and notify writer if it's waiting on full cond.
    T& item = m_queue.front();
    m_queue.pop();
    m_full.notify_all();
    return item;
}

template<class T>
size_t SafeQueue<T>::size(void) {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_queue.size();
}

template<class T>
size_t SafeQueue<T>::maxsize(void) const {
    return m_maxsize;
}

#endif /* mqueue_hpp */
Clearly I'm doing something wrong but I can't figure it out. Output from gdb:
Core was generated by `./test'.
Program terminated with signal SIGSEGV, Segmentation fault.
#0 0x0000000000000000 in ?? ()
(gdb) bt
#0 0x0000000000000000 in ?? ()
#1 0x0000000000414739 in std::condition_variable::notify_all() ()
#2 0x00000000004054c4 in SafeQueue<int>::enqueue (this=0x7ffee06b3470,
item=@0x7ffee06b355c: 1) at ../mqueue.hpp:59
#3 0x0000000000404ab6 in testsafequeue () at test.cpp:13
#4 0x0000000000404e99 in main () at test.cpp:49
(gdb) frame 2
#2 0x00000000004054c4 in SafeQueue<int>::enqueue (this=0x7ffee06b3470,
item=@0x7ffee06b355c: 1) at ../mqueue.hpp:59
59 m_empty.notify_all();
(gdb) info locals
No locals.
(gdb) this.m_empty
Undefined command: "this.m_empty". Try "help".
(gdb) print this->m_empty
$1 = {_M_cond = {__data = {{__wseq = 0, __wseq32 = {__low = 0,
__high = 0}}, {__g1_start = 0, __g1_start32 = {__low = 0,
__high = 0}}, __g_refs = {0, 0}, __g_size = {0, 0},
__g1_orig_size = 0, __wrefs = 0, __g_signals = {0, 0}},
__size = '\000' <repeats 47 times>, __align = 0}}
Help appreciated.
CodePudding user response:
In both functions, you take one lock to wait on the condition variable, but then you destroy that lock once the wait is over, before taking a new lock to actually operate on the queue.
In the window between the two locks, another thread may acquire the mutex and, for example, remove the very object from the queue that the original thread intended to take, potentially leading to a call of front() on an empty queue.
You need to take a single lock in each function and perform all operations under it. The lock is released (automatically) only while wait is executing.
Also, returning from wait does not imply that notify_* was called: wait may wake spuriously, and notify_all may wake multiple threads even though only a single new element became available.
You therefore need to call wait in a loop, checking the condition required to perform the operation before exiting it. wait also provides an overload that takes the condition as a predicate in its second argument, which avoids the explicit loop.
Aside from that, the lines

T& item = m_queue.front();
m_queue.pop();
//...
return item;

also lead to undefined behavior: pop destroys the object that item references, leaving a dangling reference, and using the returned reference then results in undefined behavior.
You need to copy/move the object out of the queue, not keep a reference to it:

T item = m_queue.front();
m_queue.pop();
//...
return item;

Consequently, dequeue must also return T, not T&.
CodePudding user response:
First, it's a mistake to attempt to make a collection class thread safe by wrapping all the public methods with a mutex lock. Please see my previous answer on this topic about the inherent race conditions of trying to make a thread-safe container.
As for your current code, these lines look suspicious:

T& item = m_queue.front();
m_queue.pop();

Your m_queue.front call returns a reference to the item in the queue, but the pop method will most certainly destruct the instance that the reference points to.
Better:

T SafeQueue<T>::dequeue(void) { // return a copy of the item in the queue
    ...
    T item = m_queue.front(); // make the copy to be returned
    m_queue.pop();
As for the rest of your implementation, you have several race conditions and issues. You should never assume, just because wait returned, that the condition you were waiting for still holds (race conditions again!). Instead, hold the lock for the entire function; when you invoke wait, it releases the lock until wait returns.
And as user17732522 mentioned in the comments, attempting to recursively lock a std::mutex (by calling size() within your enqueue or dequeue methods) will almost certainly deadlock. You could use a recursive mutex, but it's best to avoid that pattern.
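If the class still needs a public size(), one common pattern (a sketch of a general technique, not code from the answer) is a private helper that assumes the mutex is already held, so the locking methods never re-lock:

```cpp
#include <cassert>
#include <mutex>
#include <queue>

template<class T>
class SafeQueue {
public:
    size_t size() {                          // public: takes the lock
        std::lock_guard<std::mutex> lock(m_mutex);
        return size_unlocked();
    }
    void enqueue(const T& item) {
        std::lock_guard<std::mutex> lock(m_mutex);
        // Safe: size_unlocked() does not try to re-lock m_mutex.
        if (m_maxsize == 0 || size_unlocked() < m_maxsize)
            m_queue.push(item);
    }
private:
    // Precondition: caller holds m_mutex.
    size_t size_unlocked() const { return m_queue.size(); }
    std::mutex m_mutex;
    std::queue<T> m_queue;
    size_t m_maxsize = 0;                    // 0 means unlimited
};
```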
Improved as follows:

template<class T>
void SafeQueue<T>::enqueue(const T& item) {
    std::unique_lock<std::mutex> lock(m_mutex);
    while ((m_maxsize != 0) && (m_queue.size() >= m_maxsize)) {
        // Queue full. Can't push more on. Block until there's room.
        m_full.wait(lock); // atomically unlocks the mutex and waits for the cv to get notified
    }
    m_queue.push(item);
    m_empty.notify_all();
}

template<class T>
T SafeQueue<T>::dequeue(void) {
    std::unique_lock<std::mutex> lock(m_mutex);
    while (m_queue.size() == 0) {
        // Wait until something is put on it.
        m_empty.wait(lock); // atomically unlocks the mutex and waits for the cv
    }
    // Pull the item off and notify the writer if it's waiting on the full condition.
    T item = m_queue.front();
    m_queue.pop();
    m_full.notify_all();
    return item;
}
Again, I stand by my first paragraph: it's a design fallacy to create a thread-safe container. Instead, make the scenario that uses the non-thread-safe container thread safe!