I'm trying to implement a lock-free wrapper around std::atomic<std::shared_ptr<T>>
that operates on non-trivial objects such as containers.
I found some relevant information in these two topics,
but it still isn't what I need.
For example:
    TEST_METHOD(FetchAdd)
    {
        constexpr size_t loopCount = 5000000;
        auto&& container = std::atomic<size_t>(0);
        auto thread1 = std::jthread([&]()
        {
            for (size_t i = 0; i < loopCount; i++)
                container++;
        });
        auto thread2 = std::jthread([&]()
        {
            for (size_t i = 0; i < loopCount; i++)
                container++;
        });
        thread1.join();
        thread2.join();
        Assert::AreEqual(loopCount * 2, container.load());
    }
This test passes because the post-increment operator of std::atomic<size_t> is a single atomic read-modify-write operation, equivalent to fetch_add(1).
On the other hand:
    TEST_METHOD(LoadStore)
    {
        constexpr size_t loopCount = 5000000;
        auto&& container = std::atomic<size_t>(0);
        auto thread1 = std::jthread([&]()
        {
            for (size_t i = 0; i < loopCount; i++)
            {
                auto value = container.load();
                value++;
                container.store(value);
            }
        });
        auto thread2 = std::jthread([&]()
        {
            for (size_t i = 0; i < loopCount; i++)
            {
                auto value = container.load();
                value++;
                container.store(value);
            }
        });
        thread1.join();
        thread2.join();
        Assert::AreEqual(loopCount * 2, container.load());
    }
If I instead use separate .load() and .store() operations with the increment between them, the result is no longer loopCount * 2.
Each call is atomic on its own, but the load/increment/store sequence as a whole is not: the other thread can store a new value between my load and my store, and that update is then overwritten.
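The usual way to make such a load/increment/store sequence behave like one atomic step is a compare-exchange loop. The following is only a minimal sketch and not part of the tests above; casIncrement and runCasIncrement are hypothetical names, and std::thread is used instead of std::jthread just to keep the sketch independent of C++20.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>

// Hypothetical helper: increments `counter` with a compare-exchange loop
// instead of fetch_add, so that load + modify + store acts as one step.
inline void casIncrement(std::atomic<std::size_t>& counter)
{
    std::size_t expected = counter.load();
    // The store only happens if `counter` still holds `expected`.
    // On failure, compare_exchange_weak writes the current value into
    // `expected`, so the next attempt computes expected + 1 from fresh data.
    while (!counter.compare_exchange_weak(expected, expected + 1))
    {
    }
}

// Hypothetical driver: two threads each perform `loopCount` CAS increments
// and the final counter value is returned.
inline std::size_t runCasIncrement(std::size_t loopCount)
{
    std::atomic<std::size_t> counter{0};
    auto work = [&counter, loopCount]
    {
        for (std::size_t i = 0; i < loopCount; i++)
            casIncrement(counter);
    };
    std::thread thread1(work);
    std::thread thread2(work);
    thread1.join();
    thread2.join();
    return counter.load();
}
```

Because an increment is only stored when nobody else changed the value in the meantime, no update is lost, and runCasIncrement(loopCount) returns loopCount * 2 just like the fetch_add version.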
My ultimate goal is to use std::atomic<std::shared_ptr<T>> to load the object's current state, perform some non-const operation on it, and save it back with a store operation.
    TEST_METHOD(AtomicSharedPtr)
    {
        constexpr size_t loopCount = 5000000;
        auto&& container = std::atomic(std::make_shared<std::unordered_set<int>>());
        auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
        {
            for (size_t i = 0; i < loopCount; i++)
            {
                // some other lock-free synchronization primitives as barrier, conditions or?
                auto reader = container.load();
                reader->emplace(5);
                container.store(reader);
            }
        });
        auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
        {
            for (size_t i = 0; i < loopCount; i++)
            {
                // some other lock-free synchronization primitives as barrier, conditions or?
                auto reader = container.load();
                reader->erase(5);
                container.store(reader);
            }
        });
    }
I know that the second thread also only gets a shared_ptr to the same object from the atomic, so non-const operations through that shared_ptr can only cause a data race.
So, is there any hint on how to implement a lock-free wrapper that works with non-const operations on the object stored in std::atomic<std::shared_ptr<T>>?
Answer:
First, a side note. std::atomic<std::shared_ptr<T>> gives atomic access to the pointer and provides no synchronization whatsoever for the T. That's super important to note here. Your code shows that you're trying to synchronize the T, not the pointer, so the atomic is not doing what you think it is. In order to use std::atomic<std::shared_ptr<T>>, you must treat the pointed-at T as const.
There are two ways to handle read-modify-write with arbitrary data in a thread-safe way. The first is, obviously, to use locks. This is usually faster to execute and, thanks to its simplicity, usually less buggy, so it is highly recommended. The second is to use atomic operations, which is difficult to get right and usually executes slower.
It usually looks something like this, where you make a deep copy of the pointed-at data, mutate the copy, and then attempt to replace the old data with the new data. If someone else has changed the data in the meantime, you throw it all away and start the whole mutation over.
    template<class T, class F>
    bool readModifyWrite(std::atomic<std::shared_ptr<T>>& container, F&& function) {
        //the expected value must be a non-const lvalue that outlives the loop body:
        //on failure, compare_exchange_strong reloads it with the current pointer
        std::shared_ptr<T> oldT = container.load();
        std::shared_ptr<T> newT;
        do {
            //first a deep copy, to enforce immutability
            newT = std::make_shared<T>(*oldT);
            //then mutate the T
            if (!function(*newT))
                return false; //function aborted
            //then attempt to save the modified T.
            //if someone else changed the container during our modification, start over
        } while (!container.compare_exchange_strong(oldT, newT));
        //Note that this may take MANY tries to eventually succeed.
        return true;
    }
And then usage is similar to what you had:
    auto&& container = std::atomic(std::make_shared<std::unordered_set<int>>());
    auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
    {
        for (size_t i = 0; i < loopCount; i++)
        {
            readModifyWrite(container, [](auto& reader) {
                reader.emplace(5);
                return true;
            });
        }
    });
    auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
    {
        for (size_t i = 0; i < loopCount; i++)
        {
            readModifyWrite(container, [](auto& reader) {
                reader.erase(5);
                return true;
            });
        }
    });
Note that one thread inserts 5 loopCount times and the other erases 5 loopCount times, but the two aren't synchronized with each other. The first thread might insert several times in a row (every insert after the first is a no-op for a set), and then the second thread might erase several times in a row (likewise a no-op after the first), so you don't really have guarantees about the end result here, but I'm assuming you knew that.
If, however, you want to use the mutations to synchronize, that gets quite a bit more complicated. The mutating function has to return whether it succeeded or aborted, and the caller of readModifyWrite has to handle the case where the modification aborted. (Note that readModifyWrite effectively returns the value from the function, i.e. the result of the modify step. The write step does not affect the return value.)
    auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
    {
        for (size_t i = 0; i < loopCount; )
        {
            bool did_emplace = readModifyWrite(container, [](auto& reader) {
                //emplace returns pair<iterator, bool>; .second is true if 5 was inserted
                return reader.emplace(5).second;
            });
            if (did_emplace) i++;
        }
    });
    auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
    {
        for (size_t i = 0; i < loopCount; )
        {
            bool did_erase = readModifyWrite(container, [](auto& reader) {
                //erase returns the number of removed elements (0 or 1 for a set)
                return reader.erase(5) > 0;
            });
            if (did_erase) i++;
        }
    });
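For comparison, the lock-based approach recommended earlier can be sketched as follows. This is only an illustration under stated assumptions: Locked, withLock, and runLockedSet are hypothetical names, and std::thread is used so that the sketch does not depend on C++20.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <unordered_set>
#include <utility>

// Hypothetical wrapper: every access to the wrapped object goes through
// withLock(), which holds the mutex for the duration of the callback.
template<class T>
class Locked
{
public:
    template<class F>
    decltype(auto) withLock(F&& function)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        return std::forward<F>(function)(value_);
    }

private:
    std::mutex mutex_;
    T value_{};
};

// Hypothetical driver: one thread inserts 5 and the other erases it. After
// both have joined, 5 is inserted once more so the final state is known.
inline bool runLockedSet(std::size_t loopCount)
{
    Locked<std::unordered_set<int>> container;
    std::thread thread1([&] {
        for (std::size_t i = 0; i < loopCount; i++)
            container.withLock([](auto& set) { set.emplace(5); });
    });
    std::thread thread2([&] {
        for (std::size_t i = 0; i < loopCount; i++)
            container.withLock([](auto& set) { set.erase(5); });
    });
    thread1.join();
    thread2.join();
    container.withLock([](auto& set) { set.emplace(5); });
    return container.withLock([](auto& set) { return set.count(5) == 1; });
}
```

Unlike readModifyWrite, this mutates the set in place, so there is no deep copy per operation and no retry loop; the cost is that a thread may block while another one holds the lock.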