author    | eeight <eeight@yandex-team.ru> | 2022-04-27 10:20:47 +0300
committer | eeight <eeight@yandex-team.ru> | 2022-04-27 10:20:47 +0300
commit    | 590e169a0a90aafab8247e6f1797b646d897f086 (patch)
tree      | 11987614ed478d03168db07fd0304aec670ac5cf
parent    | 0cab8da2b5ac8c6870c24756e16e6f8151ce1454 (diff)
download  | ydb-590e169a0a90aafab8247e6f1797b646d897f086.tar.gz
IGNIETFERRO-1105 TAtomic -> std::atomic in util/thread/lfstack.h
ref:7fac644f716f0423df5d65d6ba11439ca0ca35d1
-rw-r--r-- | util/thread/lfstack.h      | 100
-rw-r--r-- | util/thread/lfstack_ut.cpp |  23
2 files changed, 64 insertions, 59 deletions
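
The patch is a mechanical migration from Arcadia's TAtomic helpers to standard C++11 atomics. As a reading aid for the diff below, here is a rough sketch (illustrative only, not part of the commit) of how the old calls map onto the new ones. Note in particular that AtomicCas takes the desired value before the expected one, while compare_exchange takes them in the opposite order and updates the expected value in place on failure:

    #include <atomic>
    #include <cstddef>

    std::atomic<size_t> counter{0};
    std::atomic<int*> slot{nullptr};

    void mappingSketch(int* desired) {
        size_t n = counter.load();  // was: AtomicGet(counter)
        counter.fetch_add(1);       // was: AtomicAdd(counter, 1); note AtomicAdd
                                    // returned the new value, fetch_add the old one
        --counter;                  // was: AtomicDecrement(counter)

        int* expected = slot.load(std::memory_order_acquire);
        // was: AtomicCas(&slot, desired, expected) -- argument order is reversed,
        // and `expected` is overwritten with the observed value when the CAS fails.
        slot.compare_exchange_strong(expected, desired);
        (void)n;
    }
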
diff --git a/util/thread/lfstack.h b/util/thread/lfstack.h
index ca3d95f3c3..effde7c706 100644
--- a/util/thread/lfstack.h
+++ b/util/thread/lfstack.h
@@ -1,7 +1,10 @@
 #pragma once

 #include <util/generic/noncopyable.h>
-#include <util/system/atomic.h>
+
+#include <atomic>
+#include <cstddef>
+#include <utility>

 //////////////////////////////
 // lock free lifo stack
@@ -9,7 +12,7 @@ template <class T>
 class TLockFreeStack: TNonCopyable {
     struct TNode {
         T Value;
-        TNode* Next;
+        std::atomic<TNode*> Next;

         TNode() = default;
@@ -21,50 +24,52 @@ class TLockFreeStack: TNonCopyable {
         }
     };

-    TNode* Head;
-    TNode* FreePtr;
-    TAtomic DequeueCount;
+    std::atomic<TNode*> Head = nullptr;
+    std::atomic<TNode*> FreePtr = nullptr;
+    std::atomic<size_t> DequeueCount = 0;

     void TryToFreeMemory() {
-        TNode* current = AtomicGet(FreePtr);
+        TNode* current = FreePtr.load(std::memory_order_acquire);

         if (!current)
             return;

-        if (AtomicAdd(DequeueCount, 0) == 1) {
+        if (DequeueCount.load() == 1) {
             // node current is in free list, we are the last thread so try to cleanup
-            if (AtomicCas(&FreePtr, (TNode*)nullptr, current))
+            if (FreePtr.compare_exchange_strong(current, nullptr))
                 EraseList(current);
         }
     }

-    void EraseList(TNode* volatile p) {
+    void EraseList(TNode* p) {
         while (p) {
             TNode* next = p->Next;
             delete p;
             p = next;
         }
     }

-    void EnqueueImpl(TNode* volatile head, TNode* volatile tail) {
+    void EnqueueImpl(TNode* head, TNode* tail) {
+        auto headValue = Head.load(std::memory_order_acquire);
         for (;;) {
-            tail->Next = AtomicGet(Head);
-            if (AtomicCas(&Head, head, tail->Next))
+            tail->Next.store(headValue, std::memory_order_release);
+            // NB. See https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
+            // The weak forms (1-2) of the functions are allowed to fail spuriously, that is,
+            // act as if *this != expected even if they are equal.
+            // When a compare-and-exchange is in a loop, the weak version will yield better
+            // performance on some platforms.
+            if (Head.compare_exchange_weak(headValue, head))
                 break;
         }
     }

     template <class U>
     void EnqueueImpl(U&& u) {
-        TNode* volatile node = new TNode(std::forward<U>(u));
+        TNode* node = new TNode(std::forward<U>(u));
         EnqueueImpl(node, node);
     }

 public:
-    TLockFreeStack()
-        : Head(nullptr)
-        , FreePtr(nullptr)
-        , DequeueCount(0)
-    {
-    }
+    TLockFreeStack() = default;
+
     ~TLockFreeStack() {
-        EraseList(Head);
-        EraseList(FreePtr);
+        EraseList(Head.load());
+        EraseList(FreePtr.load());
     }

     void Enqueue(const T& t) {
@@ -85,32 +90,32 @@ public:
             return;
         }
         TIter i = dataBegin;
-        TNode* volatile node = new TNode(*i);
-        TNode* volatile tail = node;
+        TNode* node = new TNode(*i);
+        TNode* tail = node;
         for (++i; i != dataEnd; ++i) {
             TNode* nextNode = node;
             node = new TNode(*i);
-            node->Next = nextNode;
+            node->Next.store(nextNode, std::memory_order_release);
         }
         EnqueueImpl(node, tail);
     }

     bool Dequeue(T* res) {
-        AtomicAdd(DequeueCount, 1);
-        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
-            if (AtomicCas(&Head, AtomicGet(current->Next), current)) {
+        ++DequeueCount;
+        for (TNode* current = Head.load(std::memory_order_acquire); current;) {
+            if (Head.compare_exchange_weak(current, current->Next.load(std::memory_order_acquire))) {
                 *res = std::move(current->Value);
                 // delete current; // ABA problem
                 // even more complex node deletion
                 TryToFreeMemory();
-                if (AtomicAdd(DequeueCount, -1) == 0) {
+                if (--DequeueCount == 0) {
                     // no other Dequeue()s, can safely reclaim memory
                     delete current;
                 } else {
                     // Dequeue()s in progress, put node to free list
-                    for (;;) {
-                        AtomicSet(current->Next, AtomicGet(FreePtr));
-                        if (AtomicCas(&FreePtr, current, current->Next))
+                    for (TNode* freePtr = FreePtr.load(std::memory_order_acquire);;) {
+                        current->Next.store(freePtr, std::memory_order_release);
+                        if (FreePtr.compare_exchange_weak(freePtr, current))
                             break;
                     }
                 }
@@ -118,16 +123,16 @@ public:
             }
         }
         TryToFreeMemory();
-        AtomicAdd(DequeueCount, -1);
+        --DequeueCount;
         return false;
     }

     // add all elements to *res
     // elements are returned in order of dequeue (top to bottom; see example in unittest)
     template <typename TCollection>
     void DequeueAll(TCollection* res) {
-        AtomicAdd(DequeueCount, 1);
-        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
-            if (AtomicCas(&Head, (TNode*)nullptr, current)) {
+        ++DequeueCount;
+        for (TNode* current = Head.load(std::memory_order_acquire); current;) {
+            if (Head.compare_exchange_weak(current, nullptr)) {
                 for (TNode* x = current; x;) {
                     res->push_back(std::move(x->Value));
                     x = x->Next;
@@ -135,7 +140,7 @@ public:
                 // EraseList(current); // ABA problem
                 // even more complex node deletion
                 TryToFreeMemory();
-                if (AtomicAdd(DequeueCount, -1) == 0) {
+                if (--DequeueCount == 0) {
                     // no other Dequeue()s, can safely reclaim memory
                     EraseList(current);
                 } else {
@@ -144,9 +149,9 @@ public:
                     while (currentLast->Next) {
                         currentLast = currentLast->Next;
                     }
-                    for (;;) {
-                        AtomicSet(currentLast->Next, AtomicGet(FreePtr));
-                        if (AtomicCas(&FreePtr, current, currentLast->Next))
+                    for (TNode* freePtr = FreePtr.load(std::memory_order_acquire);;) {
+                        currentLast->Next.store(freePtr, std::memory_order_release);
+                        if (FreePtr.compare_exchange_weak(freePtr, current))
                             break;
                     }
                 }
@@ -154,11 +159,11 @@ public:
             }
         }
         TryToFreeMemory();
-        AtomicAdd(DequeueCount, -1);
+        --DequeueCount;
     }

     bool DequeueSingleConsumer(T* res) {
-        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
-            if (AtomicCas(&Head, current->Next, current)) {
+        for (TNode* current = Head.load(std::memory_order_acquire); current;) {
+            if (Head.compare_exchange_weak(current, current->Next)) {
                 *res = std::move(current->Value);
                 delete current; // with single consumer thread ABA does not happen
                 return true;
@@ -170,19 +175,18 @@ public:
     // elements are returned in order of dequeue (top to bottom; see example in unittest)
     template <typename TCollection>
     void DequeueAllSingleConsumer(TCollection* res) {
-        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
-            if (AtomicCas(&Head, (TNode*)nullptr, current)) {
-                for (TNode* x = current; x;) {
+        for (TNode* head = Head.load(std::memory_order_acquire); head;) {
+            if (Head.compare_exchange_weak(head, nullptr)) {
+                for (TNode* x = head; x;) {
                     res->push_back(std::move(x->Value));
                     x = x->Next;
                 }
-                EraseList(current); // with single consumer thread ABA does not happen
+                EraseList(head); // with single consumer thread ABA does not happen
                 return;
             }
         }
     }

     bool IsEmpty() {
-        AtomicAdd(DequeueCount, 0); // mem barrier
-        return AtomicGet(Head) == nullptr; // without lock, so result is approximate
+        return Head.load() == nullptr; // without lock, so result is approximate
     }
 };
diff --git a/util/thread/lfstack_ut.cpp b/util/thread/lfstack_ut.cpp
index e20a838f95..e0527aa501 100644
--- a/util/thread/lfstack_ut.cpp
+++ b/util/thread/lfstack_ut.cpp
@@ -1,17 +1,18 @@
-#include <util/system/atomic.h>
-#include <util/system/event.h>
-#include <util/generic/deque.h>
-#include <library/cpp/threading/future/legacy_future.h>
+#include "lfstack.h"

 #include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/threading/future/legacy_future.h>

-#include "lfstack.h"
+#include <util/generic/deque.h>
+#include <util/system/event.h>
+
+#include <atomic>

 Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
     class TCountDownLatch {
     private:
-        TAtomic Current_;
+        std::atomic<size_t> Current_;
         TSystemEvent EventObject_;

     public:
@@ -21,7 +22,7 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
         }

         void CountDown() {
-            if (AtomicDecrement(Current_) == 0) {
+            if (--Current_ == 0) {
                 EventObject_.Signal();
             }
         }
@@ -41,7 +42,7 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
         size_t DequeueThreads;
         size_t EnqueuesPerThread;

-        TAtomic LeftToDequeue;
+        std::atomic<size_t> LeftToDequeue;

         TCountDownLatch StartLatch;
         TLockFreeStack<int> Stack;
@@ -69,7 +70,7 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
             StartLatch.Await();

             TVector<int> temp;
-            while (AtomicGet(LeftToDequeue) > 0) {
+            while (LeftToDequeue.load() > 0) {
                 size_t dequeued = 0;
                 for (size_t i = 0; i < 100; ++i) {
                     temp.clear();
@@ -80,7 +81,7 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
                     }
                     dequeued += temp.size();
                 }
-                AtomicAdd(LeftToDequeue, -dequeued);
+                LeftToDequeue -= dequeued;
             }
         }
@@ -98,7 +99,7 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
             // effectively join
             futures.clear();

-            UNIT_ASSERT_VALUES_EQUAL(0, int(AtomicGet(LeftToDequeue)));
+            UNIT_ASSERT_VALUES_EQUAL(0, int(LeftToDequeue.load()));

             TVector<int> left;
             Stack.DequeueAll(&left);
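
For context, here is a minimal, hypothetical usage sketch of the container this commit touches (it assumes the Arcadia build environment that provides util/thread/lfstack.h and TVector):

    #include <util/thread/lfstack.h>
    #include <util/generic/vector.h>

    int main() {
        TLockFreeStack<int> stack;

        // Any number of producer threads may Enqueue concurrently.
        stack.Enqueue(1);
        stack.Enqueue(2);
        stack.Enqueue(3);

        // LIFO: the most recently enqueued value comes out first, so top == 3.
        int top = 0;
        if (stack.Dequeue(&top)) {
            // use top
        }

        // Drain the rest in dequeue order (top to bottom): {2, 1}.
        TVector<int> rest;
        stack.DequeueAll(&rest);
        return 0;
    }

Note how the new compare_exchange_weak loops drop the explicit re-read of Head that the old AtomicCas loops needed: on failure, compare_exchange overwrites its expected argument with the value it actually observed, so the loop can retry immediately with fresh data.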