I have a ring buffer that looks like:
#include <atomic>
#include <cstddef>
#include <vector>

template<class T>
class RingBuffer {
public:
    bool Publish(T value);
    bool Consume(T& value);
    bool IsEmpty(std::size_t head, std::size_t tail);
    bool IsFull(std::size_t head, std::size_t tail);
private:
    std::size_t Next(std::size_t slot);
    std::vector<T> buffer_;
    std::atomic<std::size_t> tail_{0};
    std::atomic<std::size_t> head_{0};
    static constexpr std::size_t kBufferSize{8};
};
This data structure is intended to work with two threads: a publisher thread and a consumer thread. The two main functions are listed below, without memory orders passed to the atomic operations yet:
bool Publish(T value) {
    const size_t curr_head = head_.load(/* memory order */);
    const size_t curr_tail = tail_.load(/* memory order */);
    if (IsFull(curr_head, curr_tail)) {
        return false;
    }
    buffer_[curr_tail] = std::move(value);
    tail_.store(Next(curr_tail) /*, memory order */);
    return true;
}
bool Consume(T& value) {
    const size_t curr_head = head_.load(/* memory order */);
    const size_t curr_tail = tail_.load(/* memory order */);
    if (IsEmpty(curr_head, curr_tail)) {
        return false;
    }
    value = std::move(buffer_[curr_head]);
    head_.store(Next(curr_head) /*, memory order */);
    return true;
}
I know that, at the very least, tail_.store() in Publish() must use std::memory_order::release and tail_.load() in Consume() must use std::memory_order::acquire, to create a happens-before relationship between the write to buffer_ and the read of buffer_. Also, I can pass std::memory_order::relaxed to tail_.load() in Publish() and to head_.load() in Consume(), because each of those atomics is only ever written by the thread doing the load, and a thread always sees its own last write. Now the functions look something like this:
bool Publish(T value) {
    const size_t curr_head = head_.load(/* memory order */);
    const size_t curr_tail = tail_.load(std::memory_order::relaxed);
    if (IsFull(curr_head, curr_tail)) {
        return false;
    }
    buffer_[curr_tail] = std::move(value);
    tail_.store(Next(curr_tail), std::memory_order::release);
    return true;
}
bool Consume(T& value) {
    const size_t curr_head = head_.load(std::memory_order::relaxed);
    const size_t curr_tail = tail_.load(std::memory_order::acquire);
    if (IsEmpty(curr_head, curr_tail)) {
        return false;
    }
    value = std::move(buffer_[curr_head]);
    head_.store(Next(curr_head) /*, memory order */);
    return true;
}
The last step is to choose memory orders for the remaining pair: head_.load() in Publish() and head_.store() in Consume(). The line value = std::move(buffer_[curr_head]); must be executed before head_.store() in Consume(), otherwise I will have data races when the buffer is full, so, at the very least, I must pass std::memory_order::release to that store to prevent reordering. But do I have to pass std::memory_order::acquire to head_.load() in Publish()? I understand that it will help head_.load() see head_.store() within a reasonable time, unlike std::memory_order::relaxed, but if I don't need that guarantee of a shorter time to see the side effect of the store, can I use a relaxed memory order? If I can't, then why not? Completed code:
bool Publish(T value) {
    const size_t curr_head = head_.load(std::memory_order::relaxed); // or acquire?
    const size_t curr_tail = tail_.load(std::memory_order::relaxed);
    if (IsFull(curr_head, curr_tail)) {
        return false;
    }
    buffer_[curr_tail] = std::move(value);
    tail_.store(Next(curr_tail), std::memory_order::release);
    return true;
}
bool Consume(T& value) {
    const size_t curr_head = head_.load(std::memory_order::relaxed);
    const size_t curr_tail = tail_.load(std::memory_order::acquire);
    if (IsEmpty(curr_head, curr_tail)) {
        return false;
    }
    value = std::move(buffer_[curr_head]);
    head_.store(Next(curr_head), std::memory_order::release);
    return true;
}
Are the memory orders for each atomic correct? Is my explanation of why each memory order is used on each atomic variable right?
CodePudding user response:
Previous answers may help as background:
C++, std::atomic, what is std::memory_order and how to use them?
https://bartoszmilewski.com/2008/12/01/c-atomics-and-memory-ordering/
Firstly, the system you describe is known as a single-producer, single-consumer (SPSC) queue. You can always look at the boost version of this container to compare. I will often examine boost code, even when I work in situations where boost is not allowed, because examining and understanding a stable solution gives you insight into problems you may encounter (why did they do it that way? Oh, I see it, etc.).
Given your design, and having written many similar containers, I will say that you have to be careful about distinguishing empty from full. If you use a classic {begin, end} pair, you hit the problem that, due to wrapping,
{begin, begin + size} == {begin, begin} == empty
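One common way around the empty/full ambiguity is to leave one slot permanently unused, so that equal indices can only mean "empty". The sketch below assumes that convention; the question never shows the bodies of IsEmpty, IsFull and Next, so these definitions and the UsableCapacity helper are illustrative guesses rather than the asker's code.

```cpp
#include <cassert>
#include <cstddef>

// Assumed capacity, matching kBufferSize in the question.
constexpr std::size_t kBufferSize = 8;

// Advance an index by one slot, wrapping at the end of the buffer.
constexpr std::size_t Next(std::size_t slot) {
    return (slot + 1) % kBufferSize;
}

// Equal indices always mean "empty" under this convention.
constexpr bool IsEmpty(std::size_t head, std::size_t tail) {
    return head == tail;
}

// The buffer is full when advancing tail would make it collide with head,
// so one slot is never written.
constexpr bool IsFull(std::size_t head, std::size_t tail) {
    return Next(tail) == head;
}

static_assert(IsEmpty(0, 0), "fresh buffer is empty");
static_assert(!IsFull(0, 0), "fresh buffer is not full");
static_assert(IsFull(0, kBufferSize - 1), "tail one step behind head after wrapping");
static_assert(IsFull(3, 2), "full can occur at any offset");

// Hypothetical helper: counts how many items fit before IsFull reports true.
std::size_t UsableCapacity() {
    std::size_t head = 0;
    std::size_t tail = 0;
    std::size_t count = 0;
    while (!IsFull(head, tail)) {
        tail = Next(tail);
        ++count;
    }
    assert(count == kBufferSize - 1);
    return count;
}
```

The cost of this convention is one slot of storage: with kBufferSize of 8, only 7 items can be queued at once.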
Okay, so back to the synchronisation issue.
Given that the memory order only affects reordering, the use of release in Publish seems a textbook use of the flag. Nothing will read the value until the size of the container is incremented, so you don't care in what order the writes to the value itself happen; you only care that the value is fully written before the count is increased. So I would concur: you are using the flag correctly in the Publish function.
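The release/acquire pairing described here can be seen in isolation with a one-slot hand-off. This is only a sketch, not the asker's RingBuffer: the function PublishAndConsumeOnce and the names payload, ready and seen are made up for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical one-slot hand-off between two threads.
int PublishAndConsumeOnce() {
    int payload = 0;                  // plain, non-atomic data
    std::atomic<bool> ready{false};   // plays the role of tail_
    int seen = -1;

    std::thread consumer([&] {
        // acquire: once `true` is observed, the earlier write to payload
        // is guaranteed to be visible to this thread.
        while (!ready.load(std::memory_order_acquire)) {
            std::this_thread::yield();
        }
        seen = payload;
    });

    payload = 42;                                  // write the value first
    ready.store(true, std::memory_order_release);  // then publish it

    consumer.join();  // join makes `seen` safe to read here
    assert(seen == 42);
    return seen;
}
```

If both operations on ready were relaxed, the read of payload would no longer be ordered after the write, which is a data race.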
I did question whether the "release" was required in Consume, but if you are moving out of the queue, and those moves have side effects, it may be required. I would say that if you are after raw speed, it may be worth making a second version, specialised for trivial objects, that uses relaxed order for incrementing the head (though see the reply to comments at the end, which withdraws this).
You might also consider in-place new/delete as you push/pop. Whilst most moves will leave an object in an empty state, the standard only requires that it is left in a valid state after a move. Explicitly destroying the object after the move may save you from obscure bugs later.
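As a rough illustration of the in-place new/delete idea, a slot can hold raw storage, construct the object with placement new on push, and call the destructor explicitly on pop. The Slot class and RoundTrip function below are hypothetical names for this sketch; a real queue would hold an array of such slots and would also have to destroy any objects still stored when the queue itself is destroyed.

```cpp
#include <cassert>
#include <new>
#include <string>
#include <utility>

// Hypothetical single slot of raw, suitably aligned storage.
template <class T>
class Slot {
public:
    // Push side: construct a T in place inside the raw storage.
    void Emplace(T value) {
        object_ = ::new (static_cast<void*>(storage_)) T(std::move(value));
    }

    // Pop side: move the T out, then end its lifetime explicitly,
    // so no moved-from object is left behind in the slot.
    T Take() {
        assert(object_ != nullptr);
        T result = std::move(*object_);
        object_->~T();
        object_ = nullptr;
        return result;
    }

private:
    alignas(T) unsigned char storage_[sizeof(T)];
    T* object_ = nullptr;  // pointer returned by placement new
};

// Stores a string in a slot and takes it back out.
std::string RoundTrip(std::string text) {
    Slot<std::string> slot;
    slot.Emplace(std::move(text));
    return slot.Take();
}
```

With this layout T no longer needs to be default-constructible, since the buffer never holds a T that was not explicitly pushed.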
You could argue that the two atomic loads in Consume could be memory_order_consume, which only orders operations that carry a data dependency on the loaded value, although I doubt that in practice it produces any gain: compilers generally treat consume as acquire, and its use has been discouraged since C++17. I am also nervous about this suggestion because when I look at the boost version, it is remarkably close to what you have: https://www.boost.org/doc/libs/1_66_0/boost/lockfree/spsc_queue.hpp
template <typename Functor>
bool consume_one(Functor & functor, T * buffer, size_t max_size)
{
    const size_t write_index = write_index_.load(memory_order_acquire);
    const size_t read_index = read_index_.load(memory_order_relaxed);
    if ( empty(write_index, read_index) )
        return false;
    T & object_to_consume = buffer[read_index];
    functor( object_to_consume );
    object_to_consume.~T();
    size_t next = next_index(read_index, max_size);
    read_index_.store(next, memory_order_release);
    return true;
}
In general, though, your approach looks good and is very similar to the boost version. But if I were you, I would go through the boost version line by line to see what it does differently. It is very easy to make a mistake.
EDIT: Sorry, I just noticed that you have the memory_order_acquire and memory_order_relaxed loads the wrong way around in your code. You should make the last write a "release" and the first read "acquire". This doesn't affect the behaviour significantly, but it makes the code easier to read (sync at the start, sync at the end).
Replies to comments: As @user17732522 implies, the copy operation is also a write, so the relaxed optimisation for trivial objects would not be synchronised and would introduce undefined behaviour (damn, it's easy to make a mistake).
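Putting the points above together, here is a minimal sketch of the queue with release/acquire on both indices. It assumes the one-slot-unused convention for full/empty and a fixed std::array instead of the question's std::vector; SpscRing and TransfersInOrder are names invented for this example. Note that head_.load() in Publish() is acquire in this sketch: as far as I can tell, that is what pairs with the release store in Consume() and orders the consumer's read of a slot before the producer overwrites it.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <utility>

// Hypothetical SPSC ring: exactly one publisher thread, one consumer thread.
template <class T, std::size_t N = 8>
class SpscRing {
    static_assert(N >= 2, "one slot is always left unused");

public:
    bool Publish(T value) {
        // tail_ is only written by this thread, so relaxed is enough.
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        // acquire pairs with the release store in Consume().
        const std::size_t head = head_.load(std::memory_order_acquire);
        if (Next(tail) == head) {
            return false;  // full
        }
        buffer_[tail] = std::move(value);
        tail_.store(Next(tail), std::memory_order_release);
        return true;
    }

    bool Consume(T& value) {
        // head_ is only written by this thread, so relaxed is enough.
        const std::size_t head = head_.load(std::memory_order_relaxed);
        // acquire pairs with the release store in Publish().
        const std::size_t tail = tail_.load(std::memory_order_acquire);
        if (head == tail) {
            return false;  // empty
        }
        value = std::move(buffer_[head]);
        head_.store(Next(head), std::memory_order_release);
        return true;
    }

private:
    static std::size_t Next(std::size_t slot) { return (slot + 1) % N; }

    std::array<T, N> buffer_{};
    std::atomic<std::size_t> head_{0};
    std::atomic<std::size_t> tail_{0};
};

// Publishes 0..count-1 on this thread and checks FIFO order on another.
bool TransfersInOrder(int count) {
    assert(count >= 0);
    SpscRing<int> ring;
    bool in_order = true;

    std::thread consumer([&] {
        int expected = 0;
        while (expected < count) {
            int value = 0;
            if (!ring.Consume(value)) {
                std::this_thread::yield();
                continue;
            }
            if (value != expected) {
                in_order = false;
            }
            ++expected;
        }
    });

    int next = 0;
    while (next < count) {
        if (ring.Publish(next)) {
            ++next;
        } else {
            std::this_thread::yield();
        }
    }

    consumer.join();
    return in_order;
}
```

A test like this cannot prove the memory orders are right, since a weaker ordering may still appear to work on strongly ordered hardware; running it under a tool such as ThreadSanitizer is a more useful check.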