aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/queue/mpsc_htswap.h
diff options
context:
space:
mode:
authoragri <agri@yandex-team.ru>2022-02-10 16:48:12 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:12 +0300
commitd3530b2692e400bd4d29bd4f07cafaee139164e7 (patch)
treeb7ae636a74490e649a2ed0fdd5361f1bec83b9f9 /library/cpp/threading/queue/mpsc_htswap.h
parent0f4c5d1e8c0672bf0a1f2f2d8acac5ba24772435 (diff)
downloadydb-d3530b2692e400bd4d29bd4f07cafaee139164e7.tar.gz
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/queue/mpsc_htswap.h')
-rw-r--r--library/cpp/threading/queue/mpsc_htswap.h252
1 files changed, 126 insertions, 126 deletions
diff --git a/library/cpp/threading/queue/mpsc_htswap.h b/library/cpp/threading/queue/mpsc_htswap.h
index c42caa7ac0..2d0bfd1146 100644
--- a/library/cpp/threading/queue/mpsc_htswap.h
+++ b/library/cpp/threading/queue/mpsc_htswap.h
@@ -1,132 +1,132 @@
-#pragma once
-
-/*
- http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
-
- Simple semi-wait-free queue. Many producers - one consumer.
- Tracking of allocated memory is not required.
- No CAS. Only atomic swap (exchange) operations.
-
- WARNING: a sleeping producer can stop progress for consumer.
-
- WARNING: there is no wait&notify mechanic for consumer,
- consumer receives nullptr if queue was empty.
-
- WARNING: the algorithm itself is lock-free
- but producers and consumer could be blocked by memory allocator
-
- Reference design: rtmapreduce/libs/threading/lfqueue.h
- */
-
-#include <util/generic/noncopyable.h>
-#include <util/system/types.h>
-#include <util/system/atomic.h>
-
-#include "tune.h"
-
-namespace NThreading {
- namespace NHTSwapPrivate {
- template <typename T, typename TTuneup>
- struct TNode
+#pragma once
+
+/*
+ http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
+
+ Simple semi-wait-free queue. Many producers - one consumer.
+ Tracking of allocated memory is not required.
+ No CAS. Only atomic swap (exchange) operations.
+
+ WARNING: a sleeping producer can stop progress for consumer.
+
+ WARNING: there is no wait&notify mechanic for consumer,
+ consumer receives nullptr if queue was empty.
+
+ WARNING: the algorithm itself is lock-free
+ but producers and consumer could be blocked by memory allocator
+
+ Reference design: rtmapreduce/libs/threading/lfqueue.h
+ */
+
+#include <util/generic/noncopyable.h>
+#include <util/system/types.h>
+#include <util/system/atomic.h>
+
+#include "tune.h"
+
+namespace NThreading {
+ namespace NHTSwapPrivate {
+ template <typename T, typename TTuneup>
+ struct TNode
: public TTuneup::TNodeBase,
public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> {
- TNode(const T& item) {
- this->Next = nullptr;
- this->Item = item;
- }
-
- TNode(T&& item) {
- this->Next = nullptr;
- this->Item = std::move(item);
- }
- };
-
- struct TDefaultTuneup {
- struct TNodeBase: private TNonCopyable {
- };
-
- template <typename TNode, typename T>
- struct TNodeLayout {
- TNode* Next;
- T Item;
- };
-
- template <typename TNode>
- struct TQueueLayout {
- TNode* Head;
- TNode* Tail;
- };
- };
-
- template <typename T, typename TTuneup>
- class THTSwapQueueImpl
+ TNode(const T& item) {
+ this->Next = nullptr;
+ this->Item = item;
+ }
+
+ TNode(T&& item) {
+ this->Next = nullptr;
+ this->Item = std::move(item);
+ }
+ };
+
+ struct TDefaultTuneup {
+ struct TNodeBase: private TNonCopyable {
+ };
+
+ template <typename TNode, typename T>
+ struct TNodeLayout {
+ TNode* Next;
+ T Item;
+ };
+
+ template <typename TNode>
+ struct TQueueLayout {
+ TNode* Head;
+ TNode* Tail;
+ };
+ };
+
+ template <typename T, typename TTuneup>
+ class THTSwapQueueImpl
: protected TTuneup::template TQueueLayout<TNode<T, TTuneup>> {
- protected:
- using TTunedNode = TNode<T, TTuneup>;
-
- public:
- using TItem = T;
-
- THTSwapQueueImpl() {
- this->Head = new TTunedNode(T());
- this->Tail = this->Head;
- }
-
- ~THTSwapQueueImpl() {
- TTunedNode* node = this->Head;
- while (node != nullptr) {
- TTunedNode* next = node->Next;
- delete node;
- node = next;
- }
- }
-
- template <typename TT>
- void Push(TT&& item) {
- Enqueue(new TTunedNode(std::forward<TT>(item)));
- }
-
- T Peek() {
- TTunedNode* next = AtomicGet(this->Head->Next);
- if (next == nullptr) {
- return T();
- }
- return next->Item;
- }
-
- void Enqueue(TTunedNode* node) {
- // our goal is to avoid expensive CAS here,
- // but now consumer will be blocked until new tail linked.
- // fortunately 'window of inconsistency' is extremely small.
- TTunedNode* prev = AtomicSwap(&this->Tail, node);
- AtomicSet(prev->Next, node);
- }
-
- T Pop() {
- TTunedNode* next = AtomicGet(this->Head->Next);
- if (next == nullptr) {
- return nullptr;
- }
- auto item = std::move(next->Item);
- std::swap(this->Head, next); // no need atomic here
- delete next;
- return item;
- }
-
- bool IsEmpty() const {
- TTunedNode* next = AtomicGet(this->Head->Next);
- return (next == nullptr);
- }
- };
- }
-
- DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase);
- DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout);
- DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout);
-
+ protected:
+ using TTunedNode = TNode<T, TTuneup>;
+
+ public:
+ using TItem = T;
+
+ THTSwapQueueImpl() {
+ this->Head = new TTunedNode(T());
+ this->Tail = this->Head;
+ }
+
+ ~THTSwapQueueImpl() {
+ TTunedNode* node = this->Head;
+ while (node != nullptr) {
+ TTunedNode* next = node->Next;
+ delete node;
+ node = next;
+ }
+ }
+
+ template <typename TT>
+ void Push(TT&& item) {
+ Enqueue(new TTunedNode(std::forward<TT>(item)));
+ }
+
+ T Peek() {
+ TTunedNode* next = AtomicGet(this->Head->Next);
+ if (next == nullptr) {
+ return T();
+ }
+ return next->Item;
+ }
+
+ void Enqueue(TTunedNode* node) {
+ // our goal is to avoid expensive CAS here,
+ // but now consumer will be blocked until new tail linked.
+ // fortunately 'window of inconsistency' is extremely small.
+ TTunedNode* prev = AtomicSwap(&this->Tail, node);
+ AtomicSet(prev->Next, node);
+ }
+
+ T Pop() {
+ TTunedNode* next = AtomicGet(this->Head->Next);
+ if (next == nullptr) {
+ return nullptr;
+ }
+ auto item = std::move(next->Item);
+ std::swap(this->Head, next); // no need atomic here
+ delete next;
+ return item;
+ }
+
+ bool IsEmpty() const {
+ TTunedNode* next = AtomicGet(this->Head->Next);
+ return (next == nullptr);
+ }
+ };
+ }
+
+ DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase);
+ DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout);
+ DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout);
+
template <typename T = void*, typename... TParams>
- class THTSwapQueue
+ class THTSwapQueue
: public NHTSwapPrivate::THTSwapQueueImpl<T,
TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> {
- };
-}
+ };
+}