author     eeight <eeight@yandex-team.ru>  2022-05-04 12:17:36 +0300
committer  eeight <eeight@yandex-team.ru>  2022-05-04 12:17:36 +0300
commit     1af8bb8789fdb0ca2927ff097537916a7f297bb1 (patch)
tree       bdd66cee875a7ee9ab9d4f6d92634d2c61ddadad
parent     6faf680f58ba8341a694dcbadf572d37197ae888 (diff)
download   ydb-1af8bb8789fdb0ca2927ff097537916a7f297bb1.tar.gz
IGNIETFERRO-1105 TAtomic -> std::atomic in util/thread/lfqueue.h
ref:8cf44e7b3fecd13c3a0c699a8c1c7abe780eab0b
-rw-r--r--  util/thread/lfqueue.h      | 143
-rw-r--r--  util/thread/lfqueue_ut.cpp |   8
2 files changed, 71 insertions, 80 deletions
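The change is a mechanical migration from the Arcadia atomic helpers in util/system/atomic.h to std::atomic. A minimal sketch of the mapping as applied in the diff below (illustrative only, not part of the commit; the names TSketch/Counter/Head are hypothetical, and the real call sites pick acquire/release orders individually):

    #include <atomic>

    struct TSketch {
        std::atomic<size_t> Counter = 0;      // was: volatile TAtomic Counter;
        std::atomic<TSketch*> Head = nullptr; // was: TSketch* volatile Head;

        void Demo(TSketch* expected, TSketch* desired) {
            Counter.load();  // was: AtomicGet(Counter) or AtomicAdd(Counter, 0)
            ++Counter;       // was: AtomicAdd(Counter, 1); both return the new value
            Head.store(desired, std::memory_order_release); // was: AtomicSet(Head, desired)
            // was: AtomicCas(&Head, desired, expected). Note the swapped argument
            // order, and that on failure compare_exchange writes the value it
            // actually observed back into `expected`.
            Head.compare_exchange_strong(expected, desired);
        }
    };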
diff --git a/util/thread/lfqueue.h b/util/thread/lfqueue.h
index ab523631e4..dd4e738661 100644
--- a/util/thread/lfqueue.h
+++ b/util/thread/lfqueue.h
@@ -1,11 +1,12 @@
 #pragma once
 
 #include "fwd.h"
+#include "lfstack.h"
 
 #include <util/generic/ptr.h>
-#include <util/system/atomic.h>
 #include <util/system/yassert.h>
-#include "lfstack.h"
+
+#include <atomic>
 
 struct TDefaultLFCounter {
     template <class T>
@@ -39,24 +40,17 @@ class TLockFreeQueue: public TNonCopyable {
         {
         }
 
-        TListNode* volatile Next;
+        std::atomic<TListNode*> Next;
         T Data;
     };
 
     // using inheritance to be able to use 0 bytes for TCounter when we don't need one
     struct TRootNode: public TCounter {
-        TListNode* volatile PushQueue;
-        TListNode* volatile PopQueue;
-        TListNode* volatile ToDelete;
-        TRootNode* volatile NextFree;
-
-        TRootNode()
-            : PushQueue(nullptr)
-            , PopQueue(nullptr)
-            , ToDelete(nullptr)
-            , NextFree(nullptr)
-        {
-        }
+        std::atomic<TListNode*> PushQueue = nullptr;
+        std::atomic<TListNode*> PopQueue = nullptr;
+        std::atomic<TListNode*> ToDelete = nullptr;
+        std::atomic<TRootNode*> NextFree = nullptr;
+
         void CopyCounter(TRootNode* x) {
             *(TCounter*)this = *(TCounter*)x;
         }
@@ -64,58 +58,58 @@ class TLockFreeQueue: public TNonCopyable {
 
     static void EraseList(TListNode* n) {
         while (n) {
-            TListNode* keepNext = AtomicGet(n->Next);
+            TListNode* keepNext = n->Next.load(std::memory_order_acquire);
             delete n;
             n = keepNext;
         }
     }
 
-    alignas(64) TRootNode* volatile JobQueue;
-    alignas(64) volatile TAtomic FreememCounter;
-    alignas(64) volatile TAtomic FreeingTaskCounter;
-    alignas(64) TRootNode* volatile FreePtr;
+    alignas(64) std::atomic<TRootNode*> JobQueue;
+    alignas(64) std::atomic<size_t> FreememCounter;
+    alignas(64) std::atomic<size_t> FreeingTaskCounter;
+    alignas(64) std::atomic<TRootNode*> FreePtr;
 
     void TryToFreeAsyncMemory() {
-        TAtomic keepCounter = AtomicAdd(FreeingTaskCounter, 0);
-        TRootNode* current = AtomicGet(FreePtr);
+        const auto keepCounter = FreeingTaskCounter.load();
+        TRootNode* current = FreePtr.load(std::memory_order_acquire);
         if (current == nullptr)
             return;
-        if (AtomicAdd(FreememCounter, 0) == 1) {
+        if (FreememCounter.load() == 1) {
             // we are the last thread, try to cleanup
             // check if another thread have cleaned up
-            if (keepCounter != AtomicAdd(FreeingTaskCounter, 0)) {
+            if (keepCounter != FreeingTaskCounter.load()) {
                 return;
             }
-            if (AtomicCas(&FreePtr, (TRootNode*)nullptr, current)) {
+            if (FreePtr.compare_exchange_strong(current, nullptr)) {
                 // free list
                 while (current) {
-                    TRootNode* p = AtomicGet(current->NextFree);
-                    EraseList(AtomicGet(current->ToDelete));
+                    TRootNode* p = current->NextFree.load(std::memory_order_acquire);
+                    EraseList(current->ToDelete.load(std::memory_order_acquire));
                     delete current;
                     current = p;
                 }
-                AtomicAdd(FreeingTaskCounter, 1);
+                ++FreeingTaskCounter;
             }
         }
     }
 
     void AsyncRef() {
-        AtomicAdd(FreememCounter, 1);
+        ++FreememCounter;
     }
 
     void AsyncUnref() {
         TryToFreeAsyncMemory();
-        AtomicAdd(FreememCounter, -1);
+        --FreememCounter;
    }
 
     void AsyncDel(TRootNode* toDelete, TListNode* lst) {
-        AtomicSet(toDelete->ToDelete, lst);
-        for (;;) {
-            AtomicSet(toDelete->NextFree, AtomicGet(FreePtr));
-            if (AtomicCas(&FreePtr, toDelete, AtomicGet(toDelete->NextFree)))
+        toDelete->ToDelete.store(lst, std::memory_order_release);
+        for (auto freePtr = FreePtr.load();;) {
+            toDelete->NextFree.store(freePtr, std::memory_order_release);
+            if (FreePtr.compare_exchange_weak(freePtr, toDelete))
                 break;
         }
     }
 
     void AsyncUnref(TRootNode* toDelete, TListNode* lst) {
         TryToFreeAsyncMemory();
-        if (AtomicAdd(FreememCounter, -1) == 0) {
+        if (--FreememCounter == 0) {
             // no other operations in progress, can safely reclaim memory
             EraseList(lst);
             delete toDelete;
@@ -151,7 +145,7 @@ class TLockFreeQueue: public TNonCopyable {
             while (ptr) {
                 if (ptr == PrevFirst) {
                     // short cut, we have copied this part already
-                    AtomicSet(Tail->Next, newCopy);
+                    Tail->Next.store(newCopy, std::memory_order_release);
                     newCopy = Copy;
                     Copy = nullptr; // do not destroy prev try
                     if (!newTail)
@@ -160,7 +154,7 @@ class TLockFreeQueue: public TNonCopyable {
                 }
                 TListNode* newElem = new TListNode(ptr->Data, newCopy);
                 newCopy = newElem;
-                ptr = AtomicGet(ptr->Next);
+                ptr = ptr->Next.load(std::memory_order_acquire);
                 if (!newTail)
                     newTail = newElem;
             }
@@ -174,20 +168,19 @@ class TLockFreeQueue: public TNonCopyable {
     void EnqueueImpl(TListNode* head, TListNode* tail) {
         TRootNode* newRoot = new TRootNode;
         AsyncRef();
-        AtomicSet(newRoot->PushQueue, head);
-        for (;;) {
-            TRootNode* curRoot = AtomicGet(JobQueue);
-            AtomicSet(tail->Next, AtomicGet(curRoot->PushQueue));
-            AtomicSet(newRoot->PopQueue, AtomicGet(curRoot->PopQueue));
+        newRoot->PushQueue.store(head, std::memory_order_release);
+        for (TRootNode* curRoot = JobQueue.load(std::memory_order_acquire);;) {
+            tail->Next.store(curRoot->PushQueue.load(std::memory_order_acquire), std::memory_order_release);
+            newRoot->PopQueue.store(curRoot->PopQueue.load(std::memory_order_acquire), std::memory_order_release);
             newRoot->CopyCounter(curRoot);
-            for (TListNode* node = head;; node = AtomicGet(node->Next)) {
+            for (TListNode* node = head;; node = node->Next.load(std::memory_order_acquire)) {
                 newRoot->IncCount(node->Data);
                 if (node == tail)
                     break;
             }
-            if (AtomicCas(&JobQueue, newRoot, curRoot)) {
+            if (JobQueue.compare_exchange_weak(curRoot, newRoot)) {
                 AsyncUnref(curRoot, nullptr);
                 break;
             }
@@ -198,7 +191,7 @@ class TLockFreeQueue: public TNonCopyable {
     static void FillCollection(TListNode* lst, TCollection* res) {
         while (lst) {
             res->emplace_back(std::move(lst->Data));
-            lst = AtomicGet(lst->Next);
+            lst = lst->Next.load(std::memory_order_acquire);
         }
     }
 
@@ -215,7 +208,7 @@ class TLockFreeQueue: public TNonCopyable {
         do {
             TListNode* newElem = new TListNode(std::move(lst->Data), newCopy);
             newCopy = newElem;
-            lst = AtomicGet(lst->Next);
+            lst = lst->Next.load(std::memory_order_acquire);
         } while (lst);
 
         FillCollection(newCopy, res);
@@ -235,8 +228,8 @@ public:
     ~TLockFreeQueue() {
         AsyncRef();
         AsyncUnref(); // should free FreeList
-        EraseList(JobQueue->PushQueue);
-        EraseList(JobQueue->PopQueue);
+        EraseList(JobQueue.load(std::memory_order_relaxed)->PushQueue.load(std::memory_order_relaxed));
+        EraseList(JobQueue.load(std::memory_order_relaxed)->PopQueue.load(std::memory_order_relaxed));
         delete JobQueue;
     }
 
     template <typename U>
@@ -262,8 +255,8 @@ public:
             return;
 
         TIter i = dataBegin;
-        TListNode* volatile node = new TListNode(*i);
-        TListNode* volatile tail = node;
+        TListNode* node = new TListNode(*i);
+        TListNode* tail = node;
 
         for (++i; i != dataEnd; ++i) {
             TListNode* nextNode = node;
@@ -275,28 +268,27 @@ public:
         TRootNode* newRoot = nullptr;
         TListInvertor listInvertor;
         AsyncRef();
-        for (;;) {
-            TRootNode* curRoot = AtomicGet(JobQueue);
-            TListNode* tail = AtomicGet(curRoot->PopQueue);
+        for (TRootNode* curRoot = JobQueue.load(std::memory_order_acquire);;) {
+            TListNode* tail = curRoot->PopQueue.load(std::memory_order_acquire);
             if (tail) {
                 // has elems to pop
                 if (!newRoot)
                     newRoot = new TRootNode;
 
-                AtomicSet(newRoot->PushQueue, AtomicGet(curRoot->PushQueue));
-                AtomicSet(newRoot->PopQueue, AtomicGet(tail->Next));
+                newRoot->PushQueue.store(curRoot->PushQueue.load(std::memory_order_acquire), std::memory_order_release);
+                newRoot->PopQueue.store(tail->Next.load(std::memory_order_acquire), std::memory_order_release);
                 newRoot->CopyCounter(curRoot);
                 newRoot->DecCount(tail->Data);
-                Y_ASSERT(AtomicGet(curRoot->PopQueue) == tail);
-                if (AtomicCas(&JobQueue, newRoot, curRoot)) {
+                Y_ASSERT(curRoot->PopQueue.load() == tail);
+                if (JobQueue.compare_exchange_weak(curRoot, newRoot)) {
                     *data = std::move(tail->Data);
-                    AtomicSet(tail->Next, nullptr);
+                    tail->Next.store(nullptr, std::memory_order_release);
                     AsyncUnref(curRoot, tail);
                     return true;
                 }
                 continue;
             }
-            if (AtomicGet(curRoot->PushQueue) == nullptr) {
+            if (curRoot->PushQueue.load(std::memory_order_acquire) == nullptr) {
                 delete newRoot;
                 AsyncUnref();
                 return false; // no elems to pop
@@ -304,17 +296,18 @@ public:
 
             if (!newRoot)
                 newRoot = new TRootNode;
-            AtomicSet(newRoot->PushQueue, nullptr);
-            listInvertor.DoCopy(AtomicGet(curRoot->PushQueue));
-            AtomicSet(newRoot->PopQueue, listInvertor.Copy);
+            newRoot->PushQueue.store(nullptr, std::memory_order_release);
+            listInvertor.DoCopy(curRoot->PushQueue.load(std::memory_order_acquire));
+            newRoot->PopQueue.store(listInvertor.Copy, std::memory_order_release);
             newRoot->CopyCounter(curRoot);
-            Y_ASSERT(AtomicGet(curRoot->PopQueue) == nullptr);
-            if (AtomicCas(&JobQueue, newRoot, curRoot)) {
+            Y_ASSERT(curRoot->PopQueue.load() == nullptr);
+            if (JobQueue.compare_exchange_weak(curRoot, newRoot)) {
+                AsyncDel(curRoot, curRoot->PushQueue.load(std::memory_order_acquire));
+                curRoot = newRoot;
                 newRoot = nullptr;
                 listInvertor.CopyWasUsed();
-                AsyncDel(curRoot, AtomicGet(curRoot->PushQueue));
             } else {
-                AtomicSet(newRoot->PopQueue, nullptr);
+                newRoot->PopQueue.store(nullptr, std::memory_order_release);
             }
         }
     }
@@ -323,36 +316,36 @@ public:
         AsyncRef();
 
         TRootNode* newRoot = new TRootNode;
-        TRootNode* curRoot;
+        TRootNode* curRoot = JobQueue.load(std::memory_order_acquire);
        do {
-            curRoot = AtomicGet(JobQueue);
-        } while (!AtomicCas(&JobQueue, newRoot, curRoot));
+        } while (!JobQueue.compare_exchange_weak(curRoot, newRoot));
 
         FillCollection(curRoot->PopQueue, res);
 
         TListNode* toDeleteHead = curRoot->PushQueue;
         TListNode* toDeleteTail = FillCollectionReverse(curRoot->PushQueue, res);
-        AtomicSet(curRoot->PushQueue, nullptr);
+        curRoot->PushQueue.store(nullptr, std::memory_order_release);
 
         if (toDeleteTail) {
-            toDeleteTail->Next = curRoot->PopQueue;
+            toDeleteTail->Next.store(curRoot->PopQueue.load());
         } else {
             toDeleteTail = curRoot->PopQueue;
         }
-        AtomicSet(curRoot->PopQueue, nullptr);
+        curRoot->PopQueue.store(nullptr, std::memory_order_release);
 
         AsyncUnref(curRoot, toDeleteHead);
     }
 
     bool IsEmpty() {
         AsyncRef();
-        TRootNode* curRoot = AtomicGet(JobQueue);
-        bool res = AtomicGet(curRoot->PushQueue) == nullptr && AtomicGet(curRoot->PopQueue) == nullptr;
+        TRootNode* curRoot = JobQueue.load(std::memory_order_acquire);
+        bool res = curRoot->PushQueue.load(std::memory_order_acquire) == nullptr &&
+                   curRoot->PopQueue.load(std::memory_order_acquire) == nullptr;
         AsyncUnref();
         return res;
     }
 
     TCounter GetCounter() {
         AsyncRef();
-        TRootNode* curRoot = AtomicGet(JobQueue);
+        TRootNode* curRoot = JobQueue.load(std::memory_order_acquire);
         TCounter res = *(TCounter*)curRoot;
         AsyncUnref();
         return res;
diff --git a/util/thread/lfqueue_ut.cpp b/util/thread/lfqueue_ut.cpp
index 83bca100cf..626f7d4091 100644
--- a/util/thread/lfqueue_ut.cpp
+++ b/util/thread/lfqueue_ut.cpp
@@ -4,7 +4,6 @@
 #include <util/generic/algorithm.h>
 #include <util/generic/vector.h>
 #include <util/generic/ptr.h>
-#include <util/system/atomic.h>
 #include <util/thread/pool.h>
 
 #include "lfqueue.h"
@@ -211,8 +210,7 @@ Y_UNIT_TEST_SUITE(TLockFreeQueueTests) {
             });
         }
 
-        TAtomic elementsLeft;
-        AtomicSet(elementsLeft, threadsNum * enqueuesPerThread);
+        std::atomic<size_t> elementsLeft = threadsNum * enqueuesPerThread;
 
         ui64 numOfConsumers = singleConsumer ? 1 : threadsNum;
 
@@ -224,12 +222,12 @@ Y_UNIT_TEST_SUITE(TLockFreeQueueTests) {
             p.SafeAddFunc([&queue, &elementsLeft, promise, consumerData{&dataBuckets[i]}]() mutable {
                 TVector<int> vec;
 
-                while (static_cast<i64>(AtomicGet(elementsLeft)) > 0) {
+                while (static_cast<i64>(elementsLeft.load()) > 0) {
                     for (size_t i = 0; i != 100; ++i) {
                         vec.clear();
                         queue.DequeueAll(&vec);
-                        AtomicSub(elementsLeft, vec.size());
+                        elementsLeft -= vec.size();
                         consumerData->insert(consumerData->end(), vec.begin(), vec.end());
                     }
                 }
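One semantic difference does the heavy lifting in the rewritten retry loops: AtomicCas(&x, desired, expected) leaves `expected` untouched on failure, so the old loops had to re-read x on every iteration, whereas std::atomic's compare_exchange_weak(expected, desired) writes the observed value back into `expected` when it fails. That is why EnqueueImpl, Dequeue and AsyncDel now hoist a single load in front of the loop. A standalone sketch of the idiom, mirroring AsyncDel's publication loop above (hypothetical example, not code from the commit):

    #include <atomic>

    struct TNode {
        std::atomic<TNode*> Next = nullptr;
        int Value = 0;
    };

    // Lock-free push of `node` onto the intrusive list rooted at `head`.
    void Push(std::atomic<TNode*>& head, TNode* node) {
        for (TNode* expected = head.load();;) {
            node->Next.store(expected, std::memory_order_release);
            // On failure, compare_exchange_weak refreshes `expected` with the
            // current head, so the next iteration retries against fresh state
            // without an explicit re-read.
            if (head.compare_exchange_weak(expected, node)) {
                break;
            }
        }
    }

compare_exchange_weak may fail spuriously, which is harmless inside a retry loop; the commit uses compare_exchange_strong only in TryToFreeAsyncMemory, where the CAS is not retried.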