diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/thread/lfqueue.h | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/thread/lfqueue.h')
-rw-r--r-- | util/thread/lfqueue.h | 406 |
1 files changed, 406 insertions, 0 deletions
diff --git a/util/thread/lfqueue.h b/util/thread/lfqueue.h new file mode 100644 index 0000000000..ab523631e4 --- /dev/null +++ b/util/thread/lfqueue.h @@ -0,0 +1,406 @@ +#pragma once + +#include "fwd.h" + +#include <util/generic/ptr.h> +#include <util/system/atomic.h> +#include <util/system/yassert.h> +#include "lfstack.h" + +struct TDefaultLFCounter { + template <class T> + void IncCount(const T& data) { + (void)data; + } + template <class T> + void DecCount(const T& data) { + (void)data; + } +}; + +// @brief lockfree queue +// @tparam T - the queue element, should be movable +// @tparam TCounter, a observer class to count number of items in queue +// be carifull, IncCount and DecCount can be called on a moved object and +// it is TCounter class responsibility to check validity of passed object +template <class T, class TCounter> +class TLockFreeQueue: public TNonCopyable { + struct TListNode { + template <typename U> + TListNode(U&& u, TListNode* next) + : Next(next) + , Data(std::forward<U>(u)) + { + } + + template <typename U> + explicit TListNode(U&& u) + : Data(std::forward<U>(u)) + { + } + + TListNode* volatile Next; + T Data; + }; + + // using inheritance to be able to use 0 bytes for TCounter when we don't need one + struct TRootNode: public TCounter { + TListNode* volatile PushQueue; + TListNode* volatile PopQueue; + TListNode* volatile ToDelete; + TRootNode* volatile NextFree; + + TRootNode() + : PushQueue(nullptr) + , PopQueue(nullptr) + , ToDelete(nullptr) + , NextFree(nullptr) + { + } + void CopyCounter(TRootNode* x) { + *(TCounter*)this = *(TCounter*)x; + } + }; + + static void EraseList(TListNode* n) { + while (n) { + TListNode* keepNext = AtomicGet(n->Next); + delete n; + n = keepNext; + } + } + + alignas(64) TRootNode* volatile JobQueue; + alignas(64) volatile TAtomic FreememCounter; + alignas(64) volatile TAtomic FreeingTaskCounter; + alignas(64) TRootNode* volatile FreePtr; + + void TryToFreeAsyncMemory() { + TAtomic keepCounter = AtomicAdd(FreeingTaskCounter, 0); + TRootNode* current = AtomicGet(FreePtr); + if (current == nullptr) + return; + if (AtomicAdd(FreememCounter, 0) == 1) { + // we are the last thread, try to cleanup + // check if another thread have cleaned up + if (keepCounter != AtomicAdd(FreeingTaskCounter, 0)) { + return; + } + if (AtomicCas(&FreePtr, (TRootNode*)nullptr, current)) { + // free list + while (current) { + TRootNode* p = AtomicGet(current->NextFree); + EraseList(AtomicGet(current->ToDelete)); + delete current; + current = p; + } + AtomicAdd(FreeingTaskCounter, 1); + } + } + } + void AsyncRef() { + AtomicAdd(FreememCounter, 1); + } + void AsyncUnref() { + TryToFreeAsyncMemory(); + AtomicAdd(FreememCounter, -1); + } + void AsyncDel(TRootNode* toDelete, TListNode* lst) { + AtomicSet(toDelete->ToDelete, lst); + for (;;) { + AtomicSet(toDelete->NextFree, AtomicGet(FreePtr)); + if (AtomicCas(&FreePtr, toDelete, AtomicGet(toDelete->NextFree))) + break; + } + } + void AsyncUnref(TRootNode* toDelete, TListNode* lst) { + TryToFreeAsyncMemory(); + if (AtomicAdd(FreememCounter, -1) == 0) { + // no other operations in progress, can safely reclaim memory + EraseList(lst); + delete toDelete; + } else { + // Dequeue()s in progress, put node to free list + AsyncDel(toDelete, lst); + } + } + + struct TListInvertor { + TListNode* Copy; + TListNode* Tail; + TListNode* PrevFirst; + + TListInvertor() + : Copy(nullptr) + , Tail(nullptr) + , PrevFirst(nullptr) + { + } + ~TListInvertor() { + EraseList(Copy); + } + void CopyWasUsed() { + Copy = nullptr; + Tail = nullptr; + PrevFirst = nullptr; + } + void DoCopy(TListNode* ptr) { + TListNode* newFirst = ptr; + TListNode* newCopy = nullptr; + TListNode* newTail = nullptr; + while (ptr) { + if (ptr == PrevFirst) { + // short cut, we have copied this part already + AtomicSet(Tail->Next, newCopy); + newCopy = Copy; + Copy = nullptr; // do not destroy prev try + if (!newTail) + newTail = Tail; // tried to invert same list + break; + } + TListNode* newElem = new TListNode(ptr->Data, newCopy); + newCopy = newElem; + ptr = AtomicGet(ptr->Next); + if (!newTail) + newTail = newElem; + } + EraseList(Copy); // copy was useless + Copy = newCopy; + PrevFirst = newFirst; + Tail = newTail; + } + }; + + void EnqueueImpl(TListNode* head, TListNode* tail) { + TRootNode* newRoot = new TRootNode; + AsyncRef(); + AtomicSet(newRoot->PushQueue, head); + for (;;) { + TRootNode* curRoot = AtomicGet(JobQueue); + AtomicSet(tail->Next, AtomicGet(curRoot->PushQueue)); + AtomicSet(newRoot->PopQueue, AtomicGet(curRoot->PopQueue)); + newRoot->CopyCounter(curRoot); + + for (TListNode* node = head;; node = AtomicGet(node->Next)) { + newRoot->IncCount(node->Data); + if (node == tail) + break; + } + + if (AtomicCas(&JobQueue, newRoot, curRoot)) { + AsyncUnref(curRoot, nullptr); + break; + } + } + } + + template <typename TCollection> + static void FillCollection(TListNode* lst, TCollection* res) { + while (lst) { + res->emplace_back(std::move(lst->Data)); + lst = AtomicGet(lst->Next); + } + } + + /** Traverses a given list simultaneously creating its inversed version. + * After that, fills a collection with a reversed version and returns the last visited lst's node. + */ + template <typename TCollection> + static TListNode* FillCollectionReverse(TListNode* lst, TCollection* res) { + if (!lst) { + return nullptr; + } + + TListNode* newCopy = nullptr; + do { + TListNode* newElem = new TListNode(std::move(lst->Data), newCopy); + newCopy = newElem; + lst = AtomicGet(lst->Next); + } while (lst); + + FillCollection(newCopy, res); + EraseList(newCopy); + + return lst; + } + +public: + TLockFreeQueue() + : JobQueue(new TRootNode) + , FreememCounter(0) + , FreeingTaskCounter(0) + , FreePtr(nullptr) + { + } + ~TLockFreeQueue() { + AsyncRef(); + AsyncUnref(); // should free FreeList + EraseList(JobQueue->PushQueue); + EraseList(JobQueue->PopQueue); + delete JobQueue; + } + template <typename U> + void Enqueue(U&& data) { + TListNode* newNode = new TListNode(std::forward<U>(data)); + EnqueueImpl(newNode, newNode); + } + void Enqueue(T&& data) { + TListNode* newNode = new TListNode(std::move(data)); + EnqueueImpl(newNode, newNode); + } + void Enqueue(const T& data) { + TListNode* newNode = new TListNode(data); + EnqueueImpl(newNode, newNode); + } + template <typename TCollection> + void EnqueueAll(const TCollection& data) { + EnqueueAll(data.begin(), data.end()); + } + template <typename TIter> + void EnqueueAll(TIter dataBegin, TIter dataEnd) { + if (dataBegin == dataEnd) + return; + + TIter i = dataBegin; + TListNode* volatile node = new TListNode(*i); + TListNode* volatile tail = node; + + for (++i; i != dataEnd; ++i) { + TListNode* nextNode = node; + node = new TListNode(*i, nextNode); + } + EnqueueImpl(node, tail); + } + bool Dequeue(T* data) { + TRootNode* newRoot = nullptr; + TListInvertor listInvertor; + AsyncRef(); + for (;;) { + TRootNode* curRoot = AtomicGet(JobQueue); + TListNode* tail = AtomicGet(curRoot->PopQueue); + if (tail) { + // has elems to pop + if (!newRoot) + newRoot = new TRootNode; + + AtomicSet(newRoot->PushQueue, AtomicGet(curRoot->PushQueue)); + AtomicSet(newRoot->PopQueue, AtomicGet(tail->Next)); + newRoot->CopyCounter(curRoot); + newRoot->DecCount(tail->Data); + Y_ASSERT(AtomicGet(curRoot->PopQueue) == tail); + if (AtomicCas(&JobQueue, newRoot, curRoot)) { + *data = std::move(tail->Data); + AtomicSet(tail->Next, nullptr); + AsyncUnref(curRoot, tail); + return true; + } + continue; + } + if (AtomicGet(curRoot->PushQueue) == nullptr) { + delete newRoot; + AsyncUnref(); + return false; // no elems to pop + } + + if (!newRoot) + newRoot = new TRootNode; + AtomicSet(newRoot->PushQueue, nullptr); + listInvertor.DoCopy(AtomicGet(curRoot->PushQueue)); + AtomicSet(newRoot->PopQueue, listInvertor.Copy); + newRoot->CopyCounter(curRoot); + Y_ASSERT(AtomicGet(curRoot->PopQueue) == nullptr); + if (AtomicCas(&JobQueue, newRoot, curRoot)) { + newRoot = nullptr; + listInvertor.CopyWasUsed(); + AsyncDel(curRoot, AtomicGet(curRoot->PushQueue)); + } else { + AtomicSet(newRoot->PopQueue, nullptr); + } + } + } + template <typename TCollection> + void DequeueAll(TCollection* res) { + AsyncRef(); + + TRootNode* newRoot = new TRootNode; + TRootNode* curRoot; + do { + curRoot = AtomicGet(JobQueue); + } while (!AtomicCas(&JobQueue, newRoot, curRoot)); + + FillCollection(curRoot->PopQueue, res); + + TListNode* toDeleteHead = curRoot->PushQueue; + TListNode* toDeleteTail = FillCollectionReverse(curRoot->PushQueue, res); + AtomicSet(curRoot->PushQueue, nullptr); + + if (toDeleteTail) { + toDeleteTail->Next = curRoot->PopQueue; + } else { + toDeleteTail = curRoot->PopQueue; + } + AtomicSet(curRoot->PopQueue, nullptr); + + AsyncUnref(curRoot, toDeleteHead); + } + bool IsEmpty() { + AsyncRef(); + TRootNode* curRoot = AtomicGet(JobQueue); + bool res = AtomicGet(curRoot->PushQueue) == nullptr && AtomicGet(curRoot->PopQueue) == nullptr; + AsyncUnref(); + return res; + } + TCounter GetCounter() { + AsyncRef(); + TRootNode* curRoot = AtomicGet(JobQueue); + TCounter res = *(TCounter*)curRoot; + AsyncUnref(); + return res; + } +}; + +template <class T, class TCounter> +class TAutoLockFreeQueue { +public: + using TRef = THolder<T>; + + inline ~TAutoLockFreeQueue() { + TRef tmp; + + while (Dequeue(&tmp)) { + } + } + + inline bool Dequeue(TRef* t) { + T* res = nullptr; + + if (Queue.Dequeue(&res)) { + t->Reset(res); + + return true; + } + + return false; + } + + inline void Enqueue(TRef& t) { + Queue.Enqueue(t.Get()); + Y_UNUSED(t.Release()); + } + + inline void Enqueue(TRef&& t) { + Queue.Enqueue(t.Get()); + Y_UNUSED(t.Release()); + } + + inline bool IsEmpty() { + return Queue.IsEmpty(); + } + + inline TCounter GetCounter() { + return Queue.GetCounter(); + } + +private: + TLockFreeQueue<T*, TCounter> Queue; +}; |