diff options
author | a-romanov <a-romanov@yandex-team.ru> | 2022-02-10 16:48:10 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:10 +0300 |
commit | aa2986a34bde73b2cdcea5080c4443b7cf2ba686 (patch) | |
tree | 410fbde59311309b774a0da147f79628c3429a2c /library/cpp/actors/util/funnel_queue.h | |
parent | e77cfd118321c5b9c168fdee41b4e6c5706b8f68 (diff) | |
download | ydb-aa2986a34bde73b2cdcea5080c4443b7cf2ba686.tar.gz |
Restoring authorship annotation for <a-romanov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/util/funnel_queue.h')
-rw-r--r-- | library/cpp/actors/util/funnel_queue.h | 228 |
1 files changed, 114 insertions, 114 deletions
diff --git a/library/cpp/actors/util/funnel_queue.h b/library/cpp/actors/util/funnel_queue.h index 0e21e2617c..35dec61f2a 100644 --- a/library/cpp/actors/util/funnel_queue.h +++ b/library/cpp/actors/util/funnel_queue.h @@ -1,96 +1,96 @@ -#pragma once - -#include <util/system/atomic.h> -#include <util/generic/noncopyable.h> - +#pragma once + +#include <util/system/atomic.h> +#include <util/generic/noncopyable.h> + template <typename ElementType> class TFunnelQueue: private TNonCopyable { -public: - TFunnelQueue() noexcept +public: + TFunnelQueue() noexcept : Front(nullptr) , Back(nullptr) { } - + virtual ~TFunnelQueue() noexcept { - for (auto entry = Front; entry; entry = DeleteEntry(entry)) - continue; - } - - /// Push element. Can be used from many threads. Return true if is first element. - bool + for (auto entry = Front; entry; entry = DeleteEntry(entry)) + continue; + } + + /// Push element. Can be used from many threads. Return true if is first element. + bool Push(ElementType&& element) noexcept { TEntry* const next = NewEntry(static_cast<ElementType&&>(element)); TEntry* const prev = AtomicSwap(&Back, next); AtomicSet(prev ? prev->Next : Front, next); - return !prev; - } - - /// Extract top element. Must be used only from one thread. Return true if have more. - bool + return !prev; + } + + /// Extract top element. Must be used only from one thread. Return true if have more. + bool Pop() noexcept { if (TEntry* const top = AtomicGet(Front)) { - const auto last = AtomicCas(&Back, nullptr, top); - if (last) // This is last element in queue. Queue is empty now. - AtomicCas(&Front, nullptr, top); - else // This element is not last. - for (;;) { - if (const auto next = AtomicGet(top->Next)) { - AtomicSet(Front, next); - break; - } - // But Next is null. Wait next assignment in spin lock. - } - - DeleteEntry(top); - return !last; - } - - return false; - } - - /// Peek top element. Must be used only from one thread. - ElementType& + const auto last = AtomicCas(&Back, nullptr, top); + if (last) // This is last element in queue. Queue is empty now. + AtomicCas(&Front, nullptr, top); + else // This element is not last. + for (;;) { + if (const auto next = AtomicGet(top->Next)) { + AtomicSet(Front, next); + break; + } + // But Next is null. Wait next assignment in spin lock. + } + + DeleteEntry(top); + return !last; + } + + return false; + } + + /// Peek top element. Must be used only from one thread. + ElementType& Top() const noexcept { return AtomicGet(Front)->Data; - } - - bool + } + + bool IsEmpty() const noexcept { - return !AtomicGet(Front); - } - -protected: + return !AtomicGet(Front); + } + +protected: class TEntry: private TNonCopyable { friend class TFunnelQueue; - private: - explicit TEntry(ElementType&& element) noexcept + private: + explicit TEntry(ElementType&& element) noexcept : Data(static_cast<ElementType&&>(element)) , Next(nullptr) { } - + ~TEntry() noexcept { } - - public: - ElementType Data; + + public: + ElementType Data; TEntry* volatile Next; - }; - + }; + TEntry* volatile Front; TEntry* volatile Back; - + virtual TEntry* NewEntry(ElementType&& element) noexcept { - return new TEntry(static_cast<ElementType&&>(element)); - } - + return new TEntry(static_cast<ElementType&&>(element)); + } + virtual TEntry* DeleteEntry(TEntry* entry) noexcept { - const auto next = entry->Next; - delete entry; - return next; - } + const auto next = entry->Next; + delete entry; + return next; + } protected: struct TEntryIter { @@ -166,75 +166,75 @@ public: const_iterator end() const { return {nullptr}; } -}; - +}; + template <typename ElementType> class TPooledFunnelQueue: public TFunnelQueue<ElementType> { -public: - TPooledFunnelQueue() noexcept - : Stack(nullptr) +public: + TPooledFunnelQueue() noexcept + : Stack(nullptr) { } - + virtual ~TPooledFunnelQueue() noexcept override { - for (auto entry = TBase::Front; entry; entry = TBase::DeleteEntry(entry)) - continue; - for (auto entry = Stack; entry; entry = TBase::DeleteEntry(entry)) - continue; - TBase::Back = TBase::Front = Stack = nullptr; - } - -private: - typedef TFunnelQueue<ElementType> TBase; - + for (auto entry = TBase::Front; entry; entry = TBase::DeleteEntry(entry)) + continue; + for (auto entry = Stack; entry; entry = TBase::DeleteEntry(entry)) + continue; + TBase::Back = TBase::Front = Stack = nullptr; + } + +private: + typedef TFunnelQueue<ElementType> TBase; + typename TBase::TEntry* volatile Stack; - -protected: + +protected: virtual typename TBase::TEntry* NewEntry(ElementType&& element) noexcept override { while (const auto top = AtomicGet(Stack)) if (AtomicCas(&Stack, top->Next, top)) { - top->Data = static_cast<ElementType&&>(element); - AtomicSet(top->Next, nullptr); - return top; - } - - return TBase::NewEntry(static_cast<ElementType&&>(element)); - } - + top->Data = static_cast<ElementType&&>(element); + AtomicSet(top->Next, nullptr); + return top; + } + + return TBase::NewEntry(static_cast<ElementType&&>(element)); + } + virtual typename TBase::TEntry* DeleteEntry(typename TBase::TEntry* entry) noexcept override { - entry->Data = ElementType(); - const auto next = entry->Next; + entry->Data = ElementType(); + const auto next = entry->Next; do AtomicSet(entry->Next, AtomicGet(Stack)); while (!AtomicCas(&Stack, entry, entry->Next)); - return next; - } -}; - + return next; + } +}; + template <typename ElementType, template <typename T> class TQueueType = TFunnelQueue> class TCountedFunnelQueue: public TQueueType<ElementType> { -public: - TCountedFunnelQueue() noexcept - : Count(0) +public: + TCountedFunnelQueue() noexcept + : Count(0) { } - + TAtomicBase GetSize() const noexcept { - return AtomicGet(Count); - } - -private: + return AtomicGet(Count); + } + +private: typedef TQueueType<ElementType> TBase; - + virtual typename TBase::TEntry* NewEntry(ElementType&& element) noexcept override { - AtomicAdd(Count, 1); - return TBase::NewEntry(static_cast<ElementType&&>(element)); - } - + AtomicAdd(Count, 1); + return TBase::NewEntry(static_cast<ElementType&&>(element)); + } + virtual typename TBase::TEntry* DeleteEntry(typename TBase::TEntry* entry) noexcept override { - AtomicSub(Count, 1); - return TBase::DeleteEntry(entry); - } - - TAtomic Count; -}; + AtomicSub(Count, 1); + return TBase::DeleteEntry(entry); + } + + TAtomic Count; +}; |