aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/util/funnel_queue.h
diff options
context:
space:
mode:
authora-romanov <a-romanov@yandex-team.ru>2022-02-10 16:48:10 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:10 +0300
commitaa2986a34bde73b2cdcea5080c4443b7cf2ba686 (patch)
tree410fbde59311309b774a0da147f79628c3429a2c /library/cpp/actors/util/funnel_queue.h
parente77cfd118321c5b9c168fdee41b4e6c5706b8f68 (diff)
downloadydb-aa2986a34bde73b2cdcea5080c4443b7cf2ba686.tar.gz
Restoring authorship annotation for <a-romanov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/util/funnel_queue.h')
-rw-r--r--library/cpp/actors/util/funnel_queue.h228
1 files changed, 114 insertions, 114 deletions
diff --git a/library/cpp/actors/util/funnel_queue.h b/library/cpp/actors/util/funnel_queue.h
index 0e21e2617c..35dec61f2a 100644
--- a/library/cpp/actors/util/funnel_queue.h
+++ b/library/cpp/actors/util/funnel_queue.h
@@ -1,96 +1,96 @@
-#pragma once
-
-#include <util/system/atomic.h>
-#include <util/generic/noncopyable.h>
-
+#pragma once
+
+#include <util/system/atomic.h>
+#include <util/generic/noncopyable.h>
+
template <typename ElementType>
class TFunnelQueue: private TNonCopyable {
-public:
- TFunnelQueue() noexcept
+public:
+ TFunnelQueue() noexcept
: Front(nullptr)
, Back(nullptr)
{
}
-
+
virtual ~TFunnelQueue() noexcept {
- for (auto entry = Front; entry; entry = DeleteEntry(entry))
- continue;
- }
-
- /// Push element. Can be used from many threads. Return true if is first element.
- bool
+ for (auto entry = Front; entry; entry = DeleteEntry(entry))
+ continue;
+ }
+
+ /// Push element. Can be used from many threads. Return true if is first element.
+ bool
Push(ElementType&& element) noexcept {
TEntry* const next = NewEntry(static_cast<ElementType&&>(element));
TEntry* const prev = AtomicSwap(&Back, next);
AtomicSet(prev ? prev->Next : Front, next);
- return !prev;
- }
-
- /// Extract top element. Must be used only from one thread. Return true if have more.
- bool
+ return !prev;
+ }
+
+ /// Extract top element. Must be used only from one thread. Return true if have more.
+ bool
Pop() noexcept {
if (TEntry* const top = AtomicGet(Front)) {
- const auto last = AtomicCas(&Back, nullptr, top);
- if (last) // This is last element in queue. Queue is empty now.
- AtomicCas(&Front, nullptr, top);
- else // This element is not last.
- for (;;) {
- if (const auto next = AtomicGet(top->Next)) {
- AtomicSet(Front, next);
- break;
- }
- // But Next is null. Wait next assignment in spin lock.
- }
-
- DeleteEntry(top);
- return !last;
- }
-
- return false;
- }
-
- /// Peek top element. Must be used only from one thread.
- ElementType&
+ const auto last = AtomicCas(&Back, nullptr, top);
+ if (last) // This is last element in queue. Queue is empty now.
+ AtomicCas(&Front, nullptr, top);
+ else // This element is not last.
+ for (;;) {
+ if (const auto next = AtomicGet(top->Next)) {
+ AtomicSet(Front, next);
+ break;
+ }
+ // But Next is null. Wait next assignment in spin lock.
+ }
+
+ DeleteEntry(top);
+ return !last;
+ }
+
+ return false;
+ }
+
+ /// Peek top element. Must be used only from one thread.
+ ElementType&
Top() const noexcept {
return AtomicGet(Front)->Data;
- }
-
- bool
+ }
+
+ bool
IsEmpty() const noexcept {
- return !AtomicGet(Front);
- }
-
-protected:
+ return !AtomicGet(Front);
+ }
+
+protected:
class TEntry: private TNonCopyable {
friend class TFunnelQueue;
- private:
- explicit TEntry(ElementType&& element) noexcept
+ private:
+ explicit TEntry(ElementType&& element) noexcept
: Data(static_cast<ElementType&&>(element))
, Next(nullptr)
{
}
-
+
~TEntry() noexcept {
}
-
- public:
- ElementType Data;
+
+ public:
+ ElementType Data;
TEntry* volatile Next;
- };
-
+ };
+
TEntry* volatile Front;
TEntry* volatile Back;
-
+
virtual TEntry* NewEntry(ElementType&& element) noexcept {
- return new TEntry(static_cast<ElementType&&>(element));
- }
-
+ return new TEntry(static_cast<ElementType&&>(element));
+ }
+
virtual TEntry* DeleteEntry(TEntry* entry) noexcept {
- const auto next = entry->Next;
- delete entry;
- return next;
- }
+ const auto next = entry->Next;
+ delete entry;
+ return next;
+ }
protected:
struct TEntryIter {
@@ -166,75 +166,75 @@ public:
const_iterator end() const {
return {nullptr};
}
-};
-
+};
+
template <typename ElementType>
class TPooledFunnelQueue: public TFunnelQueue<ElementType> {
-public:
- TPooledFunnelQueue() noexcept
- : Stack(nullptr)
+public:
+ TPooledFunnelQueue() noexcept
+ : Stack(nullptr)
{
}
-
+
virtual ~TPooledFunnelQueue() noexcept override {
- for (auto entry = TBase::Front; entry; entry = TBase::DeleteEntry(entry))
- continue;
- for (auto entry = Stack; entry; entry = TBase::DeleteEntry(entry))
- continue;
- TBase::Back = TBase::Front = Stack = nullptr;
- }
-
-private:
- typedef TFunnelQueue<ElementType> TBase;
-
+ for (auto entry = TBase::Front; entry; entry = TBase::DeleteEntry(entry))
+ continue;
+ for (auto entry = Stack; entry; entry = TBase::DeleteEntry(entry))
+ continue;
+ TBase::Back = TBase::Front = Stack = nullptr;
+ }
+
+private:
+ typedef TFunnelQueue<ElementType> TBase;
+
typename TBase::TEntry* volatile Stack;
-
-protected:
+
+protected:
virtual typename TBase::TEntry* NewEntry(ElementType&& element) noexcept override {
while (const auto top = AtomicGet(Stack))
if (AtomicCas(&Stack, top->Next, top)) {
- top->Data = static_cast<ElementType&&>(element);
- AtomicSet(top->Next, nullptr);
- return top;
- }
-
- return TBase::NewEntry(static_cast<ElementType&&>(element));
- }
-
+ top->Data = static_cast<ElementType&&>(element);
+ AtomicSet(top->Next, nullptr);
+ return top;
+ }
+
+ return TBase::NewEntry(static_cast<ElementType&&>(element));
+ }
+
virtual typename TBase::TEntry* DeleteEntry(typename TBase::TEntry* entry) noexcept override {
- entry->Data = ElementType();
- const auto next = entry->Next;
+ entry->Data = ElementType();
+ const auto next = entry->Next;
do
AtomicSet(entry->Next, AtomicGet(Stack));
while (!AtomicCas(&Stack, entry, entry->Next));
- return next;
- }
-};
-
+ return next;
+ }
+};
+
template <typename ElementType, template <typename T> class TQueueType = TFunnelQueue>
class TCountedFunnelQueue: public TQueueType<ElementType> {
-public:
- TCountedFunnelQueue() noexcept
- : Count(0)
+public:
+ TCountedFunnelQueue() noexcept
+ : Count(0)
{
}
-
+
TAtomicBase GetSize() const noexcept {
- return AtomicGet(Count);
- }
-
-private:
+ return AtomicGet(Count);
+ }
+
+private:
typedef TQueueType<ElementType> TBase;
-
+
virtual typename TBase::TEntry* NewEntry(ElementType&& element) noexcept override {
- AtomicAdd(Count, 1);
- return TBase::NewEntry(static_cast<ElementType&&>(element));
- }
-
+ AtomicAdd(Count, 1);
+ return TBase::NewEntry(static_cast<ElementType&&>(element));
+ }
+
virtual typename TBase::TEntry* DeleteEntry(typename TBase::TEntry* entry) noexcept override {
- AtomicSub(Count, 1);
- return TBase::DeleteEntry(entry);
- }
-
- TAtomic Count;
-};
+ AtomicSub(Count, 1);
+ return TBase::DeleteEntry(entry);
+ }
+
+ TAtomic Count;
+};