path: root/library/cpp/threading/queue
diff options
authoragri <agri@yandex-team.ru>2022-02-10 16:48:12 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:12 +0300
commitd3530b2692e400bd4d29bd4f07cafaee139164e7 (patch)
treeb7ae636a74490e649a2ed0fdd5361f1bec83b9f9 /library/cpp/threading/queue
parent0f4c5d1e8c0672bf0a1f2f2d8acac5ba24772435 (diff)
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/queue')
19 files changed, 2215 insertions, 2215 deletions
diff --git a/library/cpp/threading/queue/basic_ut.cpp b/library/cpp/threading/queue/basic_ut.cpp
index 5f56f8583e..2db5d6e8e8 100644
--- a/library/cpp/threading/queue/basic_ut.cpp
+++ b/library/cpp/threading/queue/basic_ut.cpp
@@ -1,92 +1,92 @@
#include <library/cpp/testing/unittest/registar.h>
-#include <util/generic/vector.h>
-#include <util/system/thread.h>
-#include "ut_helpers.h"
+#include <util/generic/vector.h>
+#include <util/system/thread.h>
+#include "ut_helpers.h"
template <typename TQueueType>
-class TQueueTestsInSingleThread: public TTestBase {
+class TQueueTestsInSingleThread: public TTestBase {
using TSelf = TQueueTestsInSingleThread<TQueueType>;
- using TLink = TIntrusiveLink;
- UNIT_TEST(OnePushOnePop)
- UNIT_TEST(OnePushOnePop_Repeat1M)
- UNIT_TEST(Threads8_Repeat1M_Push1Pop1)
- void OnePushOnePop() {
+ using TLink = TIntrusiveLink;
+ UNIT_TEST(OnePushOnePop)
+ UNIT_TEST(OnePushOnePop_Repeat1M)
+ UNIT_TEST(Threads8_Repeat1M_Push1Pop1)
+ void OnePushOnePop() {
TQueueType queue;
- auto popped = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
- TLink msg;
- queue.Push(&msg);
- popped = queue.Pop();
- popped = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
- };
- void OnePushOnePop_Repeat1M() {
+ auto popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
+ TLink msg;
+ queue.Push(&msg);
+ popped = queue.Pop();
+ popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
+ };
+ void OnePushOnePop_Repeat1M() {
TQueueType queue;
- TLink msg;
- auto popped = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
- for (int i = 0; i < 1000000; ++i) {
- queue.Push(&msg);
- popped = queue.Pop();
- popped = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
- }
- }
- template <size_t NUMBER_OF_THREADS>
- void RepeatPush1Pop1_InManyThreads() {
+ TLink msg;
+ auto popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
+ for (int i = 0; i < 1000000; ++i) {
+ queue.Push(&msg);
+ popped = queue.Pop();
+ popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
+ }
+ }
+ template <size_t NUMBER_OF_THREADS>
+ void RepeatPush1Pop1_InManyThreads() {
class TCycleThread: public ISimpleThread {
- public:
- void* ThreadProc() override {
+ public:
+ void* ThreadProc() override {
TQueueType queue;
- TLink msg;
- auto popped = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
- for (size_t i = 0; i < 1000000; ++i) {
- queue.Push(&msg);
- popped = queue.Pop();
- popped = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
- }
- return nullptr;
- }
- };
+ TLink msg;
+ auto popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
+ for (size_t i = 0; i < 1000000; ++i) {
+ queue.Push(&msg);
+ popped = queue.Pop();
+ popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
+ }
+ return nullptr;
+ }
+ };
TVector<TAutoPtr<TCycleThread>> cyclers;
- for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) {
- cyclers.emplace_back(new TCycleThread);
- cyclers.back()->Start();
- }
- for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) {
- cyclers[i]->Join();
- }
- }
- void Threads8_Repeat1M_Push1Pop1() {
- RepeatPush1Pop1_InManyThreads<8>();
- }
+ for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) {
+ cyclers.emplace_back(new TCycleThread);
+ cyclers.back()->Start();
+ }
+ for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) {
+ cyclers[i]->Join();
+ }
+ }
+ void Threads8_Repeat1M_Push1Pop1() {
+ RepeatPush1Pop1_InManyThreads<8>();
+ }
diff --git a/library/cpp/threading/queue/mpmc_unordered_ring.cpp b/library/cpp/threading/queue/mpmc_unordered_ring.cpp
index 160547f594..df48182210 100644
--- a/library/cpp/threading/queue/mpmc_unordered_ring.cpp
+++ b/library/cpp/threading/queue/mpmc_unordered_ring.cpp
@@ -1,74 +1,74 @@
-#include "mpmc_unordered_ring.h"
-namespace NThreading {
- TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) {
- Y_VERIFY(size > 0);
- RingSize = size;
- RingBuffer.Reset(new void*[size]);
- memset(&RingBuffer[0], 0, sizeof(void*) * size);
- }
- bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept {
- if (retryCount == 0) {
- StubbornPush(msg);
- return true;
- }
- for (ui16 itry = retryCount; itry-- > 0;) {
- if (WeakPush(msg)) {
- return true;
- }
- }
- return false;
- }
- bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept {
- auto pawl = AtomicIncrement(WritePawl);
- if (pawl - AtomicGet(ReadFront) >= RingSize) {
- // Queue is full
- AtomicDecrement(WritePawl);
- return false;
- }
- auto writeSlot = AtomicGetAndIncrement(WriteFront);
- if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) {
- return true;
- }
- // slot is occupied for some reason, retry
- return false;
- }
- void* TMPMCUnorderedRing::Pop() noexcept {
- ui64 readSlot;
- for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) {
- auto pawl = AtomicIncrement(ReadPawl);
- if (pawl > AtomicGet(WriteFront)) {
- // Queue is empty
- AtomicDecrement(ReadPawl);
- return nullptr;
- }
- readSlot = AtomicGetAndIncrement(ReadFront);
- auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr);
- if (msg != nullptr) {
- return msg;
- }
- }
- /* got no message in the slot, let's try to rollback readfront */
- AtomicCas(&ReadFront, readSlot - 1, readSlot);
- return nullptr;
- }
- void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept {
- for (; *last < RingSize;) {
- auto msg = AtomicSwap(&RingBuffer[*last], nullptr);
- ++*last;
- if (msg != nullptr) {
- return msg;
- }
- }
- return nullptr;
- }
+#include "mpmc_unordered_ring.h"
+namespace NThreading {
+ TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) {
+ Y_VERIFY(size > 0);
+ RingSize = size;
+ RingBuffer.Reset(new void*[size]);
+ memset(&RingBuffer[0], 0, sizeof(void*) * size);
+ }
+ bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept {
+ if (retryCount == 0) {
+ StubbornPush(msg);
+ return true;
+ }
+ for (ui16 itry = retryCount; itry-- > 0;) {
+ if (WeakPush(msg)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept {
+ auto pawl = AtomicIncrement(WritePawl);
+ if (pawl - AtomicGet(ReadFront) >= RingSize) {
+ // Queue is full
+ AtomicDecrement(WritePawl);
+ return false;
+ }
+ auto writeSlot = AtomicGetAndIncrement(WriteFront);
+ if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) {
+ return true;
+ }
+ // slot is occupied for some reason, retry
+ return false;
+ }
+ void* TMPMCUnorderedRing::Pop() noexcept {
+ ui64 readSlot;
+ for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) {
+ auto pawl = AtomicIncrement(ReadPawl);
+ if (pawl > AtomicGet(WriteFront)) {
+ // Queue is empty
+ AtomicDecrement(ReadPawl);
+ return nullptr;
+ }
+ readSlot = AtomicGetAndIncrement(ReadFront);
+ auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr);
+ if (msg != nullptr) {
+ return msg;
+ }
+ }
+ /* got no message in the slot, let's try to rollback readfront */
+ AtomicCas(&ReadFront, readSlot - 1, readSlot);
+ return nullptr;
+ }
+ void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept {
+ for (; *last < RingSize;) {
+ auto msg = AtomicSwap(&RingBuffer[*last], nullptr);
+ ++*last;
+ if (msg != nullptr) {
+ return msg;
+ }
+ }
+ return nullptr;
+ }
diff --git a/library/cpp/threading/queue/mpmc_unordered_ring.h b/library/cpp/threading/queue/mpmc_unordered_ring.h
index 5042f7528e..59758d2c35 100644
--- a/library/cpp/threading/queue/mpmc_unordered_ring.h
+++ b/library/cpp/threading/queue/mpmc_unordered_ring.h
@@ -1,42 +1,42 @@
-#pragma once
- It's not a general purpose queue.
- No order guarantee, but it mostly ordered.
- Items may stuck in almost empty queue.
- Use UnsafeScanningPop to pop all stuck items.
- Almost wait-free for producers and consumers.
- */
-#include <util/system/atomic.h>
-#include <util/generic/ptr.h>
-namespace NThreading {
- struct TMPMCUnorderedRing {
- public:
- static constexpr ui16 MAX_PUSH_TRIES = 4;
- static constexpr ui16 MAX_POP_TRIES = 4;
- TMPMCUnorderedRing(size_t size);
- bool Push(void* msg, ui16 retryCount = MAX_PUSH_TRIES) noexcept;
- void StubbornPush(void* msg) {
- while (!WeakPush(msg)) {
- }
- }
- void* Pop() noexcept;
- void* UnsafeScanningPop(ui64* last) noexcept;
- private:
- bool WeakPush(void* msg) noexcept;
- size_t RingSize;
- TArrayPtr<void*> RingBuffer;
- ui64 WritePawl = 0;
- ui64 WriteFront = 0;
- ui64 ReadPawl = 0;
- ui64 ReadFront = 0;
- };
+#pragma once
+ It's not a general purpose queue.
+ No order guarantee, but it mostly ordered.
+ Items may stuck in almost empty queue.
+ Use UnsafeScanningPop to pop all stuck items.
+ Almost wait-free for producers and consumers.
+ */
+#include <util/system/atomic.h>
+#include <util/generic/ptr.h>
+namespace NThreading {
+ struct TMPMCUnorderedRing {
+ public:
+ static constexpr ui16 MAX_PUSH_TRIES = 4;
+ static constexpr ui16 MAX_POP_TRIES = 4;
+ TMPMCUnorderedRing(size_t size);
+ bool Push(void* msg, ui16 retryCount = MAX_PUSH_TRIES) noexcept;
+ void StubbornPush(void* msg) {
+ while (!WeakPush(msg)) {
+ }
+ }
+ void* Pop() noexcept;
+ void* UnsafeScanningPop(ui64* last) noexcept;
+ private:
+ bool WeakPush(void* msg) noexcept;
+ size_t RingSize;
+ TArrayPtr<void*> RingBuffer;
+ ui64 WritePawl = 0;
+ ui64 WriteFront = 0;
+ ui64 ReadPawl = 0;
+ ui64 ReadFront = 0;
+ };
diff --git a/library/cpp/threading/queue/mpsc_htswap.cpp b/library/cpp/threading/queue/mpsc_htswap.cpp
index 610c8f67f1..d8ab0d4f48 100644
--- a/library/cpp/threading/queue/mpsc_htswap.cpp
+++ b/library/cpp/threading/queue/mpsc_htswap.cpp
@@ -1 +1 @@
-#include "mpsc_htswap.h"
+#include "mpsc_htswap.h"
diff --git a/library/cpp/threading/queue/mpsc_htswap.h b/library/cpp/threading/queue/mpsc_htswap.h
index c42caa7ac0..2d0bfd1146 100644
--- a/library/cpp/threading/queue/mpsc_htswap.h
+++ b/library/cpp/threading/queue/mpsc_htswap.h
@@ -1,132 +1,132 @@
-#pragma once
- http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
- Simple semi-wait-free queue. Many producers - one consumer.
- Tracking of allocated memory is not required.
- No CAS. Only atomic swap (exchange) operations.
- WARNING: a sleeping producer can stop progress for consumer.
- WARNING: there is no wait&notify mechanic for consumer,
- consumer receives nullptr if queue was empty.
- WARNING: the algorithm itself is lock-free
- but producers and consumer could be blocked by memory allocator
- Reference design: rtmapreduce/libs/threading/lfqueue.h
- */
-#include <util/generic/noncopyable.h>
-#include <util/system/types.h>
-#include <util/system/atomic.h>
-#include "tune.h"
-namespace NThreading {
- namespace NHTSwapPrivate {
- template <typename T, typename TTuneup>
- struct TNode
+#pragma once
+ http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
+ Simple semi-wait-free queue. Many producers - one consumer.
+ Tracking of allocated memory is not required.
+ No CAS. Only atomic swap (exchange) operations.
+ WARNING: a sleeping producer can stop progress for consumer.
+ WARNING: there is no wait&notify mechanic for consumer,
+ consumer receives nullptr if queue was empty.
+ WARNING: the algorithm itself is lock-free
+ but producers and consumer could be blocked by memory allocator
+ Reference design: rtmapreduce/libs/threading/lfqueue.h
+ */
+#include <util/generic/noncopyable.h>
+#include <util/system/types.h>
+#include <util/system/atomic.h>
+#include "tune.h"
+namespace NThreading {
+ namespace NHTSwapPrivate {
+ template <typename T, typename TTuneup>
+ struct TNode
: public TTuneup::TNodeBase,
public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> {
- TNode(const T& item) {
- this->Next = nullptr;
- this->Item = item;
- }
- TNode(T&& item) {
- this->Next = nullptr;
- this->Item = std::move(item);
- }
- };
- struct TDefaultTuneup {
- struct TNodeBase: private TNonCopyable {
- };
- template <typename TNode, typename T>
- struct TNodeLayout {
- TNode* Next;
- T Item;
- };
- template <typename TNode>
- struct TQueueLayout {
- TNode* Head;
- TNode* Tail;
- };
- };
- template <typename T, typename TTuneup>
- class THTSwapQueueImpl
+ TNode(const T& item) {
+ this->Next = nullptr;
+ this->Item = item;
+ }
+ TNode(T&& item) {
+ this->Next = nullptr;
+ this->Item = std::move(item);
+ }
+ };
+ struct TDefaultTuneup {
+ struct TNodeBase: private TNonCopyable {
+ };
+ template <typename TNode, typename T>
+ struct TNodeLayout {
+ TNode* Next;
+ T Item;
+ };
+ template <typename TNode>
+ struct TQueueLayout {
+ TNode* Head;
+ TNode* Tail;
+ };
+ };
+ template <typename T, typename TTuneup>
+ class THTSwapQueueImpl
: protected TTuneup::template TQueueLayout<TNode<T, TTuneup>> {
- protected:
- using TTunedNode = TNode<T, TTuneup>;
- public:
- using TItem = T;
- THTSwapQueueImpl() {
- this->Head = new TTunedNode(T());
- this->Tail = this->Head;
- }
- ~THTSwapQueueImpl() {
- TTunedNode* node = this->Head;
- while (node != nullptr) {
- TTunedNode* next = node->Next;
- delete node;
- node = next;
- }
- }
- template <typename TT>
- void Push(TT&& item) {
- Enqueue(new TTunedNode(std::forward<TT>(item)));
- }
- T Peek() {
- TTunedNode* next = AtomicGet(this->Head->Next);
- if (next == nullptr) {
- return T();
- }
- return next->Item;
- }
- void Enqueue(TTunedNode* node) {
- // our goal is to avoid expensive CAS here,
- // but now consumer will be blocked until new tail linked.
- // fortunately 'window of inconsistency' is extremely small.
- TTunedNode* prev = AtomicSwap(&this->Tail, node);
- AtomicSet(prev->Next, node);
- }
- T Pop() {
- TTunedNode* next = AtomicGet(this->Head->Next);
- if (next == nullptr) {
- return nullptr;
- }
- auto item = std::move(next->Item);
- std::swap(this->Head, next); // no need atomic here
- delete next;
- return item;
- }
- bool IsEmpty() const {
- TTunedNode* next = AtomicGet(this->Head->Next);
- return (next == nullptr);
- }
- };
- }
- DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase);
- DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout);
- DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout);
+ protected:
+ using TTunedNode = TNode<T, TTuneup>;
+ public:
+ using TItem = T;
+ THTSwapQueueImpl() {
+ this->Head = new TTunedNode(T());
+ this->Tail = this->Head;
+ }
+ ~THTSwapQueueImpl() {
+ TTunedNode* node = this->Head;
+ while (node != nullptr) {
+ TTunedNode* next = node->Next;
+ delete node;
+ node = next;
+ }
+ }
+ template <typename TT>
+ void Push(TT&& item) {
+ Enqueue(new TTunedNode(std::forward<TT>(item)));
+ }
+ T Peek() {
+ TTunedNode* next = AtomicGet(this->Head->Next);
+ if (next == nullptr) {
+ return T();
+ }
+ return next->Item;
+ }
+ void Enqueue(TTunedNode* node) {
+ // our goal is to avoid expensive CAS here,
+ // but now consumer will be blocked until new tail linked.
+ // fortunately 'window of inconsistency' is extremely small.
+ TTunedNode* prev = AtomicSwap(&this->Tail, node);
+ AtomicSet(prev->Next, node);
+ }
+ T Pop() {
+ TTunedNode* next = AtomicGet(this->Head->Next);
+ if (next == nullptr) {
+ return nullptr;
+ }
+ auto item = std::move(next->Item);
+ std::swap(this->Head, next); // no need atomic here
+ delete next;
+ return item;
+ }
+ bool IsEmpty() const {
+ TTunedNode* next = AtomicGet(this->Head->Next);
+ return (next == nullptr);
+ }
+ };
+ }
+ DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase);
+ DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout);
+ DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout);
template <typename T = void*, typename... TParams>
- class THTSwapQueue
+ class THTSwapQueue
: public NHTSwapPrivate::THTSwapQueueImpl<T,
TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> {
- };
+ };
diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp
index 3bb1a04f7e..a6a2fcef39 100644
--- a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp
+++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp
@@ -1,79 +1,79 @@
-#include "mpsc_intrusive_unordered.h"
-#include <util/system/atomic.h>
-namespace NThreading {
- void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept {
- auto head = AtomicGet(HeadForCaS);
- for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) {
- // no ABA here, because Next is exactly head
- // it does not matter how many travels head was made/
- node->Next = head;
- auto prev = AtomicGetAndCas(&HeadForCaS, node, head);
- if (head == prev) {
- return;
- }
- head = prev;
- }
- // boring of trying to do cas, let's just swap
- // no need for atomic here, because the next is atomic swap
- node->Next = 0;
- head = AtomicSwap(&HeadForSwap, node);
- if (head != nullptr) {
- AtomicSet(node->Next, head);
- } else {
- // consumer must know if no other thread may access the memory,
- // setting Next to node is a way to notify consumer
- AtomicSet(node->Next, node);
- }
- }
- TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept {
- if (NotReadyChain == nullptr) {
- auto head = AtomicSwap(&HeadForSwap, nullptr);
- NotReadyChain = head;
- }
- if (NotReadyChain != nullptr) {
- auto next = AtomicGet(NotReadyChain->Next);
- if (next != nullptr) {
- auto ready = NotReadyChain;
- TIntrusiveNode* cut;
- do {
- cut = NotReadyChain;
- NotReadyChain = next;
- next = AtomicGet(NotReadyChain->Next);
- if (next == NotReadyChain) {
- cut = NotReadyChain;
- NotReadyChain = nullptr;
- break;
- }
- } while (next != nullptr);
- cut->Next = nullptr;
- return ready;
- }
- }
- if (AtomicGet(HeadForCaS) != nullptr) {
- return AtomicSwap(&HeadForCaS, nullptr);
- }
- return nullptr;
- }
- TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept {
- if (PopOneQueue != nullptr) {
- auto head = PopOneQueue;
- PopOneQueue = PopOneQueue->Next;
- return head;
- }
- PopOneQueue = PopMany();
- if (PopOneQueue != nullptr) {
- auto head = PopOneQueue;
- PopOneQueue = PopOneQueue->Next;
- return head;
- }
- return nullptr;
- }
+#include "mpsc_intrusive_unordered.h"
+#include <util/system/atomic.h>
+namespace NThreading {
+ void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept {
+ auto head = AtomicGet(HeadForCaS);
+ for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) {
+ // no ABA here, because Next is exactly head
+ // it does not matter how many travels head was made/
+ node->Next = head;
+ auto prev = AtomicGetAndCas(&HeadForCaS, node, head);
+ if (head == prev) {
+ return;
+ }
+ head = prev;
+ }
+ // boring of trying to do cas, let's just swap
+ // no need for atomic here, because the next is atomic swap
+ node->Next = 0;
+ head = AtomicSwap(&HeadForSwap, node);
+ if (head != nullptr) {
+ AtomicSet(node->Next, head);
+ } else {
+ // consumer must know if no other thread may access the memory,
+ // setting Next to node is a way to notify consumer
+ AtomicSet(node->Next, node);
+ }
+ }
+ TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept {
+ if (NotReadyChain == nullptr) {
+ auto head = AtomicSwap(&HeadForSwap, nullptr);
+ NotReadyChain = head;
+ }
+ if (NotReadyChain != nullptr) {
+ auto next = AtomicGet(NotReadyChain->Next);
+ if (next != nullptr) {
+ auto ready = NotReadyChain;
+ TIntrusiveNode* cut;
+ do {
+ cut = NotReadyChain;
+ NotReadyChain = next;
+ next = AtomicGet(NotReadyChain->Next);
+ if (next == NotReadyChain) {
+ cut = NotReadyChain;
+ NotReadyChain = nullptr;
+ break;
+ }
+ } while (next != nullptr);
+ cut->Next = nullptr;
+ return ready;
+ }
+ }
+ if (AtomicGet(HeadForCaS) != nullptr) {
+ return AtomicSwap(&HeadForCaS, nullptr);
+ }
+ return nullptr;
+ }
+ TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept {
+ if (PopOneQueue != nullptr) {
+ auto head = PopOneQueue;
+ PopOneQueue = PopOneQueue->Next;
+ return head;
+ }
+ PopOneQueue = PopMany();
+ if (PopOneQueue != nullptr) {
+ auto head = PopOneQueue;
+ PopOneQueue = PopOneQueue->Next;
+ return head;
+ }
+ return nullptr;
+ }
diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.h b/library/cpp/threading/queue/mpsc_intrusive_unordered.h
index 6ac7537ae9..c07cf761f6 100644
--- a/library/cpp/threading/queue/mpsc_intrusive_unordered.h
+++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.h
@@ -1,35 +1,35 @@
-#pragma once
- Simple almost-wait-free unordered queue for low contention operations.
- It's wait-free for producers.
- Hanging producer can hide some items from consumer.
- */
-#include <util/system/types.h>
-namespace NThreading {
- struct TIntrusiveNode {
- TIntrusiveNode* Next;
- };
- class TMPSCIntrusiveUnordered {
- public:
- static constexpr ui32 NUMBER_OF_TRIES_FOR_CAS = 3;
- void Push(TIntrusiveNode* node) noexcept;
- TIntrusiveNode* PopMany() noexcept;
- TIntrusiveNode* Pop() noexcept;
- void Push(void* node) noexcept {
- Push(reinterpret_cast<TIntrusiveNode*>(node));
- }
- private:
- TIntrusiveNode* HeadForCaS = nullptr;
- TIntrusiveNode* HeadForSwap = nullptr;
- TIntrusiveNode* NotReadyChain = nullptr;
- TIntrusiveNode* PopOneQueue = nullptr;
- };
+#pragma once
+ Simple almost-wait-free unordered queue for low contention operations.
+ It's wait-free for producers.
+ Hanging producer can hide some items from consumer.
+ */
+#include <util/system/types.h>
+namespace NThreading {
+ struct TIntrusiveNode {
+ TIntrusiveNode* Next;
+ };
+ class TMPSCIntrusiveUnordered {
+ public:
+ static constexpr ui32 NUMBER_OF_TRIES_FOR_CAS = 3;
+ void Push(TIntrusiveNode* node) noexcept;
+ TIntrusiveNode* PopMany() noexcept;
+ TIntrusiveNode* Pop() noexcept;
+ void Push(void* node) noexcept {
+ Push(reinterpret_cast<TIntrusiveNode*>(node));
+ }
+ private:
+ TIntrusiveNode* HeadForCaS = nullptr;
+ TIntrusiveNode* HeadForSwap = nullptr;
+ TIntrusiveNode* NotReadyChain = nullptr;
+ TIntrusiveNode* PopOneQueue = nullptr;
+ };
diff --git a/library/cpp/threading/queue/mpsc_read_as_filled.cpp b/library/cpp/threading/queue/mpsc_read_as_filled.cpp
index 8b4664a6f3..3b89fb1df6 100644
--- a/library/cpp/threading/queue/mpsc_read_as_filled.cpp
+++ b/library/cpp/threading/queue/mpsc_read_as_filled.cpp
@@ -1 +1 @@
-#include "mpsc_read_as_filled.h"
+#include "mpsc_read_as_filled.h"
diff --git a/library/cpp/threading/queue/mpsc_read_as_filled.h b/library/cpp/threading/queue/mpsc_read_as_filled.h
index be33ba5a58..4dfdb1fbbf 100644
--- a/library/cpp/threading/queue/mpsc_read_as_filled.h
+++ b/library/cpp/threading/queue/mpsc_read_as_filled.h
@@ -1,611 +1,611 @@
-#pragma once
- Completely wait-free queue, multiple producers - one consumer. Strict order.
- The queue algorithm is using concept of virtual infinite array.
+#pragma once
+ Completely wait-free queue, multiple producers - one consumer. Strict order.
+ The queue algorithm is using concept of virtual infinite array.
A producer takes a number from a counter and atomically increments the counter.
- The number taken is a number of a slot for the producer to put a new message
- into infinite array.
- Then producer constructs a virtual infinite array by bidirectional linked list
- of blocks. Each block contains several slots.
+ The number taken is a number of a slot for the producer to put a new message
+ into infinite array.
+ Then producer constructs a virtual infinite array by bidirectional linked list
+ of blocks. Each block contains several slots.
There is a hint pointer which optimistically points to the last block
- of the list and never goes backward.
- Consumer exploits the property of the hint pointer always going forward
- to free old blocks eventually. Consumer periodically read the hint pointer
- and the counter and thus deduce producers which potentially holds the pointer
- to a block. Consumer can free the block if all that producers filled their
- slots and left the queue.
- No producer can stop the progress for other producers.
- Consumer can't stop the progress for producers.
- Consumer can skip not-yet-filled slots and read them later.
- Thus no producer can stop the progress for consumer.
+ of the list and never goes backward.
+ Consumer exploits the property of the hint pointer always going forward
+ to free old blocks eventually. Consumer periodically read the hint pointer
+ and the counter and thus deduce producers which potentially holds the pointer
+ to a block. Consumer can free the block if all that producers filled their
+ slots and left the queue.
+ No producer can stop the progress for other producers.
+ Consumer can't stop the progress for producers.
+ Consumer can skip not-yet-filled slots and read them later.
+ Thus no producer can stop the progress for consumer.
The algorithm is virtually strictly ordered because it skips slots only
- if it is really does not matter in which order the slots were produced and
- consumed.
- WARNING: there is no wait&notify mechanic for consumer,
- consumer receives nullptr if queue was empty.
- WARNING: though the algorithm itself is completely wait-free
- but producers and consumer could be blocked by memory allocator
+ if it is really does not matter in which order the slots were produced and
+ consumed.
+ WARNING: there is no wait&notify mechanic for consumer,
+ consumer receives nullptr if queue was empty.
+ WARNING: though the algorithm itself is completely wait-free
+ but producers and consumer could be blocked by memory allocator
WARNING: copy constructors of the queue are not thread-safe
- */
-#include <util/generic/deque.h>
-#include <util/generic/ptr.h>
-#include <util/system/atomic.h>
-#include <util/system/spinlock.h>
-#include "tune.h"
-namespace NThreading {
- namespace NReadAsFilledPrivate {
- typedef void* TMsgLink;
- static constexpr ui32 DEFAULT_BUNCH_SIZE = 251;
- struct TEmpty {
- };
- struct TEmptyAux {
- TEmptyAux Retrieve() const {
- return TEmptyAux();
- }
- void Store(TEmptyAux&) {
- }
- static constexpr TEmptyAux Zero() {
- return TEmptyAux();
- }
- };
- template <typename TAux>
- struct TSlot {
- TMsgLink volatile Msg;
- TAux AuxiliaryData;
- inline void Store(TAux& aux) {
- AuxiliaryData.Store(aux);
- }
- inline TAux Retrieve() const {
- return AuxiliaryData.Retrieve();
- }
- static TSlot<TAux> NullElem() {
- return {nullptr, TAux::Zero()};
- }
- static TSlot<TAux> Pair(TMsgLink msg, TAux aux) {
- return {msg, std::move(aux)};
- }
- };
- template <>
- struct TSlot<TEmptyAux> {
- TMsgLink volatile Msg;
- inline void Store(TEmptyAux&) {
- }
- inline TEmptyAux Retrieve() const {
- return TEmptyAux();
- }
- static TSlot<TEmptyAux> NullElem() {
- return {nullptr};
- }
- static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) {
- return {msg};
- }
- };
- enum TPushResult {
- };
- typename TBase = TEmpty,
- typename TAux = TEmptyAux>
- struct TMsgBunch: public TBase {
- static constexpr size_t RELEASE_SIZE = BUNCH_SIZE * 2;
- ui64 FirstSlot;
- TSlot<TAux> LinkArray[BUNCH_SIZE];
- TMsgBunch* volatile NextBunch;
- TMsgBunch* volatile BackLink;
- ui64 volatile Token;
- TMsgBunch* volatile NextToken;
- /* this push can return PUSH_RESULT_BLOCKED */
+ */
+#include <util/generic/deque.h>
+#include <util/generic/ptr.h>
+#include <util/system/atomic.h>
+#include <util/system/spinlock.h>
+#include "tune.h"
+namespace NThreading {
+ namespace NReadAsFilledPrivate {
+ typedef void* TMsgLink;
+ static constexpr ui32 DEFAULT_BUNCH_SIZE = 251;
+ struct TEmpty {
+ };
+ struct TEmptyAux {
+ TEmptyAux Retrieve() const {
+ return TEmptyAux();
+ }
+ void Store(TEmptyAux&) {
+ }
+ static constexpr TEmptyAux Zero() {
+ return TEmptyAux();
+ }
+ };
+ template <typename TAux>
+ struct TSlot {
+ TMsgLink volatile Msg;
+ TAux AuxiliaryData;
+ inline void Store(TAux& aux) {
+ AuxiliaryData.Store(aux);
+ }
+ inline TAux Retrieve() const {
+ return AuxiliaryData.Retrieve();
+ }
+ static TSlot<TAux> NullElem() {
+ return {nullptr, TAux::Zero()};
+ }
+ static TSlot<TAux> Pair(TMsgLink msg, TAux aux) {
+ return {msg, std::move(aux)};
+ }
+ };
+ template <>
+ struct TSlot<TEmptyAux> {
+ TMsgLink volatile Msg;
+ inline void Store(TEmptyAux&) {
+ }
+ inline TEmptyAux Retrieve() const {
+ return TEmptyAux();
+ }
+ static TSlot<TEmptyAux> NullElem() {
+ return {nullptr};
+ }
+ static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) {
+ return {msg};
+ }
+ };
+ enum TPushResult {
+ };
+ typename TBase = TEmpty,
+ typename TAux = TEmptyAux>
+ struct TMsgBunch: public TBase {
+ static constexpr size_t RELEASE_SIZE = BUNCH_SIZE * 2;
+ ui64 FirstSlot;
+ TSlot<TAux> LinkArray[BUNCH_SIZE];
+ TMsgBunch* volatile NextBunch;
+ TMsgBunch* volatile BackLink;
+ ui64 volatile Token;
+ TMsgBunch* volatile NextToken;
+ /* this push can return PUSH_RESULT_BLOCKED */
inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) {
- if (Y_UNLIKELY(slot < FirstSlot)) {
- }
- if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) {
- }
- LinkArray[slot - FirstSlot].Store(auxiliary);
- AtomicSet(LinkArray[slot - FirstSlot].Msg, msg);
- return PUSH_RESULT_OK;
- }
- inline bool IsSlotHere(ui64 slot) {
- return slot < FirstSlot + BUNCH_SIZE;
- }
- inline TMsgLink GetSlot(ui64 slot) const {
- return AtomicGet(LinkArray[slot - FirstSlot].Msg);
- }
- inline TSlot<TAux> GetSlotAux(ui64 slot) const {
- auto msg = GetSlot(slot);
- auto aux = LinkArray[slot - FirstSlot].Retrieve();
- return TSlot<TAux>::Pair(msg, aux);
- }
- inline TMsgBunch* GetNextBunch() const {
- return AtomicGet(NextBunch);
- }
- inline bool SetNextBunch(TMsgBunch* ptr) {
- return AtomicCas(&NextBunch, ptr, nullptr);
- }
- inline TMsgBunch* GetBackLink() const {
- return AtomicGet(BackLink);
- }
- inline TMsgBunch* GetToken(ui64 slot) {
- return reinterpret_cast<TMsgBunch*>(
- LinkArray[slot - FirstSlot].Msg);
- }
- inline void IncrementToken() {
- AtomicIncrement(Token);
- }
- // the object could be destroyed after this method
- inline void DecrementToken() {
- if (Y_UNLIKELY(AtomicDecrement(Token) == RELEASE_SIZE)) {
- Release(this);
- AtomicGet(NextToken)->DecrementToken();
- // this could be invalid here
- }
- }
- // the object could be destroyed after this method
- inline void SetNextToken(TMsgBunch* next) {
- AtomicSet(NextToken, next);
+ if (Y_UNLIKELY(slot < FirstSlot)) {
+ }
+ if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) {
+ }
+ LinkArray[slot - FirstSlot].Store(auxiliary);
+ AtomicSet(LinkArray[slot - FirstSlot].Msg, msg);
+ return PUSH_RESULT_OK;
+ }
+ inline bool IsSlotHere(ui64 slot) {
+ return slot < FirstSlot + BUNCH_SIZE;
+ }
+ inline TMsgLink GetSlot(ui64 slot) const {
+ return AtomicGet(LinkArray[slot - FirstSlot].Msg);
+ }
+ inline TSlot<TAux> GetSlotAux(ui64 slot) const {
+ auto msg = GetSlot(slot);
+ auto aux = LinkArray[slot - FirstSlot].Retrieve();
+ return TSlot<TAux>::Pair(msg, aux);
+ }
+ inline TMsgBunch* GetNextBunch() const {
+ return AtomicGet(NextBunch);
+ }
+ inline bool SetNextBunch(TMsgBunch* ptr) {
+ return AtomicCas(&NextBunch, ptr, nullptr);
+ }
+ inline TMsgBunch* GetBackLink() const {
+ return AtomicGet(BackLink);
+ }
+ inline TMsgBunch* GetToken(ui64 slot) {
+ return reinterpret_cast<TMsgBunch*>(
+ LinkArray[slot - FirstSlot].Msg);
+ }
+ inline void IncrementToken() {
+ AtomicIncrement(Token);
+ }
+ // the object could be destroyed after this method
+ inline void DecrementToken() {
+ if (Y_UNLIKELY(AtomicDecrement(Token) == RELEASE_SIZE)) {
+ Release(this);
+ AtomicGet(NextToken)->DecrementToken();
+ // this could be invalid here
+ }
+ }
+ // the object could be destroyed after this method
+ inline void SetNextToken(TMsgBunch* next) {
+ AtomicSet(NextToken, next);
- Release(this);
- next->DecrementToken();
- }
- // this could be invalid here
- }
- TMsgBunch(ui64 start, TMsgBunch* backLink) {
- AtomicSet(FirstSlot, start);
- memset(&LinkArray, 0, sizeof(LinkArray));
- AtomicSet(NextBunch, nullptr);
- AtomicSet(BackLink, backLink);
- AtomicSet(Token, 1);
- AtomicSet(NextToken, nullptr);
- }
- static void Release(TMsgBunch* block) {
- auto backLink = AtomicGet(block->BackLink);
- if (backLink == nullptr) {
- return;
- }
- AtomicSet(block->BackLink, nullptr);
- do {
- auto bbackLink = backLink->BackLink;
- delete backLink;
- backLink = bbackLink;
- } while (backLink != nullptr);
- }
- void Destroy() {
- for (auto tail = BackLink; tail != nullptr;) {
- auto next = tail->BackLink;
- delete tail;
- tail = next;
- }
- for (auto next = this; next != nullptr;) {
- auto nnext = next->NextBunch;
- delete next;
- next = nnext;
- }
- }
- };
- typename TBunchBase = NReadAsFilledPrivate::TEmpty,
- typename TAux = TEmptyAux>
- class TWriteBucket {
- public:
- using TUsingAux = TAux; // for TReadBucket binding
- using TBunch = TMsgBunch<BUNCH_SIZE, TBunchBase, TAux>;
- TWriteBucket(TBunch* bunch = new TBunch(0, nullptr)) {
- AtomicSet(LastBunch, bunch);
- AtomicSet(SlotCounter, 0);
- }
- TWriteBucket(TWriteBucket&& move)
- : LastBunch(move.LastBunch)
- , SlotCounter(move.SlotCounter)
- {
- move.LastBunch = nullptr;
- }
- ~TWriteBucket() {
- if (LastBunch != nullptr) {
- LastBunch->Destroy();
- }
- }
- inline void Push(TMsgLink msg, TAux aux) {
- ui64 pushSlot = AtomicGetAndIncrement(SlotCounter);
- TBunch* hintBunch = GetLastBunch();
- for (;;) {
- auto hint = hintBunch->Push(msg, pushSlot, aux);
- if (Y_LIKELY(hint == PUSH_RESULT_OK)) {
- return;
- }
- HandleHint(hintBunch, hint);
- }
- }
- protected:
- template <typename, template <typename, typename...> class>
- friend class TReadBucket;
- TBunch* volatile LastBunch; // Hint
- volatile ui64 SlotCounter;
- inline TBunch* GetLastBunch() const {
- return AtomicGet(LastBunch);
- }
- void HandleHint(TBunch*& hintBunch, TPushResult hint) {
- hintBunch = hintBunch->GetBackLink();
- return;
- }
- auto nextBunch = hintBunch->GetNextBunch();
- if (nextBunch == nullptr) {
- auto first = hintBunch->FirstSlot + BUNCH_SIZE;
- nextBunch = new TBunch(first, hintBunch);
- if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) {
- delete nextBunch;
- nextBunch = hintBunch->GetNextBunch();
- }
- }
- // hintBunch could not be freed here so it cannot be reused
- // it's alright if this CAS was not succeeded,
- // it means that other thread did that recently
- AtomicCas(&LastBunch, nextBunch, hintBunch);
- hintBunch = nextBunch;
- }
- };
+ Release(this);
+ next->DecrementToken();
+ }
+ // this could be invalid here
+ }
+ TMsgBunch(ui64 start, TMsgBunch* backLink) {
+ AtomicSet(FirstSlot, start);
+ memset(&LinkArray, 0, sizeof(LinkArray));
+ AtomicSet(NextBunch, nullptr);
+ AtomicSet(BackLink, backLink);
+ AtomicSet(Token, 1);
+ AtomicSet(NextToken, nullptr);
+ }
+ static void Release(TMsgBunch* block) {
+ auto backLink = AtomicGet(block->BackLink);
+ if (backLink == nullptr) {
+ return;
+ }
+ AtomicSet(block->BackLink, nullptr);
+ do {
+ auto bbackLink = backLink->BackLink;
+ delete backLink;
+ backLink = bbackLink;
+ } while (backLink != nullptr);
+ }
+ void Destroy() {
+ for (auto tail = BackLink; tail != nullptr;) {
+ auto next = tail->BackLink;
+ delete tail;
+ tail = next;
+ }
+ for (auto next = this; next != nullptr;) {
+ auto nnext = next->NextBunch;
+ delete next;
+ next = nnext;
+ }
+ }
+ };
+ typename TBunchBase = NReadAsFilledPrivate::TEmpty,
+ typename TAux = TEmptyAux>
+ class TWriteBucket {
+ public:
+ using TUsingAux = TAux; // for TReadBucket binding
+ using TBunch = TMsgBunch<BUNCH_SIZE, TBunchBase, TAux>;
+ TWriteBucket(TBunch* bunch = new TBunch(0, nullptr)) {
+ AtomicSet(LastBunch, bunch);
+ AtomicSet(SlotCounter, 0);
+ }
+ TWriteBucket(TWriteBucket&& move)
+ : LastBunch(move.LastBunch)
+ , SlotCounter(move.SlotCounter)
+ {
+ move.LastBunch = nullptr;
+ }
+ ~TWriteBucket() {
+ if (LastBunch != nullptr) {
+ LastBunch->Destroy();
+ }
+ }
+ inline void Push(TMsgLink msg, TAux aux) {
+ ui64 pushSlot = AtomicGetAndIncrement(SlotCounter);
+ TBunch* hintBunch = GetLastBunch();
+ for (;;) {
+ auto hint = hintBunch->Push(msg, pushSlot, aux);
+ if (Y_LIKELY(hint == PUSH_RESULT_OK)) {
+ return;
+ }
+ HandleHint(hintBunch, hint);
+ }
+ }
+ protected:
+ template <typename, template <typename, typename...> class>
+ friend class TReadBucket;
+ TBunch* volatile LastBunch; // Hint
+ volatile ui64 SlotCounter;
+ inline TBunch* GetLastBunch() const {
+ return AtomicGet(LastBunch);
+ }
+ void HandleHint(TBunch*& hintBunch, TPushResult hint) {
+ hintBunch = hintBunch->GetBackLink();
+ return;
+ }
+ auto nextBunch = hintBunch->GetNextBunch();
+ if (nextBunch == nullptr) {
+ auto first = hintBunch->FirstSlot + BUNCH_SIZE;
+ nextBunch = new TBunch(first, hintBunch);
+ if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) {
+ delete nextBunch;
+ nextBunch = hintBunch->GetNextBunch();
+ }
+ }
+ // hintBunch could not be freed here so it cannot be reused
+ // it's alright if this CAS was not succeeded,
+ // it means that other thread did that recently
+ AtomicCas(&LastBunch, nextBunch, hintBunch);
+ hintBunch = nextBunch;
+ }
+ };
template <typename TWBucket = TWriteBucket<>,
template <typename, typename...> class TContainer = TDeque>
- class TReadBucket {
- public:
- using TAux = typename TWBucket::TUsingAux;
- using TBunch = typename TWBucket::TBunch;
- static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 5;
- TReadBucket(TWBucket* writer)
- : Writer(writer)
- , ReadBunch(writer->GetLastBunch())
- , LastKnownPushBunch(writer->GetLastBunch())
- {
- ReadBunch->DecrementToken(); // no previous token
- }
- TReadBucket(TReadBucket toCopy, TWBucket* writer)
- : TReadBucket(std::move(toCopy))
- {
- Writer = writer;
- }
- ui64 ReadyCount() const {
- return AtomicGet(Writer->SlotCounter) - ReadSlot;
- }
- TMsgLink Pop() {
- return PopAux().Msg;
- }
- TMsgLink Peek() {
- return PeekAux().Msg;
- }
- TSlot<TAux> PopAux() {
- for (;;) {
- if (Y_UNLIKELY(ReadNow.size() != 0)) {
- auto result = PopSkipped();
- if (Y_LIKELY(result.Msg != nullptr)) {
- return result;
- }
- }
- if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {
- if (Y_LIKELY(!RereadPushSlot())) {
- return TSlot<TAux>::NullElem();
- }
- continue;
- }
- if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {
- if (Y_UNLIKELY(!SwitchToNextBunch())) {
- return TSlot<TAux>::NullElem();
- }
- }
- auto result = ReadBunch->GetSlotAux(ReadSlot);
- if (Y_LIKELY(result.Msg != nullptr)) {
- ++ReadSlot;
- return result;
- }
- result = StubbornPop();
- if (Y_LIKELY(result.Msg != nullptr)) {
- return result;
- }
- }
- }
- TSlot<TAux> PeekAux() {
- for (;;) {
- if (Y_UNLIKELY(ReadNow.size() != 0)) {
- auto result = PeekSkipped();
- if (Y_LIKELY(result.Msg != nullptr)) {
- return result;
- }
- }
- if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {
- if (Y_LIKELY(!RereadPushSlot())) {
- return TSlot<TAux>::NullElem();
- }
- continue;
- }
- if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {
- if (Y_UNLIKELY(!SwitchToNextBunch())) {
- return TSlot<TAux>::NullElem();
- }
- }
- auto result = ReadBunch->GetSlotAux(ReadSlot);
- if (Y_LIKELY(result.Msg != nullptr)) {
- return result;
- }
- result = StubbornPeek();
- if (Y_LIKELY(result.Msg != nullptr)) {
- return result;
- }
- }
- }
- private:
- TWBucket* Writer;
- TBunch* ReadBunch;
- ui64 ReadSlot = 0;
- TBunch* LastKnownPushBunch;
- ui64 LastKnownPushSlot = 0;
- struct TSkipItem {
- TBunch* Bunch;
- ui64 Slot;
- TBunch* Token;
- };
- TContainer<TSkipItem> ReadNow;
- TContainer<TSkipItem> ReadLater;
- void AddToReadLater() {
- ReadLater.push_back({ReadBunch, ReadSlot, LastKnownPushBunch});
- LastKnownPushBunch->IncrementToken();
- ++ReadSlot;
- }
- // MUST BE: ReadSlot == LastKnownPushSlot
- bool RereadPushSlot() {
- ReadNow = std::move(ReadLater);
- ReadLater.clear();
- auto oldSlot = LastKnownPushSlot;
- auto currentPushBunch = Writer->GetLastBunch();
- auto currentPushSlot = AtomicGet(Writer->SlotCounter);
- if (currentPushBunch != LastKnownPushBunch) {
- // LastKnownPushBunch could be invalid after this line
- LastKnownPushBunch->SetNextToken(currentPushBunch);
- }
- LastKnownPushBunch = currentPushBunch;
- LastKnownPushSlot = currentPushSlot;
- return oldSlot != LastKnownPushSlot;
- }
- bool SwitchToNextBunch() {
- for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
- auto next = ReadBunch->GetNextBunch();
- if (next != nullptr) {
- ReadBunch = next;
- return true;
- }
- SpinLockPause();
- }
- return false;
- }
- TSlot<TAux> StubbornPop() {
- for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
- auto result = ReadBunch->GetSlotAux(ReadSlot);
- if (Y_LIKELY(result.Msg != nullptr)) {
- ++ReadSlot;
- return result;
- }
- SpinLockPause();
- }
- AddToReadLater();
- return TSlot<TAux>::NullElem();
- }
- TSlot<TAux> StubbornPeek() {
- for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
- auto result = ReadBunch->GetSlotAux(ReadSlot);
- if (Y_LIKELY(result.Msg != nullptr)) {
- return result;
- }
- SpinLockPause();
- }
- AddToReadLater();
- return TSlot<TAux>::NullElem();
- }
- TSlot<TAux> PopSkipped() {
- do {
- auto elem = ReadNow.front();
- ReadNow.pop_front();
- auto result = elem.Bunch->GetSlotAux(elem.Slot);
- if (Y_LIKELY(result.Msg != nullptr)) {
- elem.Token->DecrementToken();
- return result;
- }
- ReadLater.emplace_back(elem);
- } while (ReadNow.size() > 0);
- return TSlot<TAux>::NullElem();
- }
- TSlot<TAux> PeekSkipped() {
- do {
- auto elem = ReadNow.front();
- auto result = elem.Bunch->GetSlotAux(elem.Slot);
- if (Y_LIKELY(result.Msg != nullptr)) {
- return result;
- }
- ReadNow.pop_front();
- ReadLater.emplace_back(elem);
- } while (ReadNow.size() > 0);
- return TSlot<TAux>::NullElem();
- }
- };
- struct TDefaultParams {
- static constexpr ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE;
- using TBunchBase = TEmpty;
+ class TReadBucket {
+ public:
+ using TAux = typename TWBucket::TUsingAux;
+ using TBunch = typename TWBucket::TBunch;
+ static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 5;
+ TReadBucket(TWBucket* writer)
+ : Writer(writer)
+ , ReadBunch(writer->GetLastBunch())
+ , LastKnownPushBunch(writer->GetLastBunch())
+ {
+ ReadBunch->DecrementToken(); // no previous token
+ }
+ TReadBucket(TReadBucket toCopy, TWBucket* writer)
+ : TReadBucket(std::move(toCopy))
+ {
+ Writer = writer;
+ }
+ ui64 ReadyCount() const {
+ return AtomicGet(Writer->SlotCounter) - ReadSlot;
+ }
+ TMsgLink Pop() {
+ return PopAux().Msg;
+ }
+ TMsgLink Peek() {
+ return PeekAux().Msg;
+ }
+ TSlot<TAux> PopAux() {
+ for (;;) {
+ if (Y_UNLIKELY(ReadNow.size() != 0)) {
+ auto result = PopSkipped();
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ return result;
+ }
+ }
+ if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {
+ if (Y_LIKELY(!RereadPushSlot())) {
+ return TSlot<TAux>::NullElem();
+ }
+ continue;
+ }
+ if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {
+ if (Y_UNLIKELY(!SwitchToNextBunch())) {
+ return TSlot<TAux>::NullElem();
+ }
+ }
+ auto result = ReadBunch->GetSlotAux(ReadSlot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ ++ReadSlot;
+ return result;
+ }
+ result = StubbornPop();
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ return result;
+ }
+ }
+ }
+ TSlot<TAux> PeekAux() {
+ for (;;) {
+ if (Y_UNLIKELY(ReadNow.size() != 0)) {
+ auto result = PeekSkipped();
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ return result;
+ }
+ }
+ if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {
+ if (Y_LIKELY(!RereadPushSlot())) {
+ return TSlot<TAux>::NullElem();
+ }
+ continue;
+ }
+ if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {
+ if (Y_UNLIKELY(!SwitchToNextBunch())) {
+ return TSlot<TAux>::NullElem();
+ }
+ }
+ auto result = ReadBunch->GetSlotAux(ReadSlot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ return result;
+ }
+ result = StubbornPeek();
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ return result;
+ }
+ }
+ }
+ private:
+ TWBucket* Writer;
+ TBunch* ReadBunch;
+ ui64 ReadSlot = 0;
+ TBunch* LastKnownPushBunch;
+ ui64 LastKnownPushSlot = 0;
+ struct TSkipItem {
+ TBunch* Bunch;
+ ui64 Slot;
+ TBunch* Token;
+ };
+ TContainer<TSkipItem> ReadNow;
+ TContainer<TSkipItem> ReadLater;
+ void AddToReadLater() {
+ ReadLater.push_back({ReadBunch, ReadSlot, LastKnownPushBunch});
+ LastKnownPushBunch->IncrementToken();
+ ++ReadSlot;
+ }
+ // MUST BE: ReadSlot == LastKnownPushSlot
+ bool RereadPushSlot() {
+ ReadNow = std::move(ReadLater);
+ ReadLater.clear();
+ auto oldSlot = LastKnownPushSlot;
+ auto currentPushBunch = Writer->GetLastBunch();
+ auto currentPushSlot = AtomicGet(Writer->SlotCounter);
+ if (currentPushBunch != LastKnownPushBunch) {
+ // LastKnownPushBunch could be invalid after this line
+ LastKnownPushBunch->SetNextToken(currentPushBunch);
+ }
+ LastKnownPushBunch = currentPushBunch;
+ LastKnownPushSlot = currentPushSlot;
+ return oldSlot != LastKnownPushSlot;
+ }
+ bool SwitchToNextBunch() {
+ for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
+ auto next = ReadBunch->GetNextBunch();
+ if (next != nullptr) {
+ ReadBunch = next;
+ return true;
+ }
+ SpinLockPause();
+ }
+ return false;
+ }
+ TSlot<TAux> StubbornPop() {
+ for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
+ auto result = ReadBunch->GetSlotAux(ReadSlot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ ++ReadSlot;
+ return result;
+ }
+ SpinLockPause();
+ }
+ AddToReadLater();
+ return TSlot<TAux>::NullElem();
+ }
+ TSlot<TAux> StubbornPeek() {
+ for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
+ auto result = ReadBunch->GetSlotAux(ReadSlot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ return result;
+ }
+ SpinLockPause();
+ }
+ AddToReadLater();
+ return TSlot<TAux>::NullElem();
+ }
+ TSlot<TAux> PopSkipped() {
+ do {
+ auto elem = ReadNow.front();
+ ReadNow.pop_front();
+ auto result = elem.Bunch->GetSlotAux(elem.Slot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ elem.Token->DecrementToken();
+ return result;
+ }
+ ReadLater.emplace_back(elem);
+ } while (ReadNow.size() > 0);
+ return TSlot<TAux>::NullElem();
+ }
+ TSlot<TAux> PeekSkipped() {
+ do {
+ auto elem = ReadNow.front();
+ auto result = elem.Bunch->GetSlotAux(elem.Slot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ return result;
+ }
+ ReadNow.pop_front();
+ ReadLater.emplace_back(elem);
+ } while (ReadNow.size() > 0);
+ return TSlot<TAux>::NullElem();
+ }
+ };
+ struct TDefaultParams {
+ static constexpr ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE;
+ using TBunchBase = TEmpty;
template <typename TElem, typename... TRest>
using TContainer = TDeque<TElem, TRest...>;
- static constexpr bool DeleteItems = true;
- };
- } //namespace NReadAsFilledPrivate
- DeclareTuneValueParam(TRaFQueueBunchSize, ui32, BUNCH_SIZE);
- DeclareTuneTypeParam(TRaFQueueBunchBase, TBunchBase);
- DeclareTuneContainer(TRaFQueueSkipContainer, TContainer);
- DeclareTuneValueParam(TRaFQueueDeleteItems, bool, DeleteItems);
+ static constexpr bool DeleteItems = true;
+ };
+ } //namespace NReadAsFilledPrivate
+ DeclareTuneValueParam(TRaFQueueBunchSize, ui32, BUNCH_SIZE);
+ DeclareTuneTypeParam(TRaFQueueBunchBase, TBunchBase);
+ DeclareTuneContainer(TRaFQueueSkipContainer, TContainer);
+ DeclareTuneValueParam(TRaFQueueDeleteItems, bool, DeleteItems);
template <typename TItem = void, typename... TParams>
- class TReadAsFilledQueue {
- private:
- using TTuned = TTune<NReadAsFilledPrivate::TDefaultParams, TParams...>;
- static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE;
- using TBunchBase = typename TTuned::TBunchBase;
+ class TReadAsFilledQueue {
+ private:
+ using TTuned = TTune<NReadAsFilledPrivate::TDefaultParams, TParams...>;
+ static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE;
+ using TBunchBase = typename TTuned::TBunchBase;
template <typename TElem, typename... TRest>
- using TContainer =
- typename TTuned::template TContainer<TElem, TRest...>;
- using TWriteBucket =
- NReadAsFilledPrivate::TWriteBucket<BUNCH_SIZE, TBunchBase>;
- using TReadBucket =
- NReadAsFilledPrivate::TReadBucket<TWriteBucket, TContainer>;
- public:
- TReadAsFilledQueue()
- : RBucket(&WBucket)
- {
- }
- ~TReadAsFilledQueue() {
- if (TTuned::DeleteItems) {
- for (;;) {
- auto msg = Pop();
- if (msg == nullptr) {
- break;
- }
- TDelete::Destroy(msg);
- }
- }
- }
- void Push(TItem* msg) {
- WBucket.Push((void*)msg, NReadAsFilledPrivate::TEmptyAux());
- }
- TItem* Pop() {
- return (TItem*)RBucket.Pop();
- }
- TItem* Peek() {
- return (TItem*)RBucket.Peek();
- }
- protected:
- TWriteBucket WBucket;
- TReadBucket RBucket;
- };
+ using TContainer =
+ typename TTuned::template TContainer<TElem, TRest...>;
+ using TWriteBucket =
+ NReadAsFilledPrivate::TWriteBucket<BUNCH_SIZE, TBunchBase>;
+ using TReadBucket =
+ NReadAsFilledPrivate::TReadBucket<TWriteBucket, TContainer>;
+ public:
+ TReadAsFilledQueue()
+ : RBucket(&WBucket)
+ {
+ }
+ ~TReadAsFilledQueue() {
+ if (TTuned::DeleteItems) {
+ for (;;) {
+ auto msg = Pop();
+ if (msg == nullptr) {
+ break;
+ }
+ TDelete::Destroy(msg);
+ }
+ }
+ }
+ void Push(TItem* msg) {
+ WBucket.Push((void*)msg, NReadAsFilledPrivate::TEmptyAux());
+ }
+ TItem* Pop() {
+ return (TItem*)RBucket.Pop();
+ }
+ TItem* Peek() {
+ return (TItem*)RBucket.Peek();
+ }
+ protected:
+ TWriteBucket WBucket;
+ TReadBucket RBucket;
+ };
diff --git a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp
index 2bd0c29821..00dbfeaa64 100644
--- a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp
+++ b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp
@@ -1 +1 @@
-#include "mpsc_vinfarr_obstructive.h"
+#include "mpsc_vinfarr_obstructive.h"
diff --git a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h
index 5f91f1b5a8..3e1ae92342 100644
--- a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h
+++ b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h
@@ -1,528 +1,528 @@
-#pragma once
- Semi-wait-free queue, multiple producers - one consumer. Strict order.
- The queue algorithm is using concept of virtual infinite array.
- A producer takes a number from a counter and atomicaly increments the counter.
- The number taken is a number of a slot for the producer to put a new message
- into infinite array.
- Then producer constructs a virtual infinite array by bidirectional linked list
- of blocks. Each block contains several slots.
- There is a hint pointer which optimisticly points to the last block
- of the list and never goes backward.
- Consumer exploits the property of the hint pointer always going forward
- to free old blocks eventually. Consumer periodically read the hint pointer
- and the counter and thus deduce producers which potentially holds the pointer
- to a block. Consumer can free the block if all that producers filled their
- slots and left the queue.
- No producer can stop the progress for other producers.
- Consumer can obstruct a slot of a delayed producer by putting special mark.
- Thus no producer can stop the progress for consumer.
- But a slow producer may be forced to retry unlimited number of times.
- Though it's very unlikely for a non-preempted producer to be obstructed.
- That's why the algorithm is semi-wait-free.
- WARNING: there is no wait&notify mechanic for consumer,
- consumer receives nullptr if queue was empty.
- WARNING: though the algorithm itself is lock-free
- but producers and consumer could be blocked by memory allocator
- WARNING: copy constructers of the queue are not thread-safe
- */
-#include <util/generic/noncopyable.h>
-#include <util/generic/ptr.h>
-#include <util/system/atomic.h>
-#include <util/system/spinlock.h>
-#include "tune.h"
-namespace NThreading {
- namespace NObstructiveQueuePrivate {
- typedef void* TMsgLink;
- struct TEmpty {
- };
- struct TEmptyAux {
- TEmptyAux Retrieve() const {
- return TEmptyAux();
- }
- void Store(TEmptyAux&) {
- }
- static constexpr TEmptyAux Zero() {
- return TEmptyAux();
- }
- };
- template <typename TAux>
- struct TSlot {
- TMsgLink volatile Msg;
- TAux AuxiliaryData;
- inline void Store(TAux& aux) {
- AuxiliaryData.Store(aux);
- }
- inline TAux Retrieve() const {
- return AuxiliaryData.Retrieve();
- }
- static TSlot<TAux> NullElem() {
- return {nullptr, TAux::Zero()};
- }
- static TSlot<TAux> Pair(TMsgLink msg, TAux aux) {
- return {msg, std::move(aux)};
- }
- };
- template <>
- struct TSlot<TEmptyAux> {
- TMsgLink volatile Msg;
- inline void Store(TEmptyAux&) {
- }
- inline TEmptyAux Retrieve() const {
- return TEmptyAux();
- }
- static TSlot<TEmptyAux> NullElem() {
- return {nullptr};
- }
- static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) {
- return {msg};
- }
- };
- enum TPushResult {
- };
- template <typename TAux, ui32 BUNCH_SIZE, typename TBase = TEmpty>
- struct TMsgBunch: public TBase {
- ui64 FirstSlot;
- TSlot<TAux> LinkArray[BUNCH_SIZE];
- TMsgBunch* volatile NextBunch;
- TMsgBunch* volatile BackLink;
- ui64 volatile Token;
- TMsgBunch* volatile NextToken;
- /* this push can return PUSH_RESULT_BLOCKED */
- inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) {
- if (Y_UNLIKELY(slot < FirstSlot)) {
- }
- if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) {
- }
- LinkArray[slot - FirstSlot].Store(auxiliary);
- auto oldValue = AtomicSwap(&LinkArray[slot - FirstSlot].Msg, msg);
- if (Y_LIKELY(oldValue == nullptr)) {
- return PUSH_RESULT_OK;
- } else {
- LeaveBlocked(oldValue);
- }
- }
- inline bool IsSlotHere(ui64 slot) {
- return slot < FirstSlot + BUNCH_SIZE;
- }
- inline TMsgLink GetSlot(ui64 slot) const {
- return AtomicGet(LinkArray[slot - FirstSlot].Msg);
- }
- inline TSlot<TAux> GetSlotAux(ui64 slot) const {
- auto msg = GetSlot(slot);
- auto aux = LinkArray[slot - FirstSlot].Retrieve();
- return TSlot<TAux>::Pair(msg, aux);
- }
- void LeaveBlocked(ui64 slot) {
- auto token = GetToken(slot);
- token->DecrementToken();
- }
- void LeaveBlocked(TMsgLink msg) {
- auto token = reinterpret_cast<TMsgBunch*>(msg);
- token->DecrementToken();
- }
- TSlot<TAux> BlockSlotAux(ui64 slot, TMsgBunch* token) {
- auto old =
- AtomicSwap(&LinkArray[slot - FirstSlot].Msg, (TMsgLink)token);
- if (old == nullptr) {
- // It's valid to increment after AtomicCas
- // because token will release data only after SetNextToken
- token->IncrementToken();
- return TSlot<TAux>::NullElem();
- }
- return TSlot<TAux>::Pair(old, LinkArray[slot - FirstSlot].Retrieve());
- }
- inline TMsgBunch* GetNextBunch() const {
- return AtomicGet(NextBunch);
- }
- inline bool SetNextBunch(TMsgBunch* ptr) {
- return AtomicCas(&NextBunch, ptr, nullptr);
- }
- inline TMsgBunch* GetBackLink() const {
- return AtomicGet(BackLink);
- }
- inline TMsgBunch* GetToken(ui64 slot) {
- return reinterpret_cast<TMsgBunch*>(LinkArray[slot - FirstSlot].Msg);
- }
- inline void IncrementToken() {
- AtomicIncrement(Token);
- }
- // the object could be destroyed after this method
- inline void DecrementToken() {
- if (Y_UNLIKELY(AtomicDecrement(Token) == BUNCH_SIZE)) {
- Release(this);
- AtomicGet(NextToken)->DecrementToken();
- // this could be invalid here
- }
- }
- // the object could be destroyed after this method
- inline void SetNextToken(TMsgBunch* next) {
- AtomicSet(NextToken, next);
- if (Y_UNLIKELY(AtomicAdd(Token, BUNCH_SIZE) == BUNCH_SIZE)) {
- Release(this);
- next->DecrementToken();
- }
- // this could be invalid here
- }
- TMsgBunch(ui64 start, TMsgBunch* backLink) {
- AtomicSet(FirstSlot, start);
- memset(&LinkArray, 0, sizeof(LinkArray));
- AtomicSet(NextBunch, nullptr);
- AtomicSet(BackLink, backLink);
- AtomicSet(Token, 1);
- AtomicSet(NextToken, nullptr);
- }
- static void Release(TMsgBunch* bunch) {
- auto backLink = AtomicGet(bunch->BackLink);
- if (backLink == nullptr) {
- return;
- }
- AtomicSet(bunch->BackLink, nullptr);
- do {
- auto bbackLink = backLink->BackLink;
- delete backLink;
- backLink = bbackLink;
- } while (backLink != nullptr);
- }
- void Destroy() {
- for (auto tail = BackLink; tail != nullptr;) {
- auto next = tail->BackLink;
- delete tail;
- tail = next;
- }
- for (auto next = this; next != nullptr;) {
- auto nnext = next->NextBunch;
- delete next;
- next = nnext;
- }
- }
- };
- template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase = TEmpty>
- class TWriteBucket {
- public:
- static const ui64 GROSS_SIZE;
- using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>;
- TWriteBucket(TBunch* bunch = new TBunch(0, nullptr))
- : LastBunch(bunch)
- , SlotCounter(0)
- {
- }
- TWriteBucket(TWriteBucket&& move)
- : LastBunch(move.LastBunch)
- , SlotCounter(move.SlotCounter)
- {
- move.LastBunch = nullptr;
- }
- ~TWriteBucket() {
- if (LastBunch != nullptr) {
- LastBunch->Destroy();
- }
- }
- inline bool Push(TMsgLink msg, TAux aux) {
- ui64 pushSlot = AtomicGetAndIncrement(SlotCounter);
- TBunch* hintBunch = GetLastBunch();
- for (;;) {
- auto hint = hintBunch->Push(msg, pushSlot, aux);
- if (Y_LIKELY(hint == PUSH_RESULT_OK)) {
- return true;
- }
- bool hhResult = HandleHint(hintBunch, hint);
- if (Y_UNLIKELY(!hhResult)) {
- return false;
- }
- }
- }
- protected:
- template <typename, ui32, typename>
- friend class TReadBucket;
- TBunch* volatile LastBunch; // Hint
- volatile ui64 SlotCounter;
- inline TBunch* GetLastBunch() const {
- return AtomicGet(LastBunch);
- }
- bool HandleHint(TBunch*& hintBunch, TPushResult hint) {
- return false;
- }
- hintBunch = hintBunch->GetBackLink();
- return true;
- }
- auto nextBunch = hintBunch->GetNextBunch();
- if (nextBunch == nullptr) {
- auto first = hintBunch->FirstSlot + BUNCH_SIZE;
- nextBunch = new TBunch(first, hintBunch);
- if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) {
- delete nextBunch;
- nextBunch = hintBunch->GetNextBunch();
- }
- }
- // hintBunch could not be freed here so it cannot be reused
- // it's alright if this CAS was not succeeded,
- // it means that other thread did that recently
- AtomicCas(&LastBunch, nextBunch, hintBunch);
- hintBunch = nextBunch;
- return true;
- }
- };
- template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase>
- class TReadBucket {
- public:
- static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 20;
- using TWBucket = TWriteBucket<TAux, BUNCH_SIZE, TBunchBase>;
- using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>;
- TReadBucket(TWBucket* writer)
- : Writer(writer)
- , ReadBunch(writer->GetLastBunch())
- , LastKnownPushBunch(writer->GetLastBunch())
- {
- ReadBunch->DecrementToken(); // no previous token
- }
- TReadBucket(TReadBucket toCopy, TWBucket* writer)
- : TReadBucket(std::move(toCopy))
- {
- Writer = writer;
- }
- ui64 ReadyCount() const {
- return AtomicGet(Writer->SlotCounter) - ReadSlot;
- }
- inline TMsgLink Pop() {
- return PopAux().Msg;
- }
- inline TSlot<TAux> PopAux() {
- for (;;) {
- if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {
- if (Y_LIKELY(!RereadPushSlot())) {
- return TSlot<TAux>::NullElem();
- }
- }
- if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {
- if (Y_UNLIKELY(!SwitchToNextBunch())) {
- return TSlot<TAux>::NullElem();
- }
- }
- auto result = ReadBunch->GetSlotAux(ReadSlot);
- if (Y_LIKELY(result.Msg != nullptr)) {
- ++ReadSlot;
- return result;
- }
- if (ReadSlot + 1 == AtomicGet(Writer->SlotCounter)) {
- return TSlot<TAux>::NullElem();
- }
- result = StubbornPopAux();
- if (result.Msg != nullptr) {
- return result;
- }
- }
- }
- private:
- TWBucket* Writer;
- TBunch* ReadBunch;
- ui64 ReadSlot = 0;
- TBunch* LastKnownPushBunch;
- ui64 LastKnownPushSlot = 0;
- // MUST BE: ReadSlot == LastKnownPushSlot
- bool RereadPushSlot() {
- auto oldSlot = LastKnownPushSlot;
- auto currentPushBunch = Writer->GetLastBunch();
- auto currentPushSlot = AtomicGet(Writer->SlotCounter);
- if (currentPushBunch != LastKnownPushBunch) {
- // LastKnownPushBunch could be invalid after this line
- LastKnownPushBunch->SetNextToken(currentPushBunch);
- }
- LastKnownPushBunch = currentPushBunch;
- LastKnownPushSlot = currentPushSlot;
- return oldSlot != LastKnownPushSlot;
- }
- bool SwitchToNextBunch() {
- for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
- auto next = ReadBunch->GetNextBunch();
- if (next != nullptr) {
- ReadBunch = next;
- return true;
- }
- SpinLockPause();
- }
- return false;
- }
- TSlot<TAux> StubbornPopAux() {
- for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
- auto result = ReadBunch->GetSlotAux(ReadSlot);
- if (Y_LIKELY(result.Msg != nullptr)) {
- ++ReadSlot;
- return result;
- }
- SpinLockPause();
- }
- return ReadBunch->BlockSlotAux(ReadSlot++, LastKnownPushBunch);
- }
- };
- struct TDefaultParams {
- static constexpr bool DeleteItems = true;
- using TAux = NObstructiveQueuePrivate::TEmptyAux;
- using TBunchBase = NObstructiveQueuePrivate::TEmpty;
- static constexpr ui32 BUNCH_SIZE = 251;
- };
- } //namespace NObstructiveQueuePrivate
- DeclareTuneValueParam(TObstructiveQueueBunchSize, ui32, BUNCH_SIZE);
- DeclareTuneValueParam(TObstructiveQueueDeleteItems, bool, DeleteItems);
- DeclareTuneTypeParam(TObstructiveQueueBunchBase, TBunchBase);
- DeclareTuneTypeParam(TObstructiveQueueAux, TAux);
+#pragma once
+ Semi-wait-free queue, multiple producers - one consumer. Strict order.
+ The queue algorithm is using concept of virtual infinite array.
+ A producer takes a number from a counter and atomicaly increments the counter.
+ The number taken is a number of a slot for the producer to put a new message
+ into infinite array.
+ Then producer constructs a virtual infinite array by bidirectional linked list
+ of blocks. Each block contains several slots.
+ There is a hint pointer which optimisticly points to the last block
+ of the list and never goes backward.
+ Consumer exploits the property of the hint pointer always going forward
+ to free old blocks eventually. Consumer periodically read the hint pointer
+ and the counter and thus deduce producers which potentially holds the pointer
+ to a block. Consumer can free the block if all that producers filled their
+ slots and left the queue.
+ No producer can stop the progress for other producers.
+ Consumer can obstruct a slot of a delayed producer by putting special mark.
+ Thus no producer can stop the progress for consumer.
+ But a slow producer may be forced to retry unlimited number of times.
+ Though it's very unlikely for a non-preempted producer to be obstructed.
+ That's why the algorithm is semi-wait-free.
+ WARNING: there is no wait&notify mechanic for consumer,
+ consumer receives nullptr if queue was empty.
+ WARNING: though the algorithm itself is lock-free
+ but producers and consumer could be blocked by memory allocator
+ WARNING: copy constructers of the queue are not thread-safe
+ */
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/system/atomic.h>
+#include <util/system/spinlock.h>
+#include "tune.h"
+namespace NThreading {
+ namespace NObstructiveQueuePrivate {
+ typedef void* TMsgLink;
+ struct TEmpty {
+ };
+ struct TEmptyAux {
+ TEmptyAux Retrieve() const {
+ return TEmptyAux();
+ }
+ void Store(TEmptyAux&) {
+ }
+ static constexpr TEmptyAux Zero() {
+ return TEmptyAux();
+ }
+ };
+ template <typename TAux>
+ struct TSlot {
+ TMsgLink volatile Msg;
+ TAux AuxiliaryData;
+ inline void Store(TAux& aux) {
+ AuxiliaryData.Store(aux);
+ }
+ inline TAux Retrieve() const {
+ return AuxiliaryData.Retrieve();
+ }
+ static TSlot<TAux> NullElem() {
+ return {nullptr, TAux::Zero()};
+ }
+ static TSlot<TAux> Pair(TMsgLink msg, TAux aux) {
+ return {msg, std::move(aux)};
+ }
+ };
+ template <>
+ struct TSlot<TEmptyAux> {
+ TMsgLink volatile Msg;
+ inline void Store(TEmptyAux&) {
+ }
+ inline TEmptyAux Retrieve() const {
+ return TEmptyAux();
+ }
+ static TSlot<TEmptyAux> NullElem() {
+ return {nullptr};
+ }
+ static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) {
+ return {msg};
+ }
+ };
+ enum TPushResult {
+ };
+ template <typename TAux, ui32 BUNCH_SIZE, typename TBase = TEmpty>
+ struct TMsgBunch: public TBase {
+ ui64 FirstSlot;
+ TSlot<TAux> LinkArray[BUNCH_SIZE];
+ TMsgBunch* volatile NextBunch;
+ TMsgBunch* volatile BackLink;
+ ui64 volatile Token;
+ TMsgBunch* volatile NextToken;
+ /* this push can return PUSH_RESULT_BLOCKED */
+ inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) {
+ if (Y_UNLIKELY(slot < FirstSlot)) {
+ }
+ if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) {
+ }
+ LinkArray[slot - FirstSlot].Store(auxiliary);
+ auto oldValue = AtomicSwap(&LinkArray[slot - FirstSlot].Msg, msg);
+ if (Y_LIKELY(oldValue == nullptr)) {
+ return PUSH_RESULT_OK;
+ } else {
+ LeaveBlocked(oldValue);
+ }
+ }
+ inline bool IsSlotHere(ui64 slot) {
+ return slot < FirstSlot + BUNCH_SIZE;
+ }
+ inline TMsgLink GetSlot(ui64 slot) const {
+ return AtomicGet(LinkArray[slot - FirstSlot].Msg);
+ }
+ inline TSlot<TAux> GetSlotAux(ui64 slot) const {
+ auto msg = GetSlot(slot);
+ auto aux = LinkArray[slot - FirstSlot].Retrieve();
+ return TSlot<TAux>::Pair(msg, aux);
+ }
+ void LeaveBlocked(ui64 slot) {
+ auto token = GetToken(slot);
+ token->DecrementToken();
+ }
+ void LeaveBlocked(TMsgLink msg) {
+ auto token = reinterpret_cast<TMsgBunch*>(msg);
+ token->DecrementToken();
+ }
+ TSlot<TAux> BlockSlotAux(ui64 slot, TMsgBunch* token) {
+ auto old =
+ AtomicSwap(&LinkArray[slot - FirstSlot].Msg, (TMsgLink)token);
+ if (old == nullptr) {
+ // It's valid to increment after AtomicCas
+ // because token will release data only after SetNextToken
+ token->IncrementToken();
+ return TSlot<TAux>::NullElem();
+ }
+ return TSlot<TAux>::Pair(old, LinkArray[slot - FirstSlot].Retrieve());
+ }
+ inline TMsgBunch* GetNextBunch() const {
+ return AtomicGet(NextBunch);
+ }
+ inline bool SetNextBunch(TMsgBunch* ptr) {
+ return AtomicCas(&NextBunch, ptr, nullptr);
+ }
+ inline TMsgBunch* GetBackLink() const {
+ return AtomicGet(BackLink);
+ }
+ inline TMsgBunch* GetToken(ui64 slot) {
+ return reinterpret_cast<TMsgBunch*>(LinkArray[slot - FirstSlot].Msg);
+ }
+ inline void IncrementToken() {
+ AtomicIncrement(Token);
+ }
+ // the object could be destroyed after this method
+ inline void DecrementToken() {
+ if (Y_UNLIKELY(AtomicDecrement(Token) == BUNCH_SIZE)) {
+ Release(this);
+ AtomicGet(NextToken)->DecrementToken();
+ // this could be invalid here
+ }
+ }
+ // the object could be destroyed after this method
+ inline void SetNextToken(TMsgBunch* next) {
+ AtomicSet(NextToken, next);
+ if (Y_UNLIKELY(AtomicAdd(Token, BUNCH_SIZE) == BUNCH_SIZE)) {
+ Release(this);
+ next->DecrementToken();
+ }
+ // this could be invalid here
+ }
+ TMsgBunch(ui64 start, TMsgBunch* backLink) {
+ AtomicSet(FirstSlot, start);
+ memset(&LinkArray, 0, sizeof(LinkArray));
+ AtomicSet(NextBunch, nullptr);
+ AtomicSet(BackLink, backLink);
+ AtomicSet(Token, 1);
+ AtomicSet(NextToken, nullptr);
+ }
+ static void Release(TMsgBunch* bunch) {
+ auto backLink = AtomicGet(bunch->BackLink);
+ if (backLink == nullptr) {
+ return;
+ }
+ AtomicSet(bunch->BackLink, nullptr);
+ do {
+ auto bbackLink = backLink->BackLink;
+ delete backLink;
+ backLink = bbackLink;
+ } while (backLink != nullptr);
+ }
+ void Destroy() {
+ for (auto tail = BackLink; tail != nullptr;) {
+ auto next = tail->BackLink;
+ delete tail;
+ tail = next;
+ }
+ for (auto next = this; next != nullptr;) {
+ auto nnext = next->NextBunch;
+ delete next;
+ next = nnext;
+ }
+ }
+ };
+ template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase = TEmpty>
+ class TWriteBucket {
+ public:
+ static const ui64 GROSS_SIZE;
+ using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>;
+ TWriteBucket(TBunch* bunch = new TBunch(0, nullptr))
+ : LastBunch(bunch)
+ , SlotCounter(0)
+ {
+ }
+ TWriteBucket(TWriteBucket&& move)
+ : LastBunch(move.LastBunch)
+ , SlotCounter(move.SlotCounter)
+ {
+ move.LastBunch = nullptr;
+ }
+ ~TWriteBucket() {
+ if (LastBunch != nullptr) {
+ LastBunch->Destroy();
+ }
+ }
+ inline bool Push(TMsgLink msg, TAux aux) {
+ ui64 pushSlot = AtomicGetAndIncrement(SlotCounter);
+ TBunch* hintBunch = GetLastBunch();
+ for (;;) {
+ auto hint = hintBunch->Push(msg, pushSlot, aux);
+ if (Y_LIKELY(hint == PUSH_RESULT_OK)) {
+ return true;
+ }
+ bool hhResult = HandleHint(hintBunch, hint);
+ if (Y_UNLIKELY(!hhResult)) {
+ return false;
+ }
+ }
+ }
+ protected:
+ template <typename, ui32, typename>
+ friend class TReadBucket;
+ TBunch* volatile LastBunch; // Hint
+ volatile ui64 SlotCounter;
+ inline TBunch* GetLastBunch() const {
+ return AtomicGet(LastBunch);
+ }
+ bool HandleHint(TBunch*& hintBunch, TPushResult hint) {
+ return false;
+ }
+ hintBunch = hintBunch->GetBackLink();
+ return true;
+ }
+ auto nextBunch = hintBunch->GetNextBunch();
+ if (nextBunch == nullptr) {
+ auto first = hintBunch->FirstSlot + BUNCH_SIZE;
+ nextBunch = new TBunch(first, hintBunch);
+ if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) {
+ delete nextBunch;
+ nextBunch = hintBunch->GetNextBunch();
+ }
+ }
+ // hintBunch could not be freed here so it cannot be reused
+ // it's alright if this CAS was not succeeded,
+ // it means that other thread did that recently
+ AtomicCas(&LastBunch, nextBunch, hintBunch);
+ hintBunch = nextBunch;
+ return true;
+ }
+ };
+ template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase>
+ class TReadBucket {
+ public:
+ static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 20;
+ using TWBucket = TWriteBucket<TAux, BUNCH_SIZE, TBunchBase>;
+ using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>;
+ TReadBucket(TWBucket* writer)
+ : Writer(writer)
+ , ReadBunch(writer->GetLastBunch())
+ , LastKnownPushBunch(writer->GetLastBunch())
+ {
+ ReadBunch->DecrementToken(); // no previous token
+ }
+ TReadBucket(TReadBucket toCopy, TWBucket* writer)
+ : TReadBucket(std::move(toCopy))
+ {
+ Writer = writer;
+ }
+ ui64 ReadyCount() const {
+ return AtomicGet(Writer->SlotCounter) - ReadSlot;
+ }
+ inline TMsgLink Pop() {
+ return PopAux().Msg;
+ }
+ inline TSlot<TAux> PopAux() {
+ for (;;) {
+ if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {
+ if (Y_LIKELY(!RereadPushSlot())) {
+ return TSlot<TAux>::NullElem();
+ }
+ }
+ if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {
+ if (Y_UNLIKELY(!SwitchToNextBunch())) {
+ return TSlot<TAux>::NullElem();
+ }
+ }
+ auto result = ReadBunch->GetSlotAux(ReadSlot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ ++ReadSlot;
+ return result;
+ }
+ if (ReadSlot + 1 == AtomicGet(Writer->SlotCounter)) {
+ return TSlot<TAux>::NullElem();
+ }
+ result = StubbornPopAux();
+ if (result.Msg != nullptr) {
+ return result;
+ }
+ }
+ }
+ private:
+ TWBucket* Writer;
+ TBunch* ReadBunch;
+ ui64 ReadSlot = 0;
+ TBunch* LastKnownPushBunch;
+ ui64 LastKnownPushSlot = 0;
+ // MUST BE: ReadSlot == LastKnownPushSlot
+ bool RereadPushSlot() {
+ auto oldSlot = LastKnownPushSlot;
+ auto currentPushBunch = Writer->GetLastBunch();
+ auto currentPushSlot = AtomicGet(Writer->SlotCounter);
+ if (currentPushBunch != LastKnownPushBunch) {
+ // LastKnownPushBunch could be invalid after this line
+ LastKnownPushBunch->SetNextToken(currentPushBunch);
+ }
+ LastKnownPushBunch = currentPushBunch;
+ LastKnownPushSlot = currentPushSlot;
+ return oldSlot != LastKnownPushSlot;
+ }
+ bool SwitchToNextBunch() {
+ for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
+ auto next = ReadBunch->GetNextBunch();
+ if (next != nullptr) {
+ ReadBunch = next;
+ return true;
+ }
+ SpinLockPause();
+ }
+ return false;
+ }
+ TSlot<TAux> StubbornPopAux() {
+ for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
+ auto result = ReadBunch->GetSlotAux(ReadSlot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ ++ReadSlot;
+ return result;
+ }
+ SpinLockPause();
+ }
+ return ReadBunch->BlockSlotAux(ReadSlot++, LastKnownPushBunch);
+ }
+ };
+ struct TDefaultParams {
+ static constexpr bool DeleteItems = true;
+ using TAux = NObstructiveQueuePrivate::TEmptyAux;
+ using TBunchBase = NObstructiveQueuePrivate::TEmpty;
+ static constexpr ui32 BUNCH_SIZE = 251;
+ };
+ } //namespace NObstructiveQueuePrivate
+ DeclareTuneValueParam(TObstructiveQueueBunchSize, ui32, BUNCH_SIZE);
+ DeclareTuneValueParam(TObstructiveQueueDeleteItems, bool, DeleteItems);
+ DeclareTuneTypeParam(TObstructiveQueueBunchBase, TBunchBase);
+ DeclareTuneTypeParam(TObstructiveQueueAux, TAux);
template <typename TItem = void, typename... TParams>
- class TObstructiveConsumerAuxQueue {
- private:
- using TTuned =
- TTune<NObstructiveQueuePrivate::TDefaultParams, TParams...>;
- using TAux = typename TTuned::TAux;
- using TSlot = NObstructiveQueuePrivate::TSlot<TAux>;
- using TMsgLink = NObstructiveQueuePrivate::TMsgLink;
- using TBunchBase = typename TTuned::TBunchBase;
- static constexpr bool DeleteItems = TTuned::DeleteItems;
- static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE;
- public:
- TObstructiveConsumerAuxQueue()
- : RBuckets(&WBucket)
- {
- }
- ~TObstructiveConsumerAuxQueue() {
- if (DeleteItems) {
- for (;;) {
- auto msg = Pop();
- if (msg == nullptr) {
- break;
- }
- TDelete::Destroy(msg);
- }
- }
- }
- void Push(TItem* msg) {
- while (!WBucket.Push(reinterpret_cast<TMsgLink>(msg), TAux())) {
- }
- }
- TItem* Pop() {
- return reinterpret_cast<TItem*>(RBuckets.Pop());
- }
- TSlot PopAux() {
- return RBuckets.PopAux();
- }
- private:
- NObstructiveQueuePrivate::TWriteBucket<TAux, BUNCH_SIZE, TBunchBase>
- WBucket;
- NObstructiveQueuePrivate::TReadBucket<TAux, BUNCH_SIZE, TBunchBase>
- RBuckets;
- };
- template <typename TItem = void, bool DeleteItems = true>
- class TObstructiveConsumerQueue
+ class TObstructiveConsumerAuxQueue {
+ private:
+ using TTuned =
+ TTune<NObstructiveQueuePrivate::TDefaultParams, TParams...>;
+ using TAux = typename TTuned::TAux;
+ using TSlot = NObstructiveQueuePrivate::TSlot<TAux>;
+ using TMsgLink = NObstructiveQueuePrivate::TMsgLink;
+ using TBunchBase = typename TTuned::TBunchBase;
+ static constexpr bool DeleteItems = TTuned::DeleteItems;
+ static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE;
+ public:
+ TObstructiveConsumerAuxQueue()
+ : RBuckets(&WBucket)
+ {
+ }
+ ~TObstructiveConsumerAuxQueue() {
+ if (DeleteItems) {
+ for (;;) {
+ auto msg = Pop();
+ if (msg == nullptr) {
+ break;
+ }
+ TDelete::Destroy(msg);
+ }
+ }
+ }
+ void Push(TItem* msg) {
+ while (!WBucket.Push(reinterpret_cast<TMsgLink>(msg), TAux())) {
+ }
+ }
+ TItem* Pop() {
+ return reinterpret_cast<TItem*>(RBuckets.Pop());
+ }
+ TSlot PopAux() {
+ return RBuckets.PopAux();
+ }
+ private:
+ NObstructiveQueuePrivate::TWriteBucket<TAux, BUNCH_SIZE, TBunchBase>
+ WBucket;
+ NObstructiveQueuePrivate::TReadBucket<TAux, BUNCH_SIZE, TBunchBase>
+ RBuckets;
+ };
+ template <typename TItem = void, bool DeleteItems = true>
+ class TObstructiveConsumerQueue
: public TObstructiveConsumerAuxQueue<TItem,
TObstructiveQueueDeleteItems<DeleteItems>> {
- };
+ };
diff --git a/library/cpp/threading/queue/queue_ut.cpp b/library/cpp/threading/queue/queue_ut.cpp
index 80eca147da..8b36437034 100644
--- a/library/cpp/threading/queue/queue_ut.cpp
+++ b/library/cpp/threading/queue/queue_ut.cpp
@@ -1,242 +1,242 @@
#include <library/cpp/testing/unittest/registar.h>
-#include <util/system/thread.h>
-#include "ut_helpers.h"
-typedef void* TMsgLink;
+#include <util/system/thread.h>
+#include "ut_helpers.h"
+typedef void* TMsgLink;
template <typename TQueueType>
-class TQueueTestProcs: public TTestBase {
+class TQueueTestProcs: public TTestBase {
- UNIT_TEST(Threads2_Push1M_Threads1_Pop2M)
- UNIT_TEST(Threads4_Push1M_Threads1_Pop4M)
- UNIT_TEST(Threads8_RndPush100K_Threads8_Queues)
+ UNIT_TEST(Threads2_Push1M_Threads1_Pop2M)
+ UNIT_TEST(Threads4_Push1M_Threads1_Pop4M)
+ UNIT_TEST(Threads8_RndPush100K_Threads8_Queues)
- UNIT_TEST(Threads24_RndPush100K_Threads24_Queues)
- UNIT_TEST(Threads24_RndPush100K_Threads8_Queues)
- UNIT_TEST(Threads24_RndPush100K_Threads4_Queues)
- void Push1M_Pop1M() {
+ UNIT_TEST(Threads24_RndPush100K_Threads24_Queues)
+ UNIT_TEST(Threads24_RndPush100K_Threads8_Queues)
+ UNIT_TEST(Threads24_RndPush100K_Threads4_Queues)
+ void Push1M_Pop1M() {
TQueueType queue;
- TMsgLink msg = &queue;
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- for (int i = 0; i < 1000000; ++i) {
- queue.Push((char*)msg + i);
- }
- for (int i = 0; i < 1000000; ++i) {
- auto popped = queue.Pop();
- UNIT_ASSERT_EQUAL((char*)msg + i, popped);
- }
- pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- }
- void Threads2_Push1M_Threads1_Pop2M() {
+ TMsgLink msg = &queue;
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ for (int i = 0; i < 1000000; ++i) {
+ queue.Push((char*)msg + i);
+ }
+ for (int i = 0; i < 1000000; ++i) {
+ auto popped = queue.Pop();
+ UNIT_ASSERT_EQUAL((char*)msg + i, popped);
+ }
+ pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ }
+ void Threads2_Push1M_Threads1_Pop2M() {
TQueueType queue;
class TPusherThread: public ISimpleThread {
- public:
+ public:
TPusherThread(TQueueType& theQueue, char* start)
: Queue(theQueue)
- , Arg(start)
- {
- }
+ , Arg(start)
+ {
+ }
TQueueType& Queue;
- char* Arg;
- void* ThreadProc() override {
- for (int i = 0; i < 1000000; ++i) {
- Queue.Push(Arg + i);
- }
- return nullptr;
- }
- };
- TPusherThread pusher1(queue, (char*)&queue);
- TPusherThread pusher2(queue, (char*)&queue + 2000000);
- pusher1.Start();
- pusher2.Start();
- for (int i = 0; i < 2000000; ++i) {
- while (queue.Pop() == nullptr) {
- SpinLockPause();
- }
- }
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- }
- void Threads4_Push1M_Threads1_Pop4M() {
+ char* Arg;
+ void* ThreadProc() override {
+ for (int i = 0; i < 1000000; ++i) {
+ Queue.Push(Arg + i);
+ }
+ return nullptr;
+ }
+ };
+ TPusherThread pusher1(queue, (char*)&queue);
+ TPusherThread pusher2(queue, (char*)&queue + 2000000);
+ pusher1.Start();
+ pusher2.Start();
+ for (int i = 0; i < 2000000; ++i) {
+ while (queue.Pop() == nullptr) {
+ SpinLockPause();
+ }
+ }
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ }
+ void Threads4_Push1M_Threads1_Pop4M() {
TQueueType queue;
class TPusherThread: public ISimpleThread {
- public:
+ public:
TPusherThread(TQueueType& theQueue, char* start)
: Queue(theQueue)
- , Arg(start)
- {
- }
+ , Arg(start)
+ {
+ }
TQueueType& Queue;
- char* Arg;
- void* ThreadProc() override {
- for (int i = 0; i < 1000000; ++i) {
- Queue.Push(Arg + i);
- }
- return nullptr;
- }
- };
- TPusherThread pusher1(queue, (char*)&queue);
- TPusherThread pusher2(queue, (char*)&queue + 2000000);
- TPusherThread pusher3(queue, (char*)&queue + 4000000);
- TPusherThread pusher4(queue, (char*)&queue + 6000000);
- pusher1.Start();
- pusher2.Start();
- pusher3.Start();
- pusher4.Start();
- for (int i = 0; i < 4000000; ++i) {
- while (queue.Pop() == nullptr) {
- SpinLockPause();
- }
- }
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- }
- template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES>
- void ManyRndPush100K_ManyQueues() {
+ char* Arg;
+ void* ThreadProc() override {
+ for (int i = 0; i < 1000000; ++i) {
+ Queue.Push(Arg + i);
+ }
+ return nullptr;
+ }
+ };
+ TPusherThread pusher1(queue, (char*)&queue);
+ TPusherThread pusher2(queue, (char*)&queue + 2000000);
+ TPusherThread pusher3(queue, (char*)&queue + 4000000);
+ TPusherThread pusher4(queue, (char*)&queue + 6000000);
+ pusher1.Start();
+ pusher2.Start();
+ pusher3.Start();
+ pusher4.Start();
+ for (int i = 0; i < 4000000; ++i) {
+ while (queue.Pop() == nullptr) {
+ SpinLockPause();
+ }
+ }
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ }
+ template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES>
+ void ManyRndPush100K_ManyQueues() {
TQueueType queue[NUMBER_OF_QUEUES];
class TPusherThread: public ISimpleThread {
- public:
+ public:
TPusherThread(TQueueType* queues, char* start)
- : Queues(queues)
- , Arg(start)
- {
- }
+ : Queues(queues)
+ , Arg(start)
+ {
+ }
TQueueType* Queues;
- char* Arg;
- void* ThreadProc() override {
- ui64 counters[NUMBER_OF_QUEUES];
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- counters[i] = 0;
- }
- for (int i = 0; i < 100000; ++i) {
- size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES;
- int cookie = counters[rnd]++;
- Queues[rnd].Push(Arg + cookie);
- }
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- Queues[i].Push((void*)2ULL);
- }
- return nullptr;
- }
- };
+ char* Arg;
+ void* ThreadProc() override {
+ ui64 counters[NUMBER_OF_QUEUES];
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ counters[i] = 0;
+ }
+ for (int i = 0; i < 100000; ++i) {
+ size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES;
+ int cookie = counters[rnd]++;
+ Queues[rnd].Push(Arg + cookie);
+ }
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ Queues[i].Push((void*)2ULL);
+ }
+ return nullptr;
+ }
+ };
class TPopperThread: public ISimpleThread {
- public:
+ public:
TPopperThread(TQueueType* theQueue, char* base)
: Queue(theQueue)
- , Base(base)
- {
- }
+ , Base(base)
+ {
+ }
TQueueType* Queue;
- char* Base;
- void* ThreadProc() override {
- ui64 counters[NUMBER_OF_PUSHERS];
- for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
- counters[i] = 0;
- }
- for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) {
- auto msg = Queue->Pop();
- if (msg == nullptr) {
- SpinLockPause();
- continue;
- }
- if (msg == (void*)2ULL) {
- ++fin;
- continue;
- }
- ui64 shift = (char*)msg - Base;
- auto pusherNum = shift / 200000000ULL;
- auto msgNum = shift % 200000000ULL;
- UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum);
- ++counters[pusherNum];
- }
- auto pmsg = Queue->Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- return nullptr;
- }
- };
+ char* Base;
+ void* ThreadProc() override {
+ ui64 counters[NUMBER_OF_PUSHERS];
+ for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
+ counters[i] = 0;
+ }
+ for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) {
+ auto msg = Queue->Pop();
+ if (msg == nullptr) {
+ SpinLockPause();
+ continue;
+ }
+ if (msg == (void*)2ULL) {
+ ++fin;
+ continue;
+ }
+ ui64 shift = (char*)msg - Base;
+ auto pusherNum = shift / 200000000ULL;
+ auto msgNum = shift % 200000000ULL;
+ UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum);
+ ++counters[pusherNum];
+ }
+ auto pmsg = Queue->Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ return nullptr;
+ }
+ };
TVector<TAutoPtr<TPopperThread>> poppers;
TVector<TAutoPtr<TPusherThread>> pushers;
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue));
- poppers.back()->Start();
- }
- for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
- pushers.emplace_back(
- new TPusherThread(queue, (char*)&queue + 200000000ULL * i));
- pushers.back()->Start();
- }
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- poppers[i]->Join();
- }
- for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
- pushers[i]->Join();
- }
- }
- void Threads8_RndPush100K_Threads8_Queues() {
- ManyRndPush100K_ManyQueues<8, 8>();
- }
- /*
- void Threads24_RndPush100K_Threads24_Queues() {
- ManyRndPush100K_ManyQueues<24, 24>();
- }
- void Threads24_RndPush100K_Threads8_Queues() {
- ManyRndPush100K_ManyQueues<24, 8>();
- }
- void Threads24_RndPush100K_Threads4_Queues() {
- ManyRndPush100K_ManyQueues<24, 4>();
- }
- */
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue));
+ poppers.back()->Start();
+ }
+ for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
+ pushers.emplace_back(
+ new TPusherThread(queue, (char*)&queue + 200000000ULL * i));
+ pushers.back()->Start();
+ }
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ poppers[i]->Join();
+ }
+ for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
+ pushers[i]->Join();
+ }
+ }
+ void Threads8_RndPush100K_Threads8_Queues() {
+ ManyRndPush100K_ManyQueues<8, 8>();
+ }
+ /*
+ void Threads24_RndPush100K_Threads24_Queues() {
+ ManyRndPush100K_ManyQueues<24, 24>();
+ }
+ void Threads24_RndPush100K_Threads8_Queues() {
+ ManyRndPush100K_ManyQueues<24, 8>();
+ }
+ void Threads24_RndPush100K_Threads4_Queues() {
+ ManyRndPush100K_ManyQueues<24, 4>();
+ }
+ */
diff --git a/library/cpp/threading/queue/tune.h b/library/cpp/threading/queue/tune.h
index 50fc3dc17c..43ad5efe3e 100644
--- a/library/cpp/threading/queue/tune.h
+++ b/library/cpp/threading/queue/tune.h
@@ -1,101 +1,101 @@
-#pragma once
- Motivation: consider you have a template class with many parameters
- with default associations
- template <typename A = TDefA,
- typename B = TDefB,
- typename C = TDefC,
- typename D = TDefD>
- class TExample {
- };
- consider you would like to provide easy to use interface to tune all
- these parameters in position independed manner,
- In that case TTune would be helpful for you.
- How to use:
- First step: declare a struct with all default associations
- struct TDefaultTune {
- using TStructA = TDefA;
- using TStructB = TDefB;
- using TStructC = TDefC;
- using TStructD = TDefD;
- };
- Second step: declare helper names visible to a user
- DeclareTuneTypeParam(TTuneParamA, TStructA);
- DeclareTuneTypeParam(TTuneParamB, TStructB);
- DeclareTuneTypeParam(TTuneParamC, TStructC);
- DeclareTuneTypeParam(TTuneParamD, TStructD);
- Third step: declare TExample this way:
- template <typename...TParams>
- class TExample {
- using TMyParams = TTune<TDefaultTune, TParams...>;
- using TActualA = TMyParams::TStructA;
- using TActualB = TMyParams::TStructB;
- ...
- };
- TTune<TDefaultTune, TParams...> is a struct with the default parameteres
- taken from TDefaultTune and overridden from "TParams...".
- for example: "TTune<TDefaultTune, TTuneParamC<TUserClass>>"
- will be virtually the same as:
- struct TTunedClass {
- using TStructA = TDefA;
- using TStructB = TDefB;
- using TStructC = TUserClass;
- using TStructD = TDefD;
- };
- From now on you can tune your TExample in the following manner:
- using TCustomClass =
- TExample <TTuneParamA<TUserStruct1>, TTuneParamD<TUserStruct2>>;
- You can also tweak constant expressions in your TDefaultTune.
- Consider you have:
- struct TDefaultTune {
- static constexpr ui32 MySize = 42;
- };
- declare an interface to modify the parameter this way:
- DeclareTuneValueParam(TStructSize, ui32, MySize);
- and tweak your class:
- using TTwiceBigger = TExample<TStructSize<84>>;
- */
-#define DeclareTuneTypeParam(TParamName, InternalName) \
- template <typename TNewType> \
- struct TParamName { \
- template <typename TBase> \
- struct TApply: public TBase { \
- using InternalName = TNewType; \
- }; \
- }
-#define DeclareTuneValueParam(TParamName, TValueType, InternalName) \
- template <TValueType NewValue> \
- struct TParamName { \
- template <typename TBase> \
- struct TApply: public TBase { \
- static constexpr TValueType InternalName = NewValue; \
- }; \
- }
+#pragma once
+ Motivation: consider you have a template class with many parameters
+ with default associations
+ template <typename A = TDefA,
+ typename B = TDefB,
+ typename C = TDefC,
+ typename D = TDefD>
+ class TExample {
+ };
+ consider you would like to provide easy to use interface to tune all
+ these parameters in position independed manner,
+ In that case TTune would be helpful for you.
+ How to use:
+ First step: declare a struct with all default associations
+ struct TDefaultTune {
+ using TStructA = TDefA;
+ using TStructB = TDefB;
+ using TStructC = TDefC;
+ using TStructD = TDefD;
+ };
+ Second step: declare helper names visible to a user
+ DeclareTuneTypeParam(TTuneParamA, TStructA);
+ DeclareTuneTypeParam(TTuneParamB, TStructB);
+ DeclareTuneTypeParam(TTuneParamC, TStructC);
+ DeclareTuneTypeParam(TTuneParamD, TStructD);
+ Third step: declare TExample this way:
+ template <typename...TParams>
+ class TExample {
+ using TMyParams = TTune<TDefaultTune, TParams...>;
+ using TActualA = TMyParams::TStructA;
+ using TActualB = TMyParams::TStructB;
+ ...
+ };
+ TTune<TDefaultTune, TParams...> is a struct with the default parameteres
+ taken from TDefaultTune and overridden from "TParams...".
+ for example: "TTune<TDefaultTune, TTuneParamC<TUserClass>>"
+ will be virtually the same as:
+ struct TTunedClass {
+ using TStructA = TDefA;
+ using TStructB = TDefB;
+ using TStructC = TUserClass;
+ using TStructD = TDefD;
+ };
+ From now on you can tune your TExample in the following manner:
+ using TCustomClass =
+ TExample <TTuneParamA<TUserStruct1>, TTuneParamD<TUserStruct2>>;
+ You can also tweak constant expressions in your TDefaultTune.
+ Consider you have:
+ struct TDefaultTune {
+ static constexpr ui32 MySize = 42;
+ };
+ declare an interface to modify the parameter this way:
+ DeclareTuneValueParam(TStructSize, ui32, MySize);
+ and tweak your class:
+ using TTwiceBigger = TExample<TStructSize<84>>;
+ */
+#define DeclareTuneTypeParam(TParamName, InternalName) \
+ template <typename TNewType> \
+ struct TParamName { \
+ template <typename TBase> \
+ struct TApply: public TBase { \
+ using InternalName = TNewType; \
+ }; \
+ }
+#define DeclareTuneValueParam(TParamName, TValueType, InternalName) \
+ template <TValueType NewValue> \
+ struct TParamName { \
+ template <typename TBase> \
+ struct TApply: public TBase { \
+ static constexpr TValueType InternalName = NewValue; \
+ }; \
+ }
#define DeclareTuneContainer(TParamName, InternalName) \
template <template <typename, typename...> class TNewContainer> \
struct TParamName { \
@@ -104,22 +104,22 @@
template <typename TElem, typename... TRest> \
using InternalName = TNewContainer<TElem, TRest...>; \
}; \
- }
-namespace NTunePrivate {
- template <typename TBase, typename... TParams>
- struct TFold;
- template <typename TBase>
- struct TFold<TBase>: public TBase {
- };
- template <typename TBase, typename TFirstArg, typename... TRest>
- struct TFold<TBase, TFirstArg, TRest...>
- : public TFold<typename TFirstArg::template TApply<TBase>, TRest...> {
- };
-template <typename TDefault, typename... TParams>
-struct TTune: public NTunePrivate::TFold<TDefault, TParams...> {
+ }
+namespace NTunePrivate {
+ template <typename TBase, typename... TParams>
+ struct TFold;
+ template <typename TBase>
+ struct TFold<TBase>: public TBase {
+ };
+ template <typename TBase, typename TFirstArg, typename... TRest>
+ struct TFold<TBase, TFirstArg, TRest...>
+ : public TFold<typename TFirstArg::template TApply<TBase>, TRest...> {
+ };
+template <typename TDefault, typename... TParams>
+struct TTune: public NTunePrivate::TFold<TDefault, TParams...> {
diff --git a/library/cpp/threading/queue/tune_ut.cpp b/library/cpp/threading/queue/tune_ut.cpp
index 7e980d3e27..64bc8fd427 100644
--- a/library/cpp/threading/queue/tune_ut.cpp
+++ b/library/cpp/threading/queue/tune_ut.cpp
@@ -1,118 +1,118 @@
#include <library/cpp/testing/unittest/registar.h>
-#include "tune.h"
-struct TDefaultStructA {
-struct TDefaultStructB {
-struct TDefaults {
- using TStructA = TDefaultStructA;
- using TStructB = TDefaultStructB;
- static constexpr ui32 Param1 = 42;
- static constexpr ui32 Param2 = 42;
-DeclareTuneTypeParam(TweakStructA, TStructA);
-DeclareTuneTypeParam(TweakStructB, TStructB);
-DeclareTuneValueParam(TweakParam1, ui32, Param1);
-DeclareTuneValueParam(TweakParam2, ui32, Param2);
+#include "tune.h"
+struct TDefaultStructA {
+struct TDefaultStructB {
+struct TDefaults {
+ using TStructA = TDefaultStructA;
+ using TStructB = TDefaultStructB;
+ static constexpr ui32 Param1 = 42;
+ static constexpr ui32 Param2 = 42;
+DeclareTuneTypeParam(TweakStructA, TStructA);
+DeclareTuneTypeParam(TweakStructB, TStructB);
+DeclareTuneValueParam(TweakParam1, ui32, Param1);
+DeclareTuneValueParam(TweakParam2, ui32, Param2);
Y_UNIT_TEST(Defaults) {
- using TTuned = TTune<TDefaults>;
- using TunedA = TTuned::TStructA;
- using TunedB = TTuned::TStructB;
- auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
- auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
- auto param1 = TTuned::Param1;
- auto param2 = TTuned::Param2;
- UNIT_ASSERT_EQUAL(param1, 42);
- UNIT_ASSERT_EQUAL(param2, 42);
- }
+ using TTuned = TTune<TDefaults>;
+ using TunedA = TTuned::TStructA;
+ using TunedB = TTuned::TStructB;
+ auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
+ auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
+ auto param1 = TTuned::Param1;
+ auto param2 = TTuned::Param2;
+ UNIT_ASSERT_EQUAL(param1, 42);
+ UNIT_ASSERT_EQUAL(param2, 42);
+ }
Y_UNIT_TEST(TuneStructA) {
- struct TMyStruct {
- };
- using TTuned = TTune<TDefaults, TweakStructA<TMyStruct>>;
- using TunedA = TTuned::TStructA;
- using TunedB = TTuned::TStructB;
- //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
- auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
- auto param1 = TTuned::Param1;
- auto param2 = TTuned::Param2;
- auto sameA = std::is_same<TMyStruct, TunedA>::value;
- UNIT_ASSERT_EQUAL(param1, 42);
- UNIT_ASSERT_EQUAL(param2, 42);
- }
+ struct TMyStruct {
+ };
+ using TTuned = TTune<TDefaults, TweakStructA<TMyStruct>>;
+ using TunedA = TTuned::TStructA;
+ using TunedB = TTuned::TStructB;
+ //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
+ auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
+ auto param1 = TTuned::Param1;
+ auto param2 = TTuned::Param2;
+ auto sameA = std::is_same<TMyStruct, TunedA>::value;
+ UNIT_ASSERT_EQUAL(param1, 42);
+ UNIT_ASSERT_EQUAL(param2, 42);
+ }
Y_UNIT_TEST(TuneParam1) {
- using TTuned = TTune<TDefaults, TweakParam1<24>>;
- using TunedA = TTuned::TStructA;
- using TunedB = TTuned::TStructB;
- auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
- auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
- auto param1 = TTuned::Param1;
- auto param2 = TTuned::Param2;
- UNIT_ASSERT_EQUAL(param1, 24);
- UNIT_ASSERT_EQUAL(param2, 42);
- }
+ using TTuned = TTune<TDefaults, TweakParam1<24>>;
+ using TunedA = TTuned::TStructA;
+ using TunedB = TTuned::TStructB;
+ auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
+ auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
+ auto param1 = TTuned::Param1;
+ auto param2 = TTuned::Param2;
+ UNIT_ASSERT_EQUAL(param1, 24);
+ UNIT_ASSERT_EQUAL(param2, 42);
+ }
Y_UNIT_TEST(TuneStructAAndParam1) {
- struct TMyStruct {
- };
- using TTuned =
- TTune<TDefaults, TweakStructA<TMyStruct>, TweakParam1<24>>;
- using TunedA = TTuned::TStructA;
- using TunedB = TTuned::TStructB;
- //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
- auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
- auto param1 = TTuned::Param1;
- auto param2 = TTuned::Param2;
- auto sameA = std::is_same<TMyStruct, TunedA>::value;
- UNIT_ASSERT_EQUAL(param1, 24);
- UNIT_ASSERT_EQUAL(param2, 42);
- }
+ struct TMyStruct {
+ };
+ using TTuned =
+ TTune<TDefaults, TweakStructA<TMyStruct>, TweakParam1<24>>;
+ using TunedA = TTuned::TStructA;
+ using TunedB = TTuned::TStructB;
+ //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
+ auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
+ auto param1 = TTuned::Param1;
+ auto param2 = TTuned::Param2;
+ auto sameA = std::is_same<TMyStruct, TunedA>::value;
+ UNIT_ASSERT_EQUAL(param1, 24);
+ UNIT_ASSERT_EQUAL(param2, 42);
+ }
Y_UNIT_TEST(TuneParam1AndStructA) {
- struct TMyStruct {
- };
- using TTuned =
- TTune<TDefaults, TweakParam1<24>, TweakStructA<TMyStruct>>;
- using TunedA = TTuned::TStructA;
- using TunedB = TTuned::TStructB;
- //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
- auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
- auto param1 = TTuned::Param1;
- auto param2 = TTuned::Param2;
- auto sameA = std::is_same<TMyStruct, TunedA>::value;
- UNIT_ASSERT_EQUAL(param1, 24);
- UNIT_ASSERT_EQUAL(param2, 42);
- }
+ struct TMyStruct {
+ };
+ using TTuned =
+ TTune<TDefaults, TweakParam1<24>, TweakStructA<TMyStruct>>;
+ using TunedA = TTuned::TStructA;
+ using TunedB = TTuned::TStructB;
+ //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
+ auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
+ auto param1 = TTuned::Param1;
+ auto param2 = TTuned::Param2;
+ auto sameA = std::is_same<TMyStruct, TunedA>::value;
+ UNIT_ASSERT_EQUAL(param1, 24);
+ UNIT_ASSERT_EQUAL(param2, 42);
+ }
diff --git a/library/cpp/threading/queue/unordered_ut.cpp b/library/cpp/threading/queue/unordered_ut.cpp
index a43b7f520e..2018538bf7 100644
--- a/library/cpp/threading/queue/unordered_ut.cpp
+++ b/library/cpp/threading/queue/unordered_ut.cpp
@@ -1,154 +1,154 @@
#include <library/cpp/testing/unittest/registar.h>
-#include <util/system/thread.h>
-#include <algorithm>
-#include <util/generic/vector.h>
-#include <util/random/fast.h>
-#include "ut_helpers.h"
+#include <util/system/thread.h>
+#include <algorithm>
+#include <util/generic/vector.h>
+#include <util/random/fast.h>
+#include "ut_helpers.h"
template <typename TQueueType>
-class TTestUnorderedQueue: public TTestBase {
- using TLink = TIntrusiveLink;
+class TTestUnorderedQueue: public TTestBase {
+ using TLink = TIntrusiveLink;
- UNIT_TEST(Push1M_Pop1M_Unordered)
- void Push1M_Pop1M_Unordered() {
- constexpr int REPEAT = 1000000;
+ UNIT_TEST(Push1M_Pop1M_Unordered)
+ void Push1M_Pop1M_Unordered() {
+ constexpr int REPEAT = 1000000;
TQueueType queue;
- TLink msg[REPEAT];
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- for (int i = 0; i < REPEAT; ++i) {
- queue.Push(&msg[i]);
- }
+ TLink msg[REPEAT];
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ for (int i = 0; i < REPEAT; ++i) {
+ queue.Push(&msg[i]);
+ }
TVector<TLink*> popped;
- popped.reserve(REPEAT);
- for (int i = 0; i < REPEAT; ++i) {
- popped.push_back((TLink*)queue.Pop());
- }
- pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- std::sort(popped.begin(), popped.end());
- for (int i = 0; i < REPEAT; ++i) {
- UNIT_ASSERT_VALUES_EQUAL(&msg[i], popped[i]);
- }
- }
+ popped.reserve(REPEAT);
+ for (int i = 0; i < REPEAT; ++i) {
+ popped.push_back((TLink*)queue.Pop());
+ }
+ pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ std::sort(popped.begin(), popped.end());
+ for (int i = 0; i < REPEAT; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(&msg[i], popped[i]);
+ }
+ }
template <typename TQueueType>
-class TTestWeakQueue: public TTestBase {
+class TTestWeakQueue: public TTestBase {
- UNIT_TEST(Threads8_Rnd_Exchange)
- template <ui16 COUNT = 48, ui32 MSG_COUNT = 10000>
- void ManyThreadsRndExchange() {
+ UNIT_TEST(Threads8_Rnd_Exchange)
+ template <ui16 COUNT = 48, ui32 MSG_COUNT = 10000>
+ void ManyThreadsRndExchange() {
TQueueType queues[COUNT];
class TWorker: public ISimpleThread {
- public:
- TWorker(
+ public:
+ TWorker(
TQueueType* queues_,
ui16 mine,
TAtomic* pushDone)
- : Queues(queues_)
- , MineQueue(mine)
- , PushDone(pushDone)
- {
- }
+ : Queues(queues_)
+ , MineQueue(mine)
+ , PushDone(pushDone)
+ {
+ }
TQueueType* Queues;
- ui16 MineQueue;
+ ui16 MineQueue;
TVector<uintptr_t> Received;
- TAtomic* PushDone;
- void* ThreadProc() override {
- TReallyFastRng32 rng(GetCycleCount());
- Received.reserve(MSG_COUNT * 2);
- for (ui32 loop = 1; loop <= MSG_COUNT; ++loop) {
- for (;;) {
- auto msg = Queues[MineQueue].Pop();
- if (msg == nullptr) {
- break;
- }
- Received.push_back((uintptr_t)msg);
- }
- ui16 rnd = rng.GenRand64() % COUNT;
- ui64 msg = ((ui64)MineQueue << 32) + loop;
- while (!Queues[rnd].Push((void*)msg)) {
- }
- }
- AtomicIncrement(*PushDone);
- for (;;) {
- bool isItLast = AtomicGet(*PushDone) == COUNT;
- auto msg = Queues[MineQueue].Pop();
- if (msg != nullptr) {
- Received.push_back((uintptr_t)msg);
- } else {
- if (isItLast) {
- break;
- }
- SpinLockPause();
- }
- }
- for (ui64 last = 0;;) {
- auto msg = Queues[MineQueue].UnsafeScanningPop(&last);
- if (msg == nullptr) {
- break;
- }
- Received.push_back((uintptr_t)msg);
- }
- return nullptr;
- }
- };
+ TAtomic* PushDone;
+ void* ThreadProc() override {
+ TReallyFastRng32 rng(GetCycleCount());
+ Received.reserve(MSG_COUNT * 2);
+ for (ui32 loop = 1; loop <= MSG_COUNT; ++loop) {
+ for (;;) {
+ auto msg = Queues[MineQueue].Pop();
+ if (msg == nullptr) {
+ break;
+ }
+ Received.push_back((uintptr_t)msg);
+ }
+ ui16 rnd = rng.GenRand64() % COUNT;
+ ui64 msg = ((ui64)MineQueue << 32) + loop;
+ while (!Queues[rnd].Push((void*)msg)) {
+ }
+ }
+ AtomicIncrement(*PushDone);
+ for (;;) {
+ bool isItLast = AtomicGet(*PushDone) == COUNT;
+ auto msg = Queues[MineQueue].Pop();
+ if (msg != nullptr) {
+ Received.push_back((uintptr_t)msg);
+ } else {
+ if (isItLast) {
+ break;
+ }
+ SpinLockPause();
+ }
+ }
+ for (ui64 last = 0;;) {
+ auto msg = Queues[MineQueue].UnsafeScanningPop(&last);
+ if (msg == nullptr) {
+ break;
+ }
+ Received.push_back((uintptr_t)msg);
+ }
+ return nullptr;
+ }
+ };
TVector<TAutoPtr<TWorker>> workers;
- TAtomic pushDone = 0;
- for (ui32 i = 0; i < COUNT; ++i) {
- workers.emplace_back(new TWorker(&queues[0], i, &pushDone));
- workers.back()->Start();
- }
+ TAtomic pushDone = 0;
+ for (ui32 i = 0; i < COUNT; ++i) {
+ workers.emplace_back(new TWorker(&queues[0], i, &pushDone));
+ workers.back()->Start();
+ }
TVector<uintptr_t> all;
- for (ui32 i = 0; i < COUNT; ++i) {
- workers[i]->Join();
- all.insert(all.begin(),
+ for (ui32 i = 0; i < COUNT; ++i) {
+ workers[i]->Join();
+ all.insert(all.begin(),
workers[i]->Received.begin(), workers[i]->Received.end());
- }
- std::sort(all.begin(), all.end());
- auto iter = all.begin();
- for (ui32 i = 0; i < COUNT; ++i) {
- for (ui32 k = 1; k <= MSG_COUNT; ++k) {
- UNIT_ASSERT_VALUES_EQUAL(((ui64)i << 32) + k, *iter);
- ++iter;
- }
- }
- }
- void Threads8_Rnd_Exchange() {
- ManyThreadsRndExchange<8>();
- }
+ }
+ std::sort(all.begin(), all.end());
+ auto iter = all.begin();
+ for (ui32 i = 0; i < COUNT; ++i) {
+ for (ui32 k = 1; k <= MSG_COUNT; ++k) {
+ UNIT_ASSERT_VALUES_EQUAL(((ui64)i << 32) + k, *iter);
+ ++iter;
+ }
+ }
+ }
+ void Threads8_Rnd_Exchange() {
+ ManyThreadsRndExchange<8>();
+ }
diff --git a/library/cpp/threading/queue/ut/ya.make b/library/cpp/threading/queue/ut/ya.make
index 8883d9bf69..dda204155e 100644
--- a/library/cpp/threading/queue/ut/ya.make
+++ b/library/cpp/threading/queue/ut/ya.make
@@ -1,16 +1,16 @@
- basic_ut.cpp
- queue_ut.cpp
- tune_ut.cpp
- unordered_ut.cpp
- ut_helpers.cpp
- ut_helpers.h
+ basic_ut.cpp
+ queue_ut.cpp
+ tune_ut.cpp
+ unordered_ut.cpp
+ ut_helpers.cpp
+ ut_helpers.h
diff --git a/library/cpp/threading/queue/ut_helpers.cpp b/library/cpp/threading/queue/ut_helpers.cpp
index aa3a831441..342aa125a0 100644
--- a/library/cpp/threading/queue/ut_helpers.cpp
+++ b/library/cpp/threading/queue/ut_helpers.cpp
@@ -1 +1 @@
-#include "ut_helpers.h"
+#include "ut_helpers.h"
diff --git a/library/cpp/threading/queue/ut_helpers.h b/library/cpp/threading/queue/ut_helpers.h
index 2756b52601..c720366593 100644
--- a/library/cpp/threading/queue/ut_helpers.h
+++ b/library/cpp/threading/queue/ut_helpers.h
@@ -1,40 +1,40 @@
-#pragma once
-#include "mpsc_read_as_filled.h"
-#include "mpsc_htswap.h"
-#include "mpsc_vinfarr_obstructive.h"
-#include "mpsc_intrusive_unordered.h"
-#include "mpmc_unordered_ring.h"
-struct TBasicHTSwap: public NThreading::THTSwapQueue<> {
-struct TBasicReadAsFilled: public NThreading::TReadAsFilledQueue<> {
-struct TBasicObstructiveConsumer
+#pragma once
+#include "mpsc_read_as_filled.h"
+#include "mpsc_htswap.h"
+#include "mpsc_vinfarr_obstructive.h"
+#include "mpsc_intrusive_unordered.h"
+#include "mpmc_unordered_ring.h"
+struct TBasicHTSwap: public NThreading::THTSwapQueue<> {
+struct TBasicReadAsFilled: public NThreading::TReadAsFilledQueue<> {
+struct TBasicObstructiveConsumer
: public NThreading::TObstructiveConsumerQueue<> {
-struct TBasicMPSCIntrusiveUnordered
+struct TBasicMPSCIntrusiveUnordered
: public NThreading::TMPSCIntrusiveUnordered {
-struct TIntrusiveLink: public NThreading::TIntrusiveNode {
-struct TMPMCUnorderedRing: public NThreading::TMPMCUnorderedRing {
- TMPMCUnorderedRing()
- : NThreading::TMPMCUnorderedRing(10000000)
- {
- }
+struct TIntrusiveLink: public NThreading::TIntrusiveNode {
+struct TMPMCUnorderedRing: public NThreading::TMPMCUnorderedRing {
+ TMPMCUnorderedRing()
+ : NThreading::TMPMCUnorderedRing(10000000)
+ {
+ }
UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicReadAsFilled>); \
- UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicObstructiveConsumer>)
+ UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicObstructiveConsumer>)
- UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicMPSCIntrusiveUnordered>); \
+ UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicMPSCIntrusiveUnordered>); \
diff --git a/library/cpp/threading/queue/ya.make b/library/cpp/threading/queue/ya.make
index 6570b38ce5..3a11eb2d92 100644
--- a/library/cpp/threading/queue/ya.make
+++ b/library/cpp/threading/queue/ya.make
@@ -1,18 +1,18 @@
- mpmc_unordered_ring.cpp
- mpmc_unordered_ring.h
- mpsc_htswap.cpp
- mpsc_htswap.h
- mpsc_intrusive_unordered.cpp
- mpsc_intrusive_unordered.h
- mpsc_read_as_filled.cpp
- mpsc_read_as_filled.h
- mpsc_vinfarr_obstructive.cpp
- mpsc_vinfarr_obstructive.h
+ mpmc_unordered_ring.cpp
+ mpmc_unordered_ring.h
+ mpsc_htswap.cpp
+ mpsc_htswap.h
+ mpsc_intrusive_unordered.cpp
+ mpsc_intrusive_unordered.h
+ mpsc_read_as_filled.cpp
+ mpsc_read_as_filled.h
+ mpsc_vinfarr_obstructive.cpp
+ mpsc_vinfarr_obstructive.h