author | eeight <eeight@yandex-team.ru> | 2022-04-27 10:20:47 +0300
---|---|---
committer | eeight <eeight@yandex-team.ru> | 2022-04-27 10:20:47 +0300
commit | 590e169a0a90aafab8247e6f1797b646d897f086 (patch) |
tree | 11987614ed478d03168db07fd0304aec670ac5cf /util/thread/lfstack.h |
parent | 0cab8da2b5ac8c6870c24756e16e6f8151ce1454 (diff) |
download | ydb-590e169a0a90aafab8247e6f1797b646d897f086.tar.gz |
IGNIETFERRO-1105 TAtomic -> std::atomic in util/thread/lfstack.h
ref:7fac644f716f0423df5d65d6ba11439ca0ca35d1
Diffstat (limited to 'util/thread/lfstack.h')
-rw-r--r-- | util/thread/lfstack.h | 100 |
1 file changed, 52 insertions, 48 deletions
```diff
diff --git a/util/thread/lfstack.h b/util/thread/lfstack.h
index ca3d95f3c39..effde7c706d 100644
--- a/util/thread/lfstack.h
+++ b/util/thread/lfstack.h
@@ -1,7 +1,10 @@
 #pragma once
 
 #include <util/generic/noncopyable.h>
-#include <util/system/atomic.h>
+
+#include <atomic>
+#include <cstddef>
+#include <utility>
 
 //////////////////////////////
 // lock free lifo stack
@@ -9,7 +12,7 @@
 template <class T>
 class TLockFreeStack: TNonCopyable {
     struct TNode {
         T Value;
-        TNode* Next;
+        std::atomic<TNode*> Next;
 
         TNode() = default;
@@ -21,50 +24,52 @@ class TLockFreeStack: TNonCopyable {
         }
     };
 
-    TNode* Head;
-    TNode* FreePtr;
-    TAtomic DequeueCount;
+    std::atomic<TNode*> Head = nullptr;
+    std::atomic<TNode*> FreePtr = nullptr;
+    std::atomic<size_t> DequeueCount = 0;
 
     void TryToFreeMemory() {
-        TNode* current = AtomicGet(FreePtr);
+        TNode* current = FreePtr.load(std::memory_order_acquire);
         if (!current)
             return;
-        if (AtomicAdd(DequeueCount, 0) == 1) {
+        if (DequeueCount.load() == 1) {
             // node current is in free list, we are the last thread so try to cleanup
-            if (AtomicCas(&FreePtr, (TNode*)nullptr, current))
+            if (FreePtr.compare_exchange_strong(current, nullptr))
                 EraseList(current);
         }
     }
-    void EraseList(TNode* volatile p) {
+    void EraseList(TNode* p) {
         while (p) {
             TNode* next = p->Next;
             delete p;
             p = next;
         }
     }
-    void EnqueueImpl(TNode* volatile head, TNode* volatile tail) {
+    void EnqueueImpl(TNode* head, TNode* tail) {
+        auto headValue = Head.load(std::memory_order_acquire);
         for (;;) {
-            tail->Next = AtomicGet(Head);
-            if (AtomicCas(&Head, head, tail->Next))
+            tail->Next.store(headValue, std::memory_order_release);
+            // NB. See https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
+            // The weak forms (1-2) of the functions are allowed to fail spuriously, that is,
+            // act as if *this != expected even if they are equal.
+            // When a compare-and-exchange is in a loop, the weak version will yield better
+            // performance on some platforms.
+            if (Head.compare_exchange_weak(headValue, head))
                 break;
         }
     }
     template <class U>
     void EnqueueImpl(U&& u) {
-        TNode* volatile node = new TNode(std::forward<U>(u));
+        TNode* node = new TNode(std::forward<U>(u));
         EnqueueImpl(node, node);
     }
 
 public:
-    TLockFreeStack()
-        : Head(nullptr)
-        , FreePtr(nullptr)
-        , DequeueCount(0)
-    {
-    }
+    TLockFreeStack() = default;
+
     ~TLockFreeStack() {
-        EraseList(Head);
-        EraseList(FreePtr);
+        EraseList(Head.load());
+        EraseList(FreePtr.load());
     }
 
     void Enqueue(const T& t) {
@@ -85,32 +90,32 @@ public:
             return;
         }
         TIter i = dataBegin;
-        TNode* volatile node = new TNode(*i);
-        TNode* volatile tail = node;
+        TNode* node = new TNode(*i);
+        TNode* tail = node;
         for (++i; i != dataEnd; ++i) {
             TNode* nextNode = node;
             node = new TNode(*i);
-            node->Next = nextNode;
+            node->Next.store(nextNode, std::memory_order_release);
         }
         EnqueueImpl(node, tail);
     }
 
     bool Dequeue(T* res) {
-        AtomicAdd(DequeueCount, 1);
-        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
-            if (AtomicCas(&Head, AtomicGet(current->Next), current)) {
+        ++DequeueCount;
+        for (TNode* current = Head.load(std::memory_order_acquire); current;) {
+            if (Head.compare_exchange_weak(current, current->Next.load(std::memory_order_acquire))) {
                 *res = std::move(current->Value);
                 // delete current; // ABA problem
                 // even more complex node deletion
                 TryToFreeMemory();
-                if (AtomicAdd(DequeueCount, -1) == 0) {
+                if (--DequeueCount == 0) {
                     // no other Dequeue()s, can safely reclaim memory
                     delete current;
                 } else {
                     // Dequeue()s in progress, put node to free list
-                    for (;;) {
-                        AtomicSet(current->Next, AtomicGet(FreePtr));
-                        if (AtomicCas(&FreePtr, current, current->Next))
+                    for (TNode* freePtr = FreePtr.load(std::memory_order_acquire);;) {
+                        current->Next.store(freePtr, std::memory_order_release);
+                        if (FreePtr.compare_exchange_weak(freePtr, current))
                             break;
                     }
                 }
@@ -118,16 +123,16 @@
             }
         }
         TryToFreeMemory();
-        AtomicAdd(DequeueCount, -1);
+        --DequeueCount;
         return false;
     }
     // add all elements to *res
     // elements are returned in order of dequeue (top to bottom; see example in unittest)
     template <typename TCollection>
     void DequeueAll(TCollection* res) {
-        AtomicAdd(DequeueCount, 1);
-        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
-            if (AtomicCas(&Head, (TNode*)nullptr, current)) {
+        ++DequeueCount;
+        for (TNode* current = Head.load(std::memory_order_acquire); current;) {
+            if (Head.compare_exchange_weak(current, nullptr)) {
                 for (TNode* x = current; x;) {
                     res->push_back(std::move(x->Value));
                     x = x->Next;
@@ -135,7 +140,7 @@ public:
                 // EraseList(current); // ABA problem
                 // even more complex node deletion
                 TryToFreeMemory();
-                if (AtomicAdd(DequeueCount, -1) == 0) {
+                if (--DequeueCount == 0) {
                     // no other Dequeue()s, can safely reclaim memory
                     EraseList(current);
                 } else {
@@ -144,9 +149,9 @@ public:
                     while (currentLast->Next) {
                         currentLast = currentLast->Next;
                     }
-                    for (;;) {
-                        AtomicSet(currentLast->Next, AtomicGet(FreePtr));
-                        if (AtomicCas(&FreePtr, current, currentLast->Next))
+                    for (TNode* freePtr = FreePtr.load(std::memory_order_acquire);;) {
+                        currentLast->Next.store(freePtr, std::memory_order_release);
+                        if (FreePtr.compare_exchange_weak(freePtr, current))
                             break;
                     }
                 }
@@ -154,11 +159,11 @@ public:
             }
         }
         TryToFreeMemory();
-        AtomicAdd(DequeueCount, -1);
+        --DequeueCount;
     }
     bool DequeueSingleConsumer(T* res) {
-        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
-            if (AtomicCas(&Head, current->Next, current)) {
+        for (TNode* current = Head.load(std::memory_order_acquire); current;) {
+            if (Head.compare_exchange_weak(current, current->Next)) {
                 *res = std::move(current->Value);
                 delete current; // with single consumer thread ABA does not happen
                 return true;
@@ -170,19 +175,18 @@ public:
     // elements are returned in order of dequeue (top to bottom; see example in unittest)
     template <typename TCollection>
    void DequeueAllSingleConsumer(TCollection* res) {
-        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
-            if (AtomicCas(&Head, (TNode*)nullptr, current)) {
-                for (TNode* x = current; x;) {
+        for (TNode* head = Head.load(std::memory_order_acquire); head;) {
+            if (Head.compare_exchange_weak(head, nullptr)) {
+                for (TNode* x = head; x;) {
                     res->push_back(std::move(x->Value));
                     x = x->Next;
                 }
-                EraseList(current); // with single consumer thread ABA does not happen
+                EraseList(head); // with single consumer thread ABA does not happen
                 return;
             }
         }
     }
     bool IsEmpty() {
-        AtomicAdd(DequeueCount, 0); // mem barrier
-        return AtomicGet(Head) == nullptr; // without lock, so result is approximate
+        return Head.load() == nullptr; // without lock, so result is approximate
     }
 };
```
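The NB comment added in EnqueueImpl is the crux of the migration: compare_exchange_weak may fail spuriously, but on any failure it refreshes the expected value with what it actually observed, so a retry loop needs no separate reload. A minimal standalone sketch of that idiom under the same Treiber-stack shape (Node and Push are illustrative names, not part of util):

```cpp
#include <atomic>

// Hypothetical, simplified node type -- not the TNode from the diff above.
struct Node {
    int Value;
    std::atomic<Node*> Next{nullptr};
};

// Push with a weak-CAS retry loop. On failure, compare_exchange_weak writes
// the observed head back into `expected`, so each retry links the node onto
// the freshest head without an extra load in the loop body.
void Push(std::atomic<Node*>& head, Node* node) {
    Node* expected = head.load(std::memory_order_acquire);
    do {
        node->Next.store(expected, std::memory_order_relaxed);
    } while (!head.compare_exchange_weak(expected, node,
                                         std::memory_order_release,
                                         std::memory_order_acquire));
}
```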
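The retained comment `// delete current; // ABA problem` is worth a gloss. A hypothetical interleaving showing why the popped node cannot be freed on the spot, written as comments since no new code is needed:

```cpp
// Stack: A -> B -> C
// T1 (in Dequeue): reads Head == A and A->Next == B, is preempted before its CAS.
// T2: pops A, pops B, frees both; then enqueues a new node that the allocator
//     happens to place at A's old address. Stack is now A' -> C (A' aliases A).
// T1: resumes; CAS(Head: A -> B) succeeds because Head compares equal to A,
//     so Head now points at the freed node B -- a dangling head.
//
// The code sidesteps this by counting in-flight dequeuers (DequeueCount):
// a node is deleted immediately only when the count drops to zero; otherwise
// it is parked on the FreePtr list, which TryToFreeMemory drains once the
// caller observes itself to be the last dequeuer.
```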
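For context, a minimal usage sketch of the container this diff touches, assuming an Arcadia-style build where util/thread/lfstack.h is on the include path; the method set is taken from the hunks above, everything else is illustrative:

```cpp
#include <util/thread/lfstack.h>

#include <thread>
#include <vector>

int main() {
    TLockFreeStack<int> stack;

    // Several producers enqueue concurrently; Enqueue is lock-free.
    std::vector<std::thread> producers;
    for (int t = 0; t < 4; ++t) {
        producers.emplace_back([&stack, t] {
            for (int i = 0; i < 1000; ++i) {
                stack.Enqueue(t * 1000 + i);
            }
        });
    }
    for (auto& th : producers) {
        th.join();
    }

    // Drain everything at once; elements come out top (most recent) first.
    std::vector<int> items;
    stack.DequeueAll(&items);
    return items.size() == 4000 ? 0 : 1;
}
```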