diff options
author | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/thread/lfstack.h |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/thread/lfstack.h')
-rw-r--r-- | util/thread/lfstack.h | 188 |
1 files changed, 188 insertions, 0 deletions
diff --git a/util/thread/lfstack.h b/util/thread/lfstack.h new file mode 100644 index 00000000000..ca3d95f3c39 --- /dev/null +++ b/util/thread/lfstack.h @@ -0,0 +1,188 @@ +#pragma once + +#include <util/generic/noncopyable.h> +#include <util/system/atomic.h> + +////////////////////////////// +// lock free lifo stack +template <class T> +class TLockFreeStack: TNonCopyable { + struct TNode { + T Value; + TNode* Next; + + TNode() = default; + + template <class U> + explicit TNode(U&& val) + : Value(std::forward<U>(val)) + , Next(nullptr) + { + } + }; + + TNode* Head; + TNode* FreePtr; + TAtomic DequeueCount; + + void TryToFreeMemory() { + TNode* current = AtomicGet(FreePtr); + if (!current) + return; + if (AtomicAdd(DequeueCount, 0) == 1) { + // node current is in free list, we are the last thread so try to cleanup + if (AtomicCas(&FreePtr, (TNode*)nullptr, current)) + EraseList(current); + } + } + void EraseList(TNode* volatile p) { + while (p) { + TNode* next = p->Next; + delete p; + p = next; + } + } + void EnqueueImpl(TNode* volatile head, TNode* volatile tail) { + for (;;) { + tail->Next = AtomicGet(Head); + if (AtomicCas(&Head, head, tail->Next)) + break; + } + } + template <class U> + void EnqueueImpl(U&& u) { + TNode* volatile node = new TNode(std::forward<U>(u)); + EnqueueImpl(node, node); + } + +public: + TLockFreeStack() + : Head(nullptr) + , FreePtr(nullptr) + , DequeueCount(0) + { + } + ~TLockFreeStack() { + EraseList(Head); + EraseList(FreePtr); + } + + void Enqueue(const T& t) { + EnqueueImpl(t); + } + + void Enqueue(T&& t) { + EnqueueImpl(std::move(t)); + } + + template <typename TCollection> + void EnqueueAll(const TCollection& data) { + EnqueueAll(data.begin(), data.end()); + } + template <typename TIter> + void EnqueueAll(TIter dataBegin, TIter dataEnd) { + if (dataBegin == dataEnd) { + return; + } + TIter i = dataBegin; + TNode* volatile node = new TNode(*i); + TNode* volatile tail = node; + + for (++i; i != dataEnd; ++i) { + TNode* nextNode = node; + node = new TNode(*i); + node->Next = nextNode; + } + EnqueueImpl(node, tail); + } + bool Dequeue(T* res) { + AtomicAdd(DequeueCount, 1); + for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) { + if (AtomicCas(&Head, AtomicGet(current->Next), current)) { + *res = std::move(current->Value); + // delete current; // ABA problem + // even more complex node deletion + TryToFreeMemory(); + if (AtomicAdd(DequeueCount, -1) == 0) { + // no other Dequeue()s, can safely reclaim memory + delete current; + } else { + // Dequeue()s in progress, put node to free list + for (;;) { + AtomicSet(current->Next, AtomicGet(FreePtr)); + if (AtomicCas(&FreePtr, current, current->Next)) + break; + } + } + return true; + } + } + TryToFreeMemory(); + AtomicAdd(DequeueCount, -1); + return false; + } + // add all elements to *res + // elements are returned in order of dequeue (top to bottom; see example in unittest) + template <typename TCollection> + void DequeueAll(TCollection* res) { + AtomicAdd(DequeueCount, 1); + for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) { + if (AtomicCas(&Head, (TNode*)nullptr, current)) { + for (TNode* x = current; x;) { + res->push_back(std::move(x->Value)); + x = x->Next; + } + // EraseList(current); // ABA problem + // even more complex node deletion + TryToFreeMemory(); + if (AtomicAdd(DequeueCount, -1) == 0) { + // no other Dequeue()s, can safely reclaim memory + EraseList(current); + } else { + // Dequeue()s in progress, add nodes list to free list + TNode* currentLast = current; + while (currentLast->Next) { + currentLast = currentLast->Next; + } + for (;;) { + AtomicSet(currentLast->Next, AtomicGet(FreePtr)); + if (AtomicCas(&FreePtr, current, currentLast->Next)) + break; + } + } + return; + } + } + TryToFreeMemory(); + AtomicAdd(DequeueCount, -1); + } + bool DequeueSingleConsumer(T* res) { + for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) { + if (AtomicCas(&Head, current->Next, current)) { + *res = std::move(current->Value); + delete current; // with single consumer thread ABA does not happen + return true; + } + } + return false; + } + // add all elements to *res + // elements are returned in order of dequeue (top to bottom; see example in unittest) + template <typename TCollection> + void DequeueAllSingleConsumer(TCollection* res) { + for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) { + if (AtomicCas(&Head, (TNode*)nullptr, current)) { + for (TNode* x = current; x;) { + res->push_back(std::move(x->Value)); + x = x->Next; + } + EraseList(current); // with single consumer thread ABA does not happen + return; + } + } + } + bool IsEmpty() { + AtomicAdd(DequeueCount, 0); // mem barrier + return AtomicGet(Head) == nullptr; // without lock, so result is approximate + } +}; |