author     eeight <eeight@yandex-team.ru>  2022-04-27 10:20:47 +0300
committer  eeight <eeight@yandex-team.ru>  2022-04-27 10:20:47 +0300
commit     590e169a0a90aafab8247e6f1797b646d897f086 (patch)
tree       11987614ed478d03168db07fd0304aec670ac5cf
parent     0cab8da2b5ac8c6870c24756e16e6f8151ce1454 (diff)
download   ydb-590e169a0a90aafab8247e6f1797b646d897f086.tar.gz
IGNIETFERRO-1105 TAtomic -> std::atomic in util/thread/lfstack.h
ref: 7fac644f716f0423df5d65d6ba11439ca0ca35d1
-rw-r--r--  util/thread/lfstack.h       100
-rw-r--r--  util/thread/lfstack_ut.cpp   23
2 files changed, 64 insertions, 59 deletions
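
The patch is a mechanical migration from the homegrown util/system/atomic.h wrappers to std::atomic. As a reading aid, here is the rough correspondence applied throughout; the old TAtomic signatures are recalled from the removed API rather than copied from it, so treat them as approximate:

    // TAtomic wrapper (removed)                // std::atomic replacement (this patch)
    // AtomicGet(x)                             x.load()
    // AtomicSet(x, v)                          x.store(v)
    // AtomicAdd(x, d)     (returns new value)  x += d
    // AtomicIncrement / AtomicDecrement        ++x / --x
    // AtomicCas(&x, exchange, compare)         x.compare_exchange_strong(compare, exchange)

Note the flipped argument order in the CAS, visible in every converted loop below: AtomicCas takes (target, exchange, compare), while compare_exchange_* takes (expected&, desired) and, on failure, writes the observed value back into expected.
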
diff --git a/util/thread/lfstack.h b/util/thread/lfstack.h
index ca3d95f3c3..effde7c706 100644
--- a/util/thread/lfstack.h
+++ b/util/thread/lfstack.h
@@ -1,7 +1,10 @@
#pragma once
#include <util/generic/noncopyable.h>
-#include <util/system/atomic.h>
+
+#include <atomic>
+#include <cstddef>
+#include <utility>
//////////////////////////////
// lock free lifo stack
@@ -9,7 +12,7 @@ template <class T>
class TLockFreeStack: TNonCopyable {
struct TNode {
T Value;
- TNode* Next;
+ std::atomic<TNode*> Next;
TNode() = default;
@@ -21,50 +24,52 @@ class TLockFreeStack: TNonCopyable {
}
};
- TNode* Head;
- TNode* FreePtr;
- TAtomic DequeueCount;
+ std::atomic<TNode*> Head = nullptr;
+ std::atomic<TNode*> FreePtr = nullptr;
+ std::atomic<size_t> DequeueCount = 0;
void TryToFreeMemory() {
- TNode* current = AtomicGet(FreePtr);
+ TNode* current = FreePtr.load(std::memory_order_acquire);
if (!current)
return;
- if (AtomicAdd(DequeueCount, 0) == 1) {
+ if (DequeueCount.load() == 1) {
// node current is in free list, we are the last thread so try to cleanup
- if (AtomicCas(&FreePtr, (TNode*)nullptr, current))
+ if (FreePtr.compare_exchange_strong(current, nullptr))
EraseList(current);
}
}
- void EraseList(TNode* volatile p) {
+ void EraseList(TNode* p) {
while (p) {
TNode* next = p->Next;
delete p;
p = next;
}
}
- void EnqueueImpl(TNode* volatile head, TNode* volatile tail) {
+ void EnqueueImpl(TNode* head, TNode* tail) {
+ auto headValue = Head.load(std::memory_order_acquire);
for (;;) {
- tail->Next = AtomicGet(Head);
- if (AtomicCas(&Head, head, tail->Next))
+ tail->Next.store(headValue, std::memory_order_release);
+ // NB. See https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
+ // The weak forms (1-2) of the functions are allowed to fail spuriously, that is,
+ // act as if *this != expected even if they are equal.
+ // When a compare-and-exchange is in a loop, the weak version will yield better
+ // performance on some platforms.
+ if (Head.compare_exchange_weak(headValue, head))
break;
}
}
template <class U>
void EnqueueImpl(U&& u) {
- TNode* volatile node = new TNode(std::forward<U>(u));
+ TNode* node = new TNode(std::forward<U>(u));
EnqueueImpl(node, node);
}
public:
- TLockFreeStack()
- : Head(nullptr)
- , FreePtr(nullptr)
- , DequeueCount(0)
- {
- }
+ TLockFreeStack() = default;
+
~TLockFreeStack() {
- EraseList(Head);
- EraseList(FreePtr);
+ EraseList(Head.load());
+ EraseList(FreePtr.load());
}
void Enqueue(const T& t) {
@@ -85,32 +90,32 @@ public:
return;
}
TIter i = dataBegin;
- TNode* volatile node = new TNode(*i);
- TNode* volatile tail = node;
+ TNode* node = new TNode(*i);
+ TNode* tail = node;
for (++i; i != dataEnd; ++i) {
TNode* nextNode = node;
node = new TNode(*i);
- node->Next = nextNode;
+ node->Next.store(nextNode, std::memory_order_release);
}
EnqueueImpl(node, tail);
}
bool Dequeue(T* res) {
- AtomicAdd(DequeueCount, 1);
- for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
- if (AtomicCas(&Head, AtomicGet(current->Next), current)) {
+ ++DequeueCount;
+ for (TNode* current = Head.load(std::memory_order_acquire); current;) {
+ if (Head.compare_exchange_weak(current, current->Next.load(std::memory_order_acquire))) {
*res = std::move(current->Value);
// delete current; // ABA problem
// even more complex node deletion
TryToFreeMemory();
- if (AtomicAdd(DequeueCount, -1) == 0) {
+ if (--DequeueCount == 0) {
// no other Dequeue()s, can safely reclaim memory
delete current;
} else {
// Dequeue()s in progress, put node to free list
- for (;;) {
- AtomicSet(current->Next, AtomicGet(FreePtr));
- if (AtomicCas(&FreePtr, current, current->Next))
+ for (TNode* freePtr = FreePtr.load(std::memory_order_acquire);;) {
+ current->Next.store(freePtr, std::memory_order_release);
+ if (FreePtr.compare_exchange_weak(freePtr, current))
break;
}
}
@@ -118,16 +123,16 @@ public:
}
}
TryToFreeMemory();
- AtomicAdd(DequeueCount, -1);
+ --DequeueCount;
return false;
}
// add all elements to *res
// elements are returned in order of dequeue (top to bottom; see example in unittest)
template <typename TCollection>
void DequeueAll(TCollection* res) {
- AtomicAdd(DequeueCount, 1);
- for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
- if (AtomicCas(&Head, (TNode*)nullptr, current)) {
+ ++DequeueCount;
+ for (TNode* current = Head.load(std::memory_order_acquire); current;) {
+ if (Head.compare_exchange_weak(current, nullptr)) {
for (TNode* x = current; x;) {
res->push_back(std::move(x->Value));
x = x->Next;
@@ -135,7 +140,7 @@ public:
// EraseList(current); // ABA problem
// even more complex node deletion
TryToFreeMemory();
- if (AtomicAdd(DequeueCount, -1) == 0) {
+ if (--DequeueCount == 0) {
// no other Dequeue()s, can safely reclaim memory
EraseList(current);
} else {
@@ -144,9 +149,9 @@ public:
while (currentLast->Next) {
currentLast = currentLast->Next;
}
- for (;;) {
- AtomicSet(currentLast->Next, AtomicGet(FreePtr));
- if (AtomicCas(&FreePtr, current, currentLast->Next))
+ for (TNode* freePtr = FreePtr.load(std::memory_order_acquire);;) {
+ currentLast->Next.store(freePtr, std::memory_order_release);
+ if (FreePtr.compare_exchange_weak(freePtr, current))
break;
}
}
@@ -154,11 +159,11 @@ public:
}
}
TryToFreeMemory();
- AtomicAdd(DequeueCount, -1);
+ --DequeueCount;
}
bool DequeueSingleConsumer(T* res) {
- for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
- if (AtomicCas(&Head, current->Next, current)) {
+ for (TNode* current = Head.load(std::memory_order_acquire); current;) {
+ if (Head.compare_exchange_weak(current, current->Next)) {
*res = std::move(current->Value);
delete current; // with single consumer thread ABA does not happen
return true;
@@ -170,19 +175,18 @@ public:
// elements are returned in order of dequeue (top to bottom; see example in unittest)
template <typename TCollection>
void DequeueAllSingleConsumer(TCollection* res) {
- for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
- if (AtomicCas(&Head, (TNode*)nullptr, current)) {
- for (TNode* x = current; x;) {
+ for (TNode* head = Head.load(std::memory_order_acquire); head;) {
+ if (Head.compare_exchange_weak(head, nullptr)) {
+ for (TNode* x = head; x;) {
res->push_back(std::move(x->Value));
x = x->Next;
}
- EraseList(current); // with single consumer thread ABA does not happen
+ EraseList(head); // with single consumer thread ABA does not happen
return;
}
}
}
bool IsEmpty() {
- AtomicAdd(DequeueCount, 0); // mem barrier
- return AtomicGet(Head) == nullptr; // without lock, so result is approximate
+ return Head.load() == nullptr; // without lock, so result is approximate
}
};
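
Two things are worth calling out in the converted header. First, the deferred-reclamation scheme is unchanged: DequeueCount counts in-flight dequeuers, popped nodes go onto the FreePtr free list while other threads might still hold pointers into them, and the last dequeuer out (--DequeueCount == 0, or the DequeueCount.load() == 1 check in TryToFreeMemory) deletes the list, which sidesteps the ABA/use-after-free hazard flagged in the comments. Second, the retry loops now lean on compare_exchange_weak refreshing its expected argument on failure, so the explicit re-reads of Head and FreePtr that the AtomicCas loops needed are gone.

For context, a minimal usage sketch of the container (the producer/consumer split is illustrative and assumes the usual util headers; it is not taken from this commit):

    #include <util/thread/lfstack.h>
    #include <util/generic/vector.h>

    #include <thread>

    int main() {
        TLockFreeStack<int> stack;

        // Any number of threads may call Enqueue()/Dequeue() concurrently.
        std::thread producer([&] {
            for (int i = 0; i < 1000; ++i) {
                stack.Enqueue(i);
            }
        });

        TVector<int> drained;
        std::thread consumer([&] {
            int value;
            while (drained.size() < 1000) {
                if (stack.Dequeue(&value)) { // returns false when the stack is empty
                    drained.push_back(value);
                }
            }
        });

        producer.join();
        consumer.join();
    }
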
diff --git a/util/thread/lfstack_ut.cpp b/util/thread/lfstack_ut.cpp
index e20a838f95..e0527aa501 100644
--- a/util/thread/lfstack_ut.cpp
+++ b/util/thread/lfstack_ut.cpp
@@ -1,17 +1,18 @@
-#include <util/system/atomic.h>
-#include <util/system/event.h>
-#include <util/generic/deque.h>
-#include <library/cpp/threading/future/legacy_future.h>
+#include "lfstack.h"
#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/threading/future/legacy_future.h>
-#include "lfstack.h"
+#include <util/generic/deque.h>
+#include <util/system/event.h>
+
+#include <atomic>
Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
class TCountDownLatch {
private:
- TAtomic Current_;
+ std::atomic<size_t> Current_;
TSystemEvent EventObject_;
public:
@@ -21,7 +22,7 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
}
void CountDown() {
- if (AtomicDecrement(Current_) == 0) {
+ if (--Current_ == 0) {
EventObject_.Signal();
}
}
@@ -41,7 +42,7 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
size_t DequeueThreads;
size_t EnqueuesPerThread;
- TAtomic LeftToDequeue;
+ std::atomic<size_t> LeftToDequeue;
TCountDownLatch StartLatch;
TLockFreeStack<int> Stack;
@@ -69,7 +70,7 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
StartLatch.Await();
TVector<int> temp;
- while (AtomicGet(LeftToDequeue) > 0) {
+ while (LeftToDequeue.load() > 0) {
size_t dequeued = 0;
for (size_t i = 0; i < 100; ++i) {
temp.clear();
@@ -80,7 +81,7 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
}
dequeued += temp.size();
}
- AtomicAdd(LeftToDequeue, -dequeued);
+ LeftToDequeue -= dequeued;
}
}
@@ -98,7 +99,7 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
// effectively join
futures.clear();
- UNIT_ASSERT_VALUES_EQUAL(0, int(AtomicGet(LeftToDequeue)));
+ UNIT_ASSERT_VALUES_EQUAL(0, int(LeftToDequeue.load()));
TVector<int> left;
Stack.DequeueAll(&left);
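
The test's TCountDownLatch shows the same migration in miniature: TAtomic Current_ becomes std::atomic<size_t>, and AtomicDecrement becomes --Current_. A self-contained approximation using only the standard library (substituting a condition variable for TSystemEvent is an assumption about that event's semantics, not a drop-in replacement):

    #include <atomic>
    #include <condition_variable>
    #include <mutex>

    class TCountDownLatch {
    private:
        std::atomic<size_t> Current_;
        std::mutex Lock_;
        std::condition_variable Cv_;

    public:
        explicit TCountDownLatch(size_t initial)
            : Current_(initial)
        {
        }

        void CountDown() {
            // --Current_ returns the decremented value, so exactly one
            // caller observes zero and wakes the waiters.
            if (--Current_ == 0) {
                std::lock_guard<std::mutex> guard(Lock_);
                Cv_.notify_all();
            }
        }

        void Await() {
            std::unique_lock<std::mutex> guard(Lock_);
            Cv_.wait(guard, [this] { return Current_.load() == 0; });
        }
    };
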