diff options
author | agri <agri@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
commit | d3530b2692e400bd4d29bd4f07cafaee139164e7 (patch) | |
tree | b7ae636a74490e649a2ed0fdd5361f1bec83b9f9 /library/cpp/threading/queue/mpsc_htswap.h | |
parent | 0f4c5d1e8c0672bf0a1f2f2d8acac5ba24772435 (diff) | |
download | ydb-d3530b2692e400bd4d29bd4f07cafaee139164e7.tar.gz |
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/queue/mpsc_htswap.h')
-rw-r--r-- | library/cpp/threading/queue/mpsc_htswap.h | 252 |
1 files changed, 126 insertions, 126 deletions
diff --git a/library/cpp/threading/queue/mpsc_htswap.h b/library/cpp/threading/queue/mpsc_htswap.h index c42caa7ac0..2d0bfd1146 100644 --- a/library/cpp/threading/queue/mpsc_htswap.h +++ b/library/cpp/threading/queue/mpsc_htswap.h @@ -1,132 +1,132 @@ -#pragma once - -/* - http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue - - Simple semi-wait-free queue. Many producers - one consumer. - Tracking of allocated memory is not required. - No CAS. Only atomic swap (exchange) operations. - - WARNING: a sleeping producer can stop progress for consumer. - - WARNING: there is no wait¬ify mechanic for consumer, - consumer receives nullptr if queue was empty. - - WARNING: the algorithm itself is lock-free - but producers and consumer could be blocked by memory allocator - - Reference design: rtmapreduce/libs/threading/lfqueue.h - */ - -#include <util/generic/noncopyable.h> -#include <util/system/types.h> -#include <util/system/atomic.h> - -#include "tune.h" - -namespace NThreading { - namespace NHTSwapPrivate { - template <typename T, typename TTuneup> - struct TNode +#pragma once + +/* + http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue + + Simple semi-wait-free queue. Many producers - one consumer. + Tracking of allocated memory is not required. + No CAS. Only atomic swap (exchange) operations. + + WARNING: a sleeping producer can stop progress for consumer. + + WARNING: there is no wait¬ify mechanic for consumer, + consumer receives nullptr if queue was empty. + + WARNING: the algorithm itself is lock-free + but producers and consumer could be blocked by memory allocator + + Reference design: rtmapreduce/libs/threading/lfqueue.h + */ + +#include <util/generic/noncopyable.h> +#include <util/system/types.h> +#include <util/system/atomic.h> + +#include "tune.h" + +namespace NThreading { + namespace NHTSwapPrivate { + template <typename T, typename TTuneup> + struct TNode : public TTuneup::TNodeBase, public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> { - TNode(const T& item) { - this->Next = nullptr; - this->Item = item; - } - - TNode(T&& item) { - this->Next = nullptr; - this->Item = std::move(item); - } - }; - - struct TDefaultTuneup { - struct TNodeBase: private TNonCopyable { - }; - - template <typename TNode, typename T> - struct TNodeLayout { - TNode* Next; - T Item; - }; - - template <typename TNode> - struct TQueueLayout { - TNode* Head; - TNode* Tail; - }; - }; - - template <typename T, typename TTuneup> - class THTSwapQueueImpl + TNode(const T& item) { + this->Next = nullptr; + this->Item = item; + } + + TNode(T&& item) { + this->Next = nullptr; + this->Item = std::move(item); + } + }; + + struct TDefaultTuneup { + struct TNodeBase: private TNonCopyable { + }; + + template <typename TNode, typename T> + struct TNodeLayout { + TNode* Next; + T Item; + }; + + template <typename TNode> + struct TQueueLayout { + TNode* Head; + TNode* Tail; + }; + }; + + template <typename T, typename TTuneup> + class THTSwapQueueImpl : protected TTuneup::template TQueueLayout<TNode<T, TTuneup>> { - protected: - using TTunedNode = TNode<T, TTuneup>; - - public: - using TItem = T; - - THTSwapQueueImpl() { - this->Head = new TTunedNode(T()); - this->Tail = this->Head; - } - - ~THTSwapQueueImpl() { - TTunedNode* node = this->Head; - while (node != nullptr) { - TTunedNode* next = node->Next; - delete node; - node = next; - } - } - - template <typename TT> - void Push(TT&& item) { - Enqueue(new TTunedNode(std::forward<TT>(item))); - } - - T Peek() { - TTunedNode* next = AtomicGet(this->Head->Next); - if (next == nullptr) { - return T(); - } - return next->Item; - } - - void Enqueue(TTunedNode* node) { - // our goal is to avoid expensive CAS here, - // but now consumer will be blocked until new tail linked. - // fortunately 'window of inconsistency' is extremely small. - TTunedNode* prev = AtomicSwap(&this->Tail, node); - AtomicSet(prev->Next, node); - } - - T Pop() { - TTunedNode* next = AtomicGet(this->Head->Next); - if (next == nullptr) { - return nullptr; - } - auto item = std::move(next->Item); - std::swap(this->Head, next); // no need atomic here - delete next; - return item; - } - - bool IsEmpty() const { - TTunedNode* next = AtomicGet(this->Head->Next); - return (next == nullptr); - } - }; - } - - DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase); - DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout); - DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout); - + protected: + using TTunedNode = TNode<T, TTuneup>; + + public: + using TItem = T; + + THTSwapQueueImpl() { + this->Head = new TTunedNode(T()); + this->Tail = this->Head; + } + + ~THTSwapQueueImpl() { + TTunedNode* node = this->Head; + while (node != nullptr) { + TTunedNode* next = node->Next; + delete node; + node = next; + } + } + + template <typename TT> + void Push(TT&& item) { + Enqueue(new TTunedNode(std::forward<TT>(item))); + } + + T Peek() { + TTunedNode* next = AtomicGet(this->Head->Next); + if (next == nullptr) { + return T(); + } + return next->Item; + } + + void Enqueue(TTunedNode* node) { + // our goal is to avoid expensive CAS here, + // but now consumer will be blocked until new tail linked. + // fortunately 'window of inconsistency' is extremely small. + TTunedNode* prev = AtomicSwap(&this->Tail, node); + AtomicSet(prev->Next, node); + } + + T Pop() { + TTunedNode* next = AtomicGet(this->Head->Next); + if (next == nullptr) { + return nullptr; + } + auto item = std::move(next->Item); + std::swap(this->Head, next); // no need atomic here + delete next; + return item; + } + + bool IsEmpty() const { + TTunedNode* next = AtomicGet(this->Head->Next); + return (next == nullptr); + } + }; + } + + DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase); + DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout); + DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout); + template <typename T = void*, typename... TParams> - class THTSwapQueue + class THTSwapQueue : public NHTSwapPrivate::THTSwapQueueImpl<T, TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> { - }; -} + }; +} |