author     Devtools Arcadia <[email protected]>  2022-02-07 18:08:42 +0300
committer  Devtools Arcadia <[email protected]>  2022-02-07 18:08:42 +0300
commit     1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
tree       e26c9fed0de5d9873cce7e00bc214573dc2195b7  /util/thread/lfstack.h

intermediate changes

ref: cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/thread/lfstack.h')
-rw-r--r--  util/thread/lfstack.h  188
1 file changed, 188 insertions, 0 deletions
diff --git a/util/thread/lfstack.h b/util/thread/lfstack.h
new file mode 100644
index 00000000000..ca3d95f3c39
--- /dev/null
+++ b/util/thread/lfstack.h
@@ -0,0 +1,188 @@
+#pragma once
+
+#include <util/generic/noncopyable.h>
+#include <util/system/atomic.h>
+
+//////////////////////////////
+// Lock-free LIFO stack.
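+//
+// Push is a classic CAS-on-head (Treiber) loop. Pop cannot delete the node
+// it unlinks right away, because a concurrent dequeuer may still be reading
+// it (the ABA problem). Instead, DequeueCount tracks how many threads are
+// inside a dequeue operation, and unlinked nodes are parked on the FreePtr
+// list until a moment when a single dequeuer remains and can reclaim them.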
+template <class T>
+class TLockFreeStack: TNonCopyable {
+    struct TNode {
+        T Value;
+        TNode* Next = nullptr; // in-class init so the default constructor leaves no dangling pointer
+
+        TNode() = default;
+
+        template <class U>
+        explicit TNode(U&& val)
+            : Value(std::forward<U>(val))
+        {
+        }
+    };
+
+    TNode* Head;          // top of the stack
+    TNode* FreePtr;       // unlinked nodes parked for deferred reclamation
+    TAtomic DequeueCount; // number of threads currently inside a dequeue operation
+
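+    // Tries to reclaim the free list; succeeds only when the calling thread
+    // is the single active dequeuer, otherwise the nodes stay parked.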
+    void TryToFreeMemory() {
+        TNode* current = AtomicGet(FreePtr);
+        if (!current)
+            return;
+        if (AtomicAdd(DequeueCount, 0) == 1) {
+            // `current` is on the free list and we are the only active
+            // dequeuer, so it is safe to try to clean up
+            if (AtomicCas(&FreePtr, (TNode*)nullptr, current))
+                EraseList(current);
+        }
+    }
+    void EraseList(TNode* volatile p) {
+        while (p) {
+            TNode* next = p->Next;
+            delete p;
+            p = next;
+        }
+    }
+    void EnqueueImpl(TNode* volatile head, TNode* volatile tail) {
+        for (;;) {
+            // link the new chain in front of the current head and publish it with CAS
+            tail->Next = AtomicGet(Head);
+            if (AtomicCas(&Head, head, tail->Next))
+                break;
+        }
+    }
+    template <class U>
+    void EnqueueImpl(U&& u) {
+        TNode* volatile node = new TNode(std::forward<U>(u));
+        EnqueueImpl(node, node);
+    }
+
+public:
+    TLockFreeStack()
+        : Head(nullptr)
+        , FreePtr(nullptr)
+        , DequeueCount(0)
+    {
+    }
+    // the destructor assumes no concurrent access to the stack
+    ~TLockFreeStack() {
+        EraseList(Head);
+        EraseList(FreePtr);
+    }
+
+    void Enqueue(const T& t) {
+        EnqueueImpl(t);
+    }
+
+    void Enqueue(T&& t) {
+        EnqueueImpl(std::move(t));
+    }
+
+    template <typename TCollection>
+    void EnqueueAll(const TCollection& data) {
+        EnqueueAll(data.begin(), data.end());
+    }
+    template <typename TIter>
+    void EnqueueAll(TIter dataBegin, TIter dataEnd) {
+        if (dataBegin == dataEnd) {
+            return;
+        }
+        TIter i = dataBegin;
+        TNode* volatile node = new TNode(*i);
+        TNode* volatile tail = node;
+
+        // build the chain so that the last element ends up on top of the
+        // stack, matching the order of element-by-element Enqueue() calls
+        for (++i; i != dataEnd; ++i) {
+            TNode* nextNode = node;
+            node = new TNode(*i);
+            node->Next = nextNode;
+        }
+        EnqueueImpl(node, tail);
+    }
+    bool Dequeue(T* res) {
+        AtomicAdd(DequeueCount, 1);
+        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
+            if (AtomicCas(&Head, AtomicGet(current->Next), current)) {
+                *res = std::move(current->Value);
+                // deleting `current` right away would be unsafe: a concurrent
+                // Dequeue() may still be reading it (the ABA problem), so the
+                // node goes through the deferred-reclamation machinery instead
+                TryToFreeMemory();
+                if (AtomicAdd(DequeueCount, -1) == 0) {
+                    // no other Dequeue()s in flight, safe to reclaim immediately
+                    delete current;
+                } else {
+                    // other Dequeue()s in progress, park the node on the free list
+                    for (;;) {
+                        AtomicSet(current->Next, AtomicGet(FreePtr));
+                        if (AtomicCas(&FreePtr, current, current->Next))
+                            break;
+                    }
+                }
+                return true;
+            }
+        }
+        TryToFreeMemory();
+        AtomicAdd(DequeueCount, -1);
+        return false;
+    }
+    // adds all elements to *res;
+    // elements are returned in order of dequeue (top to bottom; see example in the unittest)
+    template <typename TCollection>
+    void DequeueAll(TCollection* res) {
+        AtomicAdd(DequeueCount, 1);
+        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
+            if (AtomicCas(&Head, (TNode*)nullptr, current)) {
+                for (TNode* x = current; x;) {
+                    res->push_back(std::move(x->Value));
+                    x = x->Next;
+                }
+                // as in Dequeue(), an immediate EraseList(current) would race
+                // with concurrent dequeuers (ABA), so reclamation is deferred
+                TryToFreeMemory();
+                if (AtomicAdd(DequeueCount, -1) == 0) {
+                    // no other Dequeue()s in flight, safe to reclaim immediately
+                    EraseList(current);
+                } else {
+                    // other Dequeue()s in progress, splice the whole chain onto the free list
+                    TNode* currentLast = current;
+                    while (currentLast->Next) {
+                        currentLast = currentLast->Next;
+                    }
+                    for (;;) {
+                        AtomicSet(currentLast->Next, AtomicGet(FreePtr));
+                        if (AtomicCas(&FreePtr, current, currentLast->Next))
+                            break;
+                    }
+                }
+                return;
+            }
+        }
+        TryToFreeMemory();
+        AtomicAdd(DequeueCount, -1);
+    }
+    bool DequeueSingleConsumer(T* res) {
+        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
+            if (AtomicCas(&Head, current->Next, current)) {
+                *res = std::move(current->Value);
+                delete current; // with a single consumer thread the ABA problem cannot occur
+                return true;
+            }
+        }
+        return false;
+    }
+    // adds all elements to *res;
+    // elements are returned in order of dequeue (top to bottom; see example in the unittest)
+    template <typename TCollection>
+    void DequeueAllSingleConsumer(TCollection* res) {
+        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
+            if (AtomicCas(&Head, (TNode*)nullptr, current)) {
+                for (TNode* x = current; x;) {
+                    res->push_back(std::move(x->Value));
+                    x = x->Next;
+                }
+                EraseList(current); // with a single consumer thread the ABA problem cannot occur
+                return;
+            }
+        }
+    }
+    bool IsEmpty() {
+        AtomicAdd(DequeueCount, 0); // memory barrier
+        return AtomicGet(Head) == nullptr; // read without a lock, so the result is approximate
+    }
+};
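
A minimal usage sketch, not part of the commit above: it assumes only this header and the C++ standard library, and shows one producer and one consumer running Enqueue()/Dequeue() concurrently, followed by a DequeueAll() drain.

#include <util/thread/lfstack.h>

#include <thread>
#include <vector>

int main() {
    TLockFreeStack<int> stack;

    std::thread producer([&] {
        for (int i = 0; i < 100; ++i) {
            stack.Enqueue(i); // pushes may race with concurrent pops
        }
    });

    std::thread consumer([&] {
        int value;
        for (int popped = 0; popped < 50;) {
            if (stack.Dequeue(&value)) {
                ++popped; // Dequeue() returns false when the stack is momentarily empty
            }
        }
    });

    producer.join();
    consumer.join();

    std::vector<int> rest;
    stack.DequeueAll(&rest); // remaining elements, top of the stack first
    return 0;
}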