aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/queue
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/queue
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/queue')
-rw-r--r--library/cpp/threading/queue/basic_ut.cpp92
-rw-r--r--library/cpp/threading/queue/mpmc_unordered_ring.cpp74
-rw-r--r--library/cpp/threading/queue/mpmc_unordered_ring.h42
-rw-r--r--library/cpp/threading/queue/mpsc_htswap.cpp1
-rw-r--r--library/cpp/threading/queue/mpsc_htswap.h132
-rw-r--r--library/cpp/threading/queue/mpsc_intrusive_unordered.cpp79
-rw-r--r--library/cpp/threading/queue/mpsc_intrusive_unordered.h35
-rw-r--r--library/cpp/threading/queue/mpsc_read_as_filled.cpp1
-rw-r--r--library/cpp/threading/queue/mpsc_read_as_filled.h611
-rw-r--r--library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp1
-rw-r--r--library/cpp/threading/queue/mpsc_vinfarr_obstructive.h528
-rw-r--r--library/cpp/threading/queue/queue_ut.cpp242
-rw-r--r--library/cpp/threading/queue/tune.h125
-rw-r--r--library/cpp/threading/queue/tune_ut.cpp118
-rw-r--r--library/cpp/threading/queue/unordered_ut.cpp154
-rw-r--r--library/cpp/threading/queue/ut/ya.make16
-rw-r--r--library/cpp/threading/queue/ut_helpers.cpp1
-rw-r--r--library/cpp/threading/queue/ut_helpers.h40
-rw-r--r--library/cpp/threading/queue/ya.make18
19 files changed, 2310 insertions, 0 deletions
diff --git a/library/cpp/threading/queue/basic_ut.cpp b/library/cpp/threading/queue/basic_ut.cpp
new file mode 100644
index 0000000000..5f56f8583e
--- /dev/null
+++ b/library/cpp/threading/queue/basic_ut.cpp
@@ -0,0 +1,92 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/generic/vector.h>
+#include <util/system/thread.h>
+
+#include "ut_helpers.h"
+
+template <typename TQueueType>
+class TQueueTestsInSingleThread: public TTestBase {
+private:
+ using TSelf = TQueueTestsInSingleThread<TQueueType>;
+ using TLink = TIntrusiveLink;
+
+ UNIT_TEST_SUITE_DEMANGLE(TSelf);
+ UNIT_TEST(OnePushOnePop)
+ UNIT_TEST(OnePushOnePop_Repeat1M)
+ UNIT_TEST(Threads8_Repeat1M_Push1Pop1)
+ UNIT_TEST_SUITE_END();
+
+public:
+ void OnePushOnePop() {
+ TQueueType queue;
+
+ auto popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
+
+ TLink msg;
+ queue.Push(&msg);
+ popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(&msg, popped);
+
+ popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
+ };
+
+ void OnePushOnePop_Repeat1M() {
+ TQueueType queue;
+ TLink msg;
+
+ auto popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
+
+ for (int i = 0; i < 1000000; ++i) {
+ queue.Push(&msg);
+ popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(&msg, popped);
+
+ popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
+ }
+ }
+
+ template <size_t NUMBER_OF_THREADS>
+ void RepeatPush1Pop1_InManyThreads() {
+ class TCycleThread: public ISimpleThread {
+ public:
+ void* ThreadProc() override {
+ TQueueType queue;
+ TLink msg;
+ auto popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
+
+ for (size_t i = 0; i < 1000000; ++i) {
+ queue.Push(&msg);
+ popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(popped, &msg);
+
+ popped = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);
+ }
+ return nullptr;
+ }
+ };
+
+ TVector<TAutoPtr<TCycleThread>> cyclers;
+
+ for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) {
+ cyclers.emplace_back(new TCycleThread);
+ cyclers.back()->Start();
+ }
+
+ for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) {
+ cyclers[i]->Join();
+ }
+ }
+
+ void Threads8_Repeat1M_Push1Pop1() {
+ RepeatPush1Pop1_InManyThreads<8>();
+ }
+};
+
+REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestsInSingleThread);
+REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TQueueTestsInSingleThread)
diff --git a/library/cpp/threading/queue/mpmc_unordered_ring.cpp b/library/cpp/threading/queue/mpmc_unordered_ring.cpp
new file mode 100644
index 0000000000..160547f594
--- /dev/null
+++ b/library/cpp/threading/queue/mpmc_unordered_ring.cpp
@@ -0,0 +1,74 @@
+#include "mpmc_unordered_ring.h"
+
+namespace NThreading {
+ TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) {
+ Y_VERIFY(size > 0);
+ RingSize = size;
+ RingBuffer.Reset(new void*[size]);
+ memset(&RingBuffer[0], 0, sizeof(void*) * size);
+ }
+
+ bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept {
+ if (retryCount == 0) {
+ StubbornPush(msg);
+ return true;
+ }
+ for (ui16 itry = retryCount; itry-- > 0;) {
+ if (WeakPush(msg)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept {
+ auto pawl = AtomicIncrement(WritePawl);
+ if (pawl - AtomicGet(ReadFront) >= RingSize) {
+ // Queue is full
+ AtomicDecrement(WritePawl);
+ return false;
+ }
+
+ auto writeSlot = AtomicGetAndIncrement(WriteFront);
+ if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) {
+ return true;
+ }
+ // slot is occupied for some reason, retry
+ return false;
+ }
+
+ void* TMPMCUnorderedRing::Pop() noexcept {
+ ui64 readSlot;
+
+ for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) {
+ auto pawl = AtomicIncrement(ReadPawl);
+ if (pawl > AtomicGet(WriteFront)) {
+ // Queue is empty
+ AtomicDecrement(ReadPawl);
+ return nullptr;
+ }
+
+ readSlot = AtomicGetAndIncrement(ReadFront);
+
+ auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr);
+ if (msg != nullptr) {
+ return msg;
+ }
+ }
+
+ /* got no message in the slot, let's try to rollback readfront */
+ AtomicCas(&ReadFront, readSlot - 1, readSlot);
+ return nullptr;
+ }
+
+ void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept {
+ for (; *last < RingSize;) {
+ auto msg = AtomicSwap(&RingBuffer[*last], nullptr);
+ ++*last;
+ if (msg != nullptr) {
+ return msg;
+ }
+ }
+ return nullptr;
+ }
+}
diff --git a/library/cpp/threading/queue/mpmc_unordered_ring.h b/library/cpp/threading/queue/mpmc_unordered_ring.h
new file mode 100644
index 0000000000..5042f7528e
--- /dev/null
+++ b/library/cpp/threading/queue/mpmc_unordered_ring.h
@@ -0,0 +1,42 @@
+#pragma once
+
+/*
+ It's not a general purpose queue.
+ No order guarantee, but it mostly ordered.
+ Items may stuck in almost empty queue.
+ Use UnsafeScanningPop to pop all stuck items.
+ Almost wait-free for producers and consumers.
+ */
+
+#include <util/system/atomic.h>
+#include <util/generic/ptr.h>
+
+namespace NThreading {
+ struct TMPMCUnorderedRing {
+ public:
+ static constexpr ui16 MAX_PUSH_TRIES = 4;
+ static constexpr ui16 MAX_POP_TRIES = 4;
+
+ TMPMCUnorderedRing(size_t size);
+
+ bool Push(void* msg, ui16 retryCount = MAX_PUSH_TRIES) noexcept;
+ void StubbornPush(void* msg) {
+ while (!WeakPush(msg)) {
+ }
+ }
+
+ void* Pop() noexcept;
+
+ void* UnsafeScanningPop(ui64* last) noexcept;
+
+ private:
+ bool WeakPush(void* msg) noexcept;
+
+ size_t RingSize;
+ TArrayPtr<void*> RingBuffer;
+ ui64 WritePawl = 0;
+ ui64 WriteFront = 0;
+ ui64 ReadPawl = 0;
+ ui64 ReadFront = 0;
+ };
+}
diff --git a/library/cpp/threading/queue/mpsc_htswap.cpp b/library/cpp/threading/queue/mpsc_htswap.cpp
new file mode 100644
index 0000000000..610c8f67f1
--- /dev/null
+++ b/library/cpp/threading/queue/mpsc_htswap.cpp
@@ -0,0 +1 @@
+#include "mpsc_htswap.h"
diff --git a/library/cpp/threading/queue/mpsc_htswap.h b/library/cpp/threading/queue/mpsc_htswap.h
new file mode 100644
index 0000000000..c42caa7ac0
--- /dev/null
+++ b/library/cpp/threading/queue/mpsc_htswap.h
@@ -0,0 +1,132 @@
+#pragma once
+
+/*
+ http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
+
+ Simple semi-wait-free queue. Many producers - one consumer.
+ Tracking of allocated memory is not required.
+ No CAS. Only atomic swap (exchange) operations.
+
+ WARNING: a sleeping producer can stop progress for consumer.
+
+ WARNING: there is no wait&notify mechanic for consumer,
+ consumer receives nullptr if queue was empty.
+
+ WARNING: the algorithm itself is lock-free
+ but producers and consumer could be blocked by memory allocator
+
+ Reference design: rtmapreduce/libs/threading/lfqueue.h
+ */
+
+#include <util/generic/noncopyable.h>
+#include <util/system/types.h>
+#include <util/system/atomic.h>
+
+#include "tune.h"
+
+namespace NThreading {
+ namespace NHTSwapPrivate {
+ template <typename T, typename TTuneup>
+ struct TNode
+ : public TTuneup::TNodeBase,
+ public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> {
+ TNode(const T& item) {
+ this->Next = nullptr;
+ this->Item = item;
+ }
+
+ TNode(T&& item) {
+ this->Next = nullptr;
+ this->Item = std::move(item);
+ }
+ };
+
+ struct TDefaultTuneup {
+ struct TNodeBase: private TNonCopyable {
+ };
+
+ template <typename TNode, typename T>
+ struct TNodeLayout {
+ TNode* Next;
+ T Item;
+ };
+
+ template <typename TNode>
+ struct TQueueLayout {
+ TNode* Head;
+ TNode* Tail;
+ };
+ };
+
+ template <typename T, typename TTuneup>
+ class THTSwapQueueImpl
+ : protected TTuneup::template TQueueLayout<TNode<T, TTuneup>> {
+ protected:
+ using TTunedNode = TNode<T, TTuneup>;
+
+ public:
+ using TItem = T;
+
+ THTSwapQueueImpl() {
+ this->Head = new TTunedNode(T());
+ this->Tail = this->Head;
+ }
+
+ ~THTSwapQueueImpl() {
+ TTunedNode* node = this->Head;
+ while (node != nullptr) {
+ TTunedNode* next = node->Next;
+ delete node;
+ node = next;
+ }
+ }
+
+ template <typename TT>
+ void Push(TT&& item) {
+ Enqueue(new TTunedNode(std::forward<TT>(item)));
+ }
+
+ T Peek() {
+ TTunedNode* next = AtomicGet(this->Head->Next);
+ if (next == nullptr) {
+ return T();
+ }
+ return next->Item;
+ }
+
+ void Enqueue(TTunedNode* node) {
+ // our goal is to avoid expensive CAS here,
+ // but now consumer will be blocked until new tail linked.
+ // fortunately 'window of inconsistency' is extremely small.
+ TTunedNode* prev = AtomicSwap(&this->Tail, node);
+ AtomicSet(prev->Next, node);
+ }
+
+ T Pop() {
+ TTunedNode* next = AtomicGet(this->Head->Next);
+ if (next == nullptr) {
+ return nullptr;
+ }
+ auto item = std::move(next->Item);
+ std::swap(this->Head, next); // no need atomic here
+ delete next;
+ return item;
+ }
+
+ bool IsEmpty() const {
+ TTunedNode* next = AtomicGet(this->Head->Next);
+ return (next == nullptr);
+ }
+ };
+ }
+
+ DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase);
+ DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout);
+ DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout);
+
+ template <typename T = void*, typename... TParams>
+ class THTSwapQueue
+ : public NHTSwapPrivate::THTSwapQueueImpl<T,
+ TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> {
+ };
+}
diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp
new file mode 100644
index 0000000000..3bb1a04f7e
--- /dev/null
+++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp
@@ -0,0 +1,79 @@
+#include "mpsc_intrusive_unordered.h"
+#include <util/system/atomic.h>
+
+namespace NThreading {
+ void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept {
+ auto head = AtomicGet(HeadForCaS);
+ for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) {
+ // no ABA here, because Next is exactly head
+ // it does not matter how many travels head was made/
+ node->Next = head;
+ auto prev = AtomicGetAndCas(&HeadForCaS, node, head);
+ if (head == prev) {
+ return;
+ }
+ head = prev;
+ }
+ // boring of trying to do cas, let's just swap
+
+ // no need for atomic here, because the next is atomic swap
+ node->Next = 0;
+
+ head = AtomicSwap(&HeadForSwap, node);
+ if (head != nullptr) {
+ AtomicSet(node->Next, head);
+ } else {
+ // consumer must know if no other thread may access the memory,
+ // setting Next to node is a way to notify consumer
+ AtomicSet(node->Next, node);
+ }
+ }
+
+ TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept {
+ if (NotReadyChain == nullptr) {
+ auto head = AtomicSwap(&HeadForSwap, nullptr);
+ NotReadyChain = head;
+ }
+
+ if (NotReadyChain != nullptr) {
+ auto next = AtomicGet(NotReadyChain->Next);
+ if (next != nullptr) {
+ auto ready = NotReadyChain;
+ TIntrusiveNode* cut;
+ do {
+ cut = NotReadyChain;
+ NotReadyChain = next;
+ next = AtomicGet(NotReadyChain->Next);
+ if (next == NotReadyChain) {
+ cut = NotReadyChain;
+ NotReadyChain = nullptr;
+ break;
+ }
+ } while (next != nullptr);
+ cut->Next = nullptr;
+ return ready;
+ }
+ }
+
+ if (AtomicGet(HeadForCaS) != nullptr) {
+ return AtomicSwap(&HeadForCaS, nullptr);
+ }
+ return nullptr;
+ }
+
+ TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept {
+ if (PopOneQueue != nullptr) {
+ auto head = PopOneQueue;
+ PopOneQueue = PopOneQueue->Next;
+ return head;
+ }
+
+ PopOneQueue = PopMany();
+ if (PopOneQueue != nullptr) {
+ auto head = PopOneQueue;
+ PopOneQueue = PopOneQueue->Next;
+ return head;
+ }
+ return nullptr;
+ }
+}
diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.h b/library/cpp/threading/queue/mpsc_intrusive_unordered.h
new file mode 100644
index 0000000000..6ac7537ae9
--- /dev/null
+++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.h
@@ -0,0 +1,35 @@
+#pragma once
+
+/*
+ Simple almost-wait-free unordered queue for low contention operations.
+
+ It's wait-free for producers.
+ Hanging producer can hide some items from consumer.
+ */
+
+#include <util/system/types.h>
+
+namespace NThreading {
+ struct TIntrusiveNode {
+ TIntrusiveNode* Next;
+ };
+
+ class TMPSCIntrusiveUnordered {
+ public:
+ static constexpr ui32 NUMBER_OF_TRIES_FOR_CAS = 3;
+
+ void Push(TIntrusiveNode* node) noexcept;
+ TIntrusiveNode* PopMany() noexcept;
+ TIntrusiveNode* Pop() noexcept;
+
+ void Push(void* node) noexcept {
+ Push(reinterpret_cast<TIntrusiveNode*>(node));
+ }
+
+ private:
+ TIntrusiveNode* HeadForCaS = nullptr;
+ TIntrusiveNode* HeadForSwap = nullptr;
+ TIntrusiveNode* NotReadyChain = nullptr;
+ TIntrusiveNode* PopOneQueue = nullptr;
+ };
+}
diff --git a/library/cpp/threading/queue/mpsc_read_as_filled.cpp b/library/cpp/threading/queue/mpsc_read_as_filled.cpp
new file mode 100644
index 0000000000..8b4664a6f3
--- /dev/null
+++ b/library/cpp/threading/queue/mpsc_read_as_filled.cpp
@@ -0,0 +1 @@
+#include "mpsc_read_as_filled.h"
diff --git a/library/cpp/threading/queue/mpsc_read_as_filled.h b/library/cpp/threading/queue/mpsc_read_as_filled.h
new file mode 100644
index 0000000000..be33ba5a58
--- /dev/null
+++ b/library/cpp/threading/queue/mpsc_read_as_filled.h
@@ -0,0 +1,611 @@
+#pragma once
+
+/*
+ Completely wait-free queue, multiple producers - one consumer. Strict order.
+ The queue algorithm is using concept of virtual infinite array.
+
+ A producer takes a number from a counter and atomically increments the counter.
+ The number taken is a number of a slot for the producer to put a new message
+ into infinite array.
+
+ Then producer constructs a virtual infinite array by bidirectional linked list
+ of blocks. Each block contains several slots.
+
+ There is a hint pointer which optimistically points to the last block
+ of the list and never goes backward.
+
+ Consumer exploits the property of the hint pointer always going forward
+ to free old blocks eventually. Consumer periodically read the hint pointer
+ and the counter and thus deduce producers which potentially holds the pointer
+ to a block. Consumer can free the block if all that producers filled their
+ slots and left the queue.
+
+ No producer can stop the progress for other producers.
+
+ Consumer can't stop the progress for producers.
+ Consumer can skip not-yet-filled slots and read them later.
+ Thus no producer can stop the progress for consumer.
+ The algorithm is virtually strictly ordered because it skips slots only
+ if it is really does not matter in which order the slots were produced and
+ consumed.
+
+ WARNING: there is no wait&notify mechanic for consumer,
+ consumer receives nullptr if queue was empty.
+
+ WARNING: though the algorithm itself is completely wait-free
+ but producers and consumer could be blocked by memory allocator
+
+ WARNING: copy constructors of the queue are not thread-safe
+ */
+
+#include <util/generic/deque.h>
+#include <util/generic/ptr.h>
+#include <util/system/atomic.h>
+#include <util/system/spinlock.h>
+
+#include "tune.h"
+
+namespace NThreading {
+ namespace NReadAsFilledPrivate {
+ typedef void* TMsgLink;
+
+ static constexpr ui32 DEFAULT_BUNCH_SIZE = 251;
+
+ struct TEmpty {
+ };
+
+ struct TEmptyAux {
+ TEmptyAux Retrieve() const {
+ return TEmptyAux();
+ }
+
+ void Store(TEmptyAux&) {
+ }
+
+ static constexpr TEmptyAux Zero() {
+ return TEmptyAux();
+ }
+ };
+
+ template <typename TAux>
+ struct TSlot {
+ TMsgLink volatile Msg;
+ TAux AuxiliaryData;
+
+ inline void Store(TAux& aux) {
+ AuxiliaryData.Store(aux);
+ }
+
+ inline TAux Retrieve() const {
+ return AuxiliaryData.Retrieve();
+ }
+
+ static TSlot<TAux> NullElem() {
+ return {nullptr, TAux::Zero()};
+ }
+
+ static TSlot<TAux> Pair(TMsgLink msg, TAux aux) {
+ return {msg, std::move(aux)};
+ }
+ };
+
+ template <>
+ struct TSlot<TEmptyAux> {
+ TMsgLink volatile Msg;
+
+ inline void Store(TEmptyAux&) {
+ }
+
+ inline TEmptyAux Retrieve() const {
+ return TEmptyAux();
+ }
+
+ static TSlot<TEmptyAux> NullElem() {
+ return {nullptr};
+ }
+
+ static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) {
+ return {msg};
+ }
+ };
+
+ enum TPushResult {
+ PUSH_RESULT_OK,
+ PUSH_RESULT_BACKWARD,
+ PUSH_RESULT_FORWARD,
+ };
+
+ template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE,
+ typename TBase = TEmpty,
+ typename TAux = TEmptyAux>
+ struct TMsgBunch: public TBase {
+ static constexpr size_t RELEASE_SIZE = BUNCH_SIZE * 2;
+
+ ui64 FirstSlot;
+
+ TSlot<TAux> LinkArray[BUNCH_SIZE];
+
+ TMsgBunch* volatile NextBunch;
+ TMsgBunch* volatile BackLink;
+
+ ui64 volatile Token;
+ TMsgBunch* volatile NextToken;
+
+ /* this push can return PUSH_RESULT_BLOCKED */
+ inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) {
+ if (Y_UNLIKELY(slot < FirstSlot)) {
+ return PUSH_RESULT_BACKWARD;
+ }
+
+ if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) {
+ return PUSH_RESULT_FORWARD;
+ }
+
+ LinkArray[slot - FirstSlot].Store(auxiliary);
+
+ AtomicSet(LinkArray[slot - FirstSlot].Msg, msg);
+ return PUSH_RESULT_OK;
+ }
+
+ inline bool IsSlotHere(ui64 slot) {
+ return slot < FirstSlot + BUNCH_SIZE;
+ }
+
+ inline TMsgLink GetSlot(ui64 slot) const {
+ return AtomicGet(LinkArray[slot - FirstSlot].Msg);
+ }
+
+ inline TSlot<TAux> GetSlotAux(ui64 slot) const {
+ auto msg = GetSlot(slot);
+ auto aux = LinkArray[slot - FirstSlot].Retrieve();
+ return TSlot<TAux>::Pair(msg, aux);
+ }
+
+ inline TMsgBunch* GetNextBunch() const {
+ return AtomicGet(NextBunch);
+ }
+
+ inline bool SetNextBunch(TMsgBunch* ptr) {
+ return AtomicCas(&NextBunch, ptr, nullptr);
+ }
+
+ inline TMsgBunch* GetBackLink() const {
+ return AtomicGet(BackLink);
+ }
+
+ inline TMsgBunch* GetToken(ui64 slot) {
+ return reinterpret_cast<TMsgBunch*>(
+ LinkArray[slot - FirstSlot].Msg);
+ }
+
+ inline void IncrementToken() {
+ AtomicIncrement(Token);
+ }
+
+ // the object could be destroyed after this method
+ inline void DecrementToken() {
+ if (Y_UNLIKELY(AtomicDecrement(Token) == RELEASE_SIZE)) {
+ Release(this);
+ AtomicGet(NextToken)->DecrementToken();
+ // this could be invalid here
+ }
+ }
+
+ // the object could be destroyed after this method
+ inline void SetNextToken(TMsgBunch* next) {
+ AtomicSet(NextToken, next);
+ if (Y_UNLIKELY(AtomicAdd(Token, RELEASE_SIZE) == RELEASE_SIZE)) {
+ Release(this);
+ next->DecrementToken();
+ }
+ // this could be invalid here
+ }
+
+ TMsgBunch(ui64 start, TMsgBunch* backLink) {
+ AtomicSet(FirstSlot, start);
+ memset(&LinkArray, 0, sizeof(LinkArray));
+ AtomicSet(NextBunch, nullptr);
+ AtomicSet(BackLink, backLink);
+
+ AtomicSet(Token, 1);
+ AtomicSet(NextToken, nullptr);
+ }
+
+ static void Release(TMsgBunch* block) {
+ auto backLink = AtomicGet(block->BackLink);
+ if (backLink == nullptr) {
+ return;
+ }
+ AtomicSet(block->BackLink, nullptr);
+
+ do {
+ auto bbackLink = backLink->BackLink;
+ delete backLink;
+ backLink = bbackLink;
+ } while (backLink != nullptr);
+ }
+
+ void Destroy() {
+ for (auto tail = BackLink; tail != nullptr;) {
+ auto next = tail->BackLink;
+ delete tail;
+ tail = next;
+ }
+
+ for (auto next = this; next != nullptr;) {
+ auto nnext = next->NextBunch;
+ delete next;
+ next = nnext;
+ }
+ }
+ };
+
+ template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE,
+ typename TBunchBase = NReadAsFilledPrivate::TEmpty,
+ typename TAux = TEmptyAux>
+ class TWriteBucket {
+ public:
+ using TUsingAux = TAux; // for TReadBucket binding
+ using TBunch = TMsgBunch<BUNCH_SIZE, TBunchBase, TAux>;
+
+ TWriteBucket(TBunch* bunch = new TBunch(0, nullptr)) {
+ AtomicSet(LastBunch, bunch);
+ AtomicSet(SlotCounter, 0);
+ }
+
+ TWriteBucket(TWriteBucket&& move)
+ : LastBunch(move.LastBunch)
+ , SlotCounter(move.SlotCounter)
+ {
+ move.LastBunch = nullptr;
+ }
+
+ ~TWriteBucket() {
+ if (LastBunch != nullptr) {
+ LastBunch->Destroy();
+ }
+ }
+
+ inline void Push(TMsgLink msg, TAux aux) {
+ ui64 pushSlot = AtomicGetAndIncrement(SlotCounter);
+ TBunch* hintBunch = GetLastBunch();
+
+ for (;;) {
+ auto hint = hintBunch->Push(msg, pushSlot, aux);
+ if (Y_LIKELY(hint == PUSH_RESULT_OK)) {
+ return;
+ }
+ HandleHint(hintBunch, hint);
+ }
+ }
+
+ protected:
+ template <typename, template <typename, typename...> class>
+ friend class TReadBucket;
+
+ TBunch* volatile LastBunch; // Hint
+ volatile ui64 SlotCounter;
+
+ inline TBunch* GetLastBunch() const {
+ return AtomicGet(LastBunch);
+ }
+
+ void HandleHint(TBunch*& hintBunch, TPushResult hint) {
+ if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) {
+ hintBunch = hintBunch->GetBackLink();
+ return;
+ }
+
+ // PUSH_RESULT_FORWARD
+ auto nextBunch = hintBunch->GetNextBunch();
+
+ if (nextBunch == nullptr) {
+ auto first = hintBunch->FirstSlot + BUNCH_SIZE;
+ nextBunch = new TBunch(first, hintBunch);
+ if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) {
+ delete nextBunch;
+ nextBunch = hintBunch->GetNextBunch();
+ }
+ }
+
+ // hintBunch could not be freed here so it cannot be reused
+ // it's alright if this CAS was not succeeded,
+ // it means that other thread did that recently
+ AtomicCas(&LastBunch, nextBunch, hintBunch);
+
+ hintBunch = nextBunch;
+ }
+ };
+
+ template <typename TWBucket = TWriteBucket<>,
+ template <typename, typename...> class TContainer = TDeque>
+ class TReadBucket {
+ public:
+ using TAux = typename TWBucket::TUsingAux;
+ using TBunch = typename TWBucket::TBunch;
+
+ static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 5;
+
+ TReadBucket(TWBucket* writer)
+ : Writer(writer)
+ , ReadBunch(writer->GetLastBunch())
+ , LastKnownPushBunch(writer->GetLastBunch())
+ {
+ ReadBunch->DecrementToken(); // no previous token
+ }
+
+ TReadBucket(TReadBucket toCopy, TWBucket* writer)
+ : TReadBucket(std::move(toCopy))
+ {
+ Writer = writer;
+ }
+
+ ui64 ReadyCount() const {
+ return AtomicGet(Writer->SlotCounter) - ReadSlot;
+ }
+
+ TMsgLink Pop() {
+ return PopAux().Msg;
+ }
+
+ TMsgLink Peek() {
+ return PeekAux().Msg;
+ }
+
+ TSlot<TAux> PopAux() {
+ for (;;) {
+ if (Y_UNLIKELY(ReadNow.size() != 0)) {
+ auto result = PopSkipped();
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ return result;
+ }
+ }
+
+ if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {
+ if (Y_LIKELY(!RereadPushSlot())) {
+ return TSlot<TAux>::NullElem();
+ }
+ continue;
+ }
+
+ if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {
+ if (Y_UNLIKELY(!SwitchToNextBunch())) {
+ return TSlot<TAux>::NullElem();
+ }
+ }
+
+ auto result = ReadBunch->GetSlotAux(ReadSlot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ ++ReadSlot;
+ return result;
+ }
+
+ result = StubbornPop();
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ return result;
+ }
+ }
+ }
+
+ TSlot<TAux> PeekAux() {
+ for (;;) {
+ if (Y_UNLIKELY(ReadNow.size() != 0)) {
+ auto result = PeekSkipped();
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ return result;
+ }
+ }
+
+ if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {
+ if (Y_LIKELY(!RereadPushSlot())) {
+ return TSlot<TAux>::NullElem();
+ }
+ continue;
+ }
+
+ if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {
+ if (Y_UNLIKELY(!SwitchToNextBunch())) {
+ return TSlot<TAux>::NullElem();
+ }
+ }
+
+ auto result = ReadBunch->GetSlotAux(ReadSlot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ return result;
+ }
+
+ result = StubbornPeek();
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ return result;
+ }
+ }
+ }
+
+ private:
+ TWBucket* Writer;
+ TBunch* ReadBunch;
+ ui64 ReadSlot = 0;
+ TBunch* LastKnownPushBunch;
+ ui64 LastKnownPushSlot = 0;
+
+ struct TSkipItem {
+ TBunch* Bunch;
+ ui64 Slot;
+ TBunch* Token;
+ };
+
+ TContainer<TSkipItem> ReadNow;
+ TContainer<TSkipItem> ReadLater;
+
+ void AddToReadLater() {
+ ReadLater.push_back({ReadBunch, ReadSlot, LastKnownPushBunch});
+ LastKnownPushBunch->IncrementToken();
+ ++ReadSlot;
+ }
+
+ // MUST BE: ReadSlot == LastKnownPushSlot
+ bool RereadPushSlot() {
+ ReadNow = std::move(ReadLater);
+ ReadLater.clear();
+
+ auto oldSlot = LastKnownPushSlot;
+
+ auto currentPushBunch = Writer->GetLastBunch();
+ auto currentPushSlot = AtomicGet(Writer->SlotCounter);
+
+ if (currentPushBunch != LastKnownPushBunch) {
+ // LastKnownPushBunch could be invalid after this line
+ LastKnownPushBunch->SetNextToken(currentPushBunch);
+ }
+
+ LastKnownPushBunch = currentPushBunch;
+ LastKnownPushSlot = currentPushSlot;
+
+ return oldSlot != LastKnownPushSlot;
+ }
+
+ bool SwitchToNextBunch() {
+ for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
+ auto next = ReadBunch->GetNextBunch();
+ if (next != nullptr) {
+ ReadBunch = next;
+ return true;
+ }
+ SpinLockPause();
+ }
+ return false;
+ }
+
+ TSlot<TAux> StubbornPop() {
+ for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
+ auto result = ReadBunch->GetSlotAux(ReadSlot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ ++ReadSlot;
+ return result;
+ }
+ SpinLockPause();
+ }
+
+ AddToReadLater();
+ return TSlot<TAux>::NullElem();
+ }
+
+ TSlot<TAux> StubbornPeek() {
+ for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
+ auto result = ReadBunch->GetSlotAux(ReadSlot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ return result;
+ }
+ SpinLockPause();
+ }
+
+ AddToReadLater();
+ return TSlot<TAux>::NullElem();
+ }
+
+ TSlot<TAux> PopSkipped() {
+ do {
+ auto elem = ReadNow.front();
+ ReadNow.pop_front();
+
+ auto result = elem.Bunch->GetSlotAux(elem.Slot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ elem.Token->DecrementToken();
+ return result;
+ }
+
+ ReadLater.emplace_back(elem);
+
+ } while (ReadNow.size() > 0);
+
+ return TSlot<TAux>::NullElem();
+ }
+
+ TSlot<TAux> PeekSkipped() {
+ do {
+ auto elem = ReadNow.front();
+
+ auto result = elem.Bunch->GetSlotAux(elem.Slot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ return result;
+ }
+
+ ReadNow.pop_front();
+ ReadLater.emplace_back(elem);
+
+ } while (ReadNow.size() > 0);
+
+ return TSlot<TAux>::NullElem();
+ }
+ };
+
+ struct TDefaultParams {
+ static constexpr ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE;
+ using TBunchBase = TEmpty;
+
+ template <typename TElem, typename... TRest>
+ using TContainer = TDeque<TElem, TRest...>;
+
+ static constexpr bool DeleteItems = true;
+ };
+
+ } //namespace NReadAsFilledPrivate
+
+ DeclareTuneValueParam(TRaFQueueBunchSize, ui32, BUNCH_SIZE);
+ DeclareTuneTypeParam(TRaFQueueBunchBase, TBunchBase);
+ DeclareTuneContainer(TRaFQueueSkipContainer, TContainer);
+ DeclareTuneValueParam(TRaFQueueDeleteItems, bool, DeleteItems);
+
+ template <typename TItem = void, typename... TParams>
+ class TReadAsFilledQueue {
+ private:
+ using TTuned = TTune<NReadAsFilledPrivate::TDefaultParams, TParams...>;
+
+ static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE;
+
+ using TBunchBase = typename TTuned::TBunchBase;
+
+ template <typename TElem, typename... TRest>
+ using TContainer =
+ typename TTuned::template TContainer<TElem, TRest...>;
+
+ using TWriteBucket =
+ NReadAsFilledPrivate::TWriteBucket<BUNCH_SIZE, TBunchBase>;
+ using TReadBucket =
+ NReadAsFilledPrivate::TReadBucket<TWriteBucket, TContainer>;
+
+ public:
+ TReadAsFilledQueue()
+ : RBucket(&WBucket)
+ {
+ }
+
+ ~TReadAsFilledQueue() {
+ if (TTuned::DeleteItems) {
+ for (;;) {
+ auto msg = Pop();
+ if (msg == nullptr) {
+ break;
+ }
+ TDelete::Destroy(msg);
+ }
+ }
+ }
+
+ void Push(TItem* msg) {
+ WBucket.Push((void*)msg, NReadAsFilledPrivate::TEmptyAux());
+ }
+
+ TItem* Pop() {
+ return (TItem*)RBucket.Pop();
+ }
+
+ TItem* Peek() {
+ return (TItem*)RBucket.Peek();
+ }
+
+ protected:
+ TWriteBucket WBucket;
+ TReadBucket RBucket;
+ };
+}
diff --git a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp
new file mode 100644
index 0000000000..2bd0c29821
--- /dev/null
+++ b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp
@@ -0,0 +1 @@
+#include "mpsc_vinfarr_obstructive.h"
diff --git a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h
new file mode 100644
index 0000000000..5f91f1b5a8
--- /dev/null
+++ b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h
@@ -0,0 +1,528 @@
+#pragma once
+
+/*
+ Semi-wait-free queue, multiple producers - one consumer. Strict order.
+ The queue algorithm is using concept of virtual infinite array.
+
+ A producer takes a number from a counter and atomicaly increments the counter.
+ The number taken is a number of a slot for the producer to put a new message
+ into infinite array.
+
+ Then producer constructs a virtual infinite array by bidirectional linked list
+ of blocks. Each block contains several slots.
+
+ There is a hint pointer which optimisticly points to the last block
+ of the list and never goes backward.
+
+ Consumer exploits the property of the hint pointer always going forward
+ to free old blocks eventually. Consumer periodically read the hint pointer
+ and the counter and thus deduce producers which potentially holds the pointer
+ to a block. Consumer can free the block if all that producers filled their
+ slots and left the queue.
+
+ No producer can stop the progress for other producers.
+
+ Consumer can obstruct a slot of a delayed producer by putting special mark.
+ Thus no producer can stop the progress for consumer.
+ But a slow producer may be forced to retry unlimited number of times.
+ Though it's very unlikely for a non-preempted producer to be obstructed.
+ That's why the algorithm is semi-wait-free.
+
+ WARNING: there is no wait&notify mechanic for consumer,
+ consumer receives nullptr if queue was empty.
+
+ WARNING: though the algorithm itself is lock-free
+ but producers and consumer could be blocked by memory allocator
+
+ WARNING: copy constructers of the queue are not thread-safe
+ */
+
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/system/atomic.h>
+#include <util/system/spinlock.h>
+
+#include "tune.h"
+
+namespace NThreading {
+ namespace NObstructiveQueuePrivate {
+ typedef void* TMsgLink;
+
+ struct TEmpty {
+ };
+
+ struct TEmptyAux {
+ TEmptyAux Retrieve() const {
+ return TEmptyAux();
+ }
+ void Store(TEmptyAux&) {
+ }
+ static constexpr TEmptyAux Zero() {
+ return TEmptyAux();
+ }
+ };
+
+ template <typename TAux>
+ struct TSlot {
+ TMsgLink volatile Msg;
+ TAux AuxiliaryData;
+
+ inline void Store(TAux& aux) {
+ AuxiliaryData.Store(aux);
+ }
+
+ inline TAux Retrieve() const {
+ return AuxiliaryData.Retrieve();
+ }
+
+ static TSlot<TAux> NullElem() {
+ return {nullptr, TAux::Zero()};
+ }
+
+ static TSlot<TAux> Pair(TMsgLink msg, TAux aux) {
+ return {msg, std::move(aux)};
+ }
+ };
+
+ template <>
+ struct TSlot<TEmptyAux> {
+ TMsgLink volatile Msg;
+ inline void Store(TEmptyAux&) {
+ }
+ inline TEmptyAux Retrieve() const {
+ return TEmptyAux();
+ }
+
+ static TSlot<TEmptyAux> NullElem() {
+ return {nullptr};
+ }
+
+ static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) {
+ return {msg};
+ }
+ };
+
+ enum TPushResult {
+ PUSH_RESULT_OK,
+ PUSH_RESULT_BACKWARD,
+ PUSH_RESULT_FORWARD,
+ PUSH_RESULT_BLOCKED,
+ };
+
+ template <typename TAux, ui32 BUNCH_SIZE, typename TBase = TEmpty>
+ struct TMsgBunch: public TBase {
+ ui64 FirstSlot;
+
+ TSlot<TAux> LinkArray[BUNCH_SIZE];
+
+ TMsgBunch* volatile NextBunch;
+ TMsgBunch* volatile BackLink;
+
+ ui64 volatile Token;
+ TMsgBunch* volatile NextToken;
+
+ /* this push can return PUSH_RESULT_BLOCKED */
+ inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) {
+ if (Y_UNLIKELY(slot < FirstSlot)) {
+ return PUSH_RESULT_BACKWARD;
+ }
+
+ if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) {
+ return PUSH_RESULT_FORWARD;
+ }
+
+ LinkArray[slot - FirstSlot].Store(auxiliary);
+
+ auto oldValue = AtomicSwap(&LinkArray[slot - FirstSlot].Msg, msg);
+
+ if (Y_LIKELY(oldValue == nullptr)) {
+ return PUSH_RESULT_OK;
+ } else {
+ LeaveBlocked(oldValue);
+ return PUSH_RESULT_BLOCKED;
+ }
+ }
+
+ inline bool IsSlotHere(ui64 slot) {
+ return slot < FirstSlot + BUNCH_SIZE;
+ }
+
+ inline TMsgLink GetSlot(ui64 slot) const {
+ return AtomicGet(LinkArray[slot - FirstSlot].Msg);
+ }
+
+ inline TSlot<TAux> GetSlotAux(ui64 slot) const {
+ auto msg = GetSlot(slot);
+ auto aux = LinkArray[slot - FirstSlot].Retrieve();
+ return TSlot<TAux>::Pair(msg, aux);
+ }
+
+ void LeaveBlocked(ui64 slot) {
+ auto token = GetToken(slot);
+ token->DecrementToken();
+ }
+
+ void LeaveBlocked(TMsgLink msg) {
+ auto token = reinterpret_cast<TMsgBunch*>(msg);
+ token->DecrementToken();
+ }
+
+ TSlot<TAux> BlockSlotAux(ui64 slot, TMsgBunch* token) {
+ auto old =
+ AtomicSwap(&LinkArray[slot - FirstSlot].Msg, (TMsgLink)token);
+ if (old == nullptr) {
+ // It's valid to increment after AtomicCas
+ // because token will release data only after SetNextToken
+ token->IncrementToken();
+ return TSlot<TAux>::NullElem();
+ }
+ return TSlot<TAux>::Pair(old, LinkArray[slot - FirstSlot].Retrieve());
+ }
+
+ inline TMsgBunch* GetNextBunch() const {
+ return AtomicGet(NextBunch);
+ }
+
+ inline bool SetNextBunch(TMsgBunch* ptr) {
+ return AtomicCas(&NextBunch, ptr, nullptr);
+ }
+
+ inline TMsgBunch* GetBackLink() const {
+ return AtomicGet(BackLink);
+ }
+
+ inline TMsgBunch* GetToken(ui64 slot) {
+ return reinterpret_cast<TMsgBunch*>(LinkArray[slot - FirstSlot].Msg);
+ }
+
+ inline void IncrementToken() {
+ AtomicIncrement(Token);
+ }
+
+ // the object could be destroyed after this method
+ inline void DecrementToken() {
+ if (Y_UNLIKELY(AtomicDecrement(Token) == BUNCH_SIZE)) {
+ Release(this);
+ AtomicGet(NextToken)->DecrementToken();
+ // this could be invalid here
+ }
+ }
+
+ // the object could be destroyed after this method
+ inline void SetNextToken(TMsgBunch* next) {
+ AtomicSet(NextToken, next);
+ if (Y_UNLIKELY(AtomicAdd(Token, BUNCH_SIZE) == BUNCH_SIZE)) {
+ Release(this);
+ next->DecrementToken();
+ }
+ // this could be invalid here
+ }
+
+ TMsgBunch(ui64 start, TMsgBunch* backLink) {
+ AtomicSet(FirstSlot, start);
+ memset(&LinkArray, 0, sizeof(LinkArray));
+ AtomicSet(NextBunch, nullptr);
+ AtomicSet(BackLink, backLink);
+
+ AtomicSet(Token, 1);
+ AtomicSet(NextToken, nullptr);
+ }
+
+ static void Release(TMsgBunch* bunch) {
+ auto backLink = AtomicGet(bunch->BackLink);
+ if (backLink == nullptr) {
+ return;
+ }
+ AtomicSet(bunch->BackLink, nullptr);
+
+ do {
+ auto bbackLink = backLink->BackLink;
+ delete backLink;
+ backLink = bbackLink;
+ } while (backLink != nullptr);
+ }
+
+ void Destroy() {
+ for (auto tail = BackLink; tail != nullptr;) {
+ auto next = tail->BackLink;
+ delete tail;
+ tail = next;
+ }
+
+ for (auto next = this; next != nullptr;) {
+ auto nnext = next->NextBunch;
+ delete next;
+ next = nnext;
+ }
+ }
+ };
+
+ template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase = TEmpty>
+ class TWriteBucket {
+ public:
+ static const ui64 GROSS_SIZE;
+
+ using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>;
+
+ TWriteBucket(TBunch* bunch = new TBunch(0, nullptr))
+ : LastBunch(bunch)
+ , SlotCounter(0)
+ {
+ }
+
+ TWriteBucket(TWriteBucket&& move)
+ : LastBunch(move.LastBunch)
+ , SlotCounter(move.SlotCounter)
+ {
+ move.LastBunch = nullptr;
+ }
+
+ ~TWriteBucket() {
+ if (LastBunch != nullptr) {
+ LastBunch->Destroy();
+ }
+ }
+
+ inline bool Push(TMsgLink msg, TAux aux) {
+ ui64 pushSlot = AtomicGetAndIncrement(SlotCounter);
+ TBunch* hintBunch = GetLastBunch();
+
+ for (;;) {
+ auto hint = hintBunch->Push(msg, pushSlot, aux);
+ if (Y_LIKELY(hint == PUSH_RESULT_OK)) {
+ return true;
+ }
+ bool hhResult = HandleHint(hintBunch, hint);
+ if (Y_UNLIKELY(!hhResult)) {
+ return false;
+ }
+ }
+ }
+
+ protected:
+ template <typename, ui32, typename>
+ friend class TReadBucket;
+
+ TBunch* volatile LastBunch; // Hint
+ volatile ui64 SlotCounter;
+
+ inline TBunch* GetLastBunch() const {
+ return AtomicGet(LastBunch);
+ }
+
+ bool HandleHint(TBunch*& hintBunch, TPushResult hint) {
+ if (Y_UNLIKELY(hint == PUSH_RESULT_BLOCKED)) {
+ return false;
+ }
+
+ if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) {
+ hintBunch = hintBunch->GetBackLink();
+ return true;
+ }
+
+ // PUSH_RESULT_FORWARD
+ auto nextBunch = hintBunch->GetNextBunch();
+
+ if (nextBunch == nullptr) {
+ auto first = hintBunch->FirstSlot + BUNCH_SIZE;
+ nextBunch = new TBunch(first, hintBunch);
+ if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) {
+ delete nextBunch;
+ nextBunch = hintBunch->GetNextBunch();
+ }
+ }
+
+ // hintBunch could not be freed here so it cannot be reused
+ // it's alright if this CAS was not succeeded,
+ // it means that other thread did that recently
+ AtomicCas(&LastBunch, nextBunch, hintBunch);
+
+ hintBunch = nextBunch;
+ return true;
+ }
+ };
+
+ template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase>
+ class TReadBucket {
+ public:
+ static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 20;
+
+ using TWBucket = TWriteBucket<TAux, BUNCH_SIZE, TBunchBase>;
+ using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>;
+
+ TReadBucket(TWBucket* writer)
+ : Writer(writer)
+ , ReadBunch(writer->GetLastBunch())
+ , LastKnownPushBunch(writer->GetLastBunch())
+ {
+ ReadBunch->DecrementToken(); // no previous token
+ }
+
+ TReadBucket(TReadBucket toCopy, TWBucket* writer)
+ : TReadBucket(std::move(toCopy))
+ {
+ Writer = writer;
+ }
+
+ ui64 ReadyCount() const {
+ return AtomicGet(Writer->SlotCounter) - ReadSlot;
+ }
+
+ inline TMsgLink Pop() {
+ return PopAux().Msg;
+ }
+
+ inline TSlot<TAux> PopAux() {
+ for (;;) {
+ if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {
+ if (Y_LIKELY(!RereadPushSlot())) {
+ return TSlot<TAux>::NullElem();
+ }
+ }
+
+ if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {
+ if (Y_UNLIKELY(!SwitchToNextBunch())) {
+ return TSlot<TAux>::NullElem();
+ }
+ }
+
+ auto result = ReadBunch->GetSlotAux(ReadSlot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ ++ReadSlot;
+ return result;
+ }
+
+ if (ReadSlot + 1 == AtomicGet(Writer->SlotCounter)) {
+ return TSlot<TAux>::NullElem();
+ }
+
+ result = StubbornPopAux();
+
+ if (result.Msg != nullptr) {
+ return result;
+ }
+ }
+ }
+
+ private:
+ TWBucket* Writer;
+ TBunch* ReadBunch;
+ ui64 ReadSlot = 0;
+ TBunch* LastKnownPushBunch;
+ ui64 LastKnownPushSlot = 0;
+
+ // MUST BE: ReadSlot == LastKnownPushSlot
+ bool RereadPushSlot() {
+ auto oldSlot = LastKnownPushSlot;
+
+ auto currentPushBunch = Writer->GetLastBunch();
+ auto currentPushSlot = AtomicGet(Writer->SlotCounter);
+
+ if (currentPushBunch != LastKnownPushBunch) {
+ // LastKnownPushBunch could be invalid after this line
+ LastKnownPushBunch->SetNextToken(currentPushBunch);
+ }
+
+ LastKnownPushBunch = currentPushBunch;
+ LastKnownPushSlot = currentPushSlot;
+
+ return oldSlot != LastKnownPushSlot;
+ }
+
+ bool SwitchToNextBunch() {
+ for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
+ auto next = ReadBunch->GetNextBunch();
+ if (next != nullptr) {
+ ReadBunch = next;
+ return true;
+ }
+ SpinLockPause();
+ }
+ return false;
+ }
+
+ TSlot<TAux> StubbornPopAux() {
+ for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
+ auto result = ReadBunch->GetSlotAux(ReadSlot);
+ if (Y_LIKELY(result.Msg != nullptr)) {
+ ++ReadSlot;
+ return result;
+ }
+ SpinLockPause();
+ }
+
+ return ReadBunch->BlockSlotAux(ReadSlot++, LastKnownPushBunch);
+ }
+ };
+
+ struct TDefaultParams {
+ static constexpr bool DeleteItems = true;
+ using TAux = NObstructiveQueuePrivate::TEmptyAux;
+ using TBunchBase = NObstructiveQueuePrivate::TEmpty;
+ static constexpr ui32 BUNCH_SIZE = 251;
+ };
+
+ } //namespace NObstructiveQueuePrivate
+
+ DeclareTuneValueParam(TObstructiveQueueBunchSize, ui32, BUNCH_SIZE);
+ DeclareTuneValueParam(TObstructiveQueueDeleteItems, bool, DeleteItems);
+ DeclareTuneTypeParam(TObstructiveQueueBunchBase, TBunchBase);
+ DeclareTuneTypeParam(TObstructiveQueueAux, TAux);
+
+ template <typename TItem = void, typename... TParams>
+ class TObstructiveConsumerAuxQueue {
+ private:
+ using TTuned =
+ TTune<NObstructiveQueuePrivate::TDefaultParams, TParams...>;
+
+ using TAux = typename TTuned::TAux;
+ using TSlot = NObstructiveQueuePrivate::TSlot<TAux>;
+ using TMsgLink = NObstructiveQueuePrivate::TMsgLink;
+ using TBunchBase = typename TTuned::TBunchBase;
+ static constexpr bool DeleteItems = TTuned::DeleteItems;
+ static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE;
+
+ public:
+ TObstructiveConsumerAuxQueue()
+ : RBuckets(&WBucket)
+ {
+ }
+
+ ~TObstructiveConsumerAuxQueue() {
+ if (DeleteItems) {
+ for (;;) {
+ auto msg = Pop();
+ if (msg == nullptr) {
+ break;
+ }
+ TDelete::Destroy(msg);
+ }
+ }
+ }
+
+ void Push(TItem* msg) {
+ while (!WBucket.Push(reinterpret_cast<TMsgLink>(msg), TAux())) {
+ }
+ }
+
+ TItem* Pop() {
+ return reinterpret_cast<TItem*>(RBuckets.Pop());
+ }
+
+ TSlot PopAux() {
+ return RBuckets.PopAux();
+ }
+
+ private:
+ NObstructiveQueuePrivate::TWriteBucket<TAux, BUNCH_SIZE, TBunchBase>
+ WBucket;
+ NObstructiveQueuePrivate::TReadBucket<TAux, BUNCH_SIZE, TBunchBase>
+ RBuckets;
+ };
+
+ template <typename TItem = void, bool DeleteItems = true>
+ class TObstructiveConsumerQueue
+ : public TObstructiveConsumerAuxQueue<TItem,
+ TObstructiveQueueDeleteItems<DeleteItems>> {
+ };
+}
diff --git a/library/cpp/threading/queue/queue_ut.cpp b/library/cpp/threading/queue/queue_ut.cpp
new file mode 100644
index 0000000000..80eca147da
--- /dev/null
+++ b/library/cpp/threading/queue/queue_ut.cpp
@@ -0,0 +1,242 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/system/thread.h>
+
+#include "ut_helpers.h"
+
+typedef void* TMsgLink;
+
+template <typename TQueueType>
+class TQueueTestProcs: public TTestBase {
+private:
+ UNIT_TEST_SUITE_DEMANGLE(TQueueTestProcs<TQueueType>);
+ UNIT_TEST(Threads2_Push1M_Threads1_Pop2M)
+ UNIT_TEST(Threads4_Push1M_Threads1_Pop4M)
+ UNIT_TEST(Threads8_RndPush100K_Threads8_Queues)
+ /*
+ UNIT_TEST(Threads24_RndPush100K_Threads24_Queues)
+ UNIT_TEST(Threads24_RndPush100K_Threads8_Queues)
+ UNIT_TEST(Threads24_RndPush100K_Threads4_Queues)
+*/
+ UNIT_TEST_SUITE_END();
+
+public:
+ void Push1M_Pop1M() {
+ TQueueType queue;
+ TMsgLink msg = &queue;
+
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+
+ for (int i = 0; i < 1000000; ++i) {
+ queue.Push((char*)msg + i);
+ }
+
+ for (int i = 0; i < 1000000; ++i) {
+ auto popped = queue.Pop();
+ UNIT_ASSERT_EQUAL((char*)msg + i, popped);
+ }
+
+ pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ }
+
+ void Threads2_Push1M_Threads1_Pop2M() {
+ TQueueType queue;
+
+ class TPusherThread: public ISimpleThread {
+ public:
+ TPusherThread(TQueueType& theQueue, char* start)
+ : Queue(theQueue)
+ , Arg(start)
+ {
+ }
+
+ TQueueType& Queue;
+ char* Arg;
+
+ void* ThreadProc() override {
+ for (int i = 0; i < 1000000; ++i) {
+ Queue.Push(Arg + i);
+ }
+ return nullptr;
+ }
+ };
+
+ TPusherThread pusher1(queue, (char*)&queue);
+ TPusherThread pusher2(queue, (char*)&queue + 2000000);
+
+ pusher1.Start();
+ pusher2.Start();
+
+ for (int i = 0; i < 2000000; ++i) {
+ while (queue.Pop() == nullptr) {
+ SpinLockPause();
+ }
+ }
+
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ }
+
+ void Threads4_Push1M_Threads1_Pop4M() {
+ TQueueType queue;
+
+ class TPusherThread: public ISimpleThread {
+ public:
+ TPusherThread(TQueueType& theQueue, char* start)
+ : Queue(theQueue)
+ , Arg(start)
+ {
+ }
+
+ TQueueType& Queue;
+ char* Arg;
+
+ void* ThreadProc() override {
+ for (int i = 0; i < 1000000; ++i) {
+ Queue.Push(Arg + i);
+ }
+ return nullptr;
+ }
+ };
+
+ TPusherThread pusher1(queue, (char*)&queue);
+ TPusherThread pusher2(queue, (char*)&queue + 2000000);
+ TPusherThread pusher3(queue, (char*)&queue + 4000000);
+ TPusherThread pusher4(queue, (char*)&queue + 6000000);
+
+ pusher1.Start();
+ pusher2.Start();
+ pusher3.Start();
+ pusher4.Start();
+
+ for (int i = 0; i < 4000000; ++i) {
+ while (queue.Pop() == nullptr) {
+ SpinLockPause();
+ }
+ }
+
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ }
+
+ template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES>
+ void ManyRndPush100K_ManyQueues() {
+ TQueueType queue[NUMBER_OF_QUEUES];
+
+ class TPusherThread: public ISimpleThread {
+ public:
+ TPusherThread(TQueueType* queues, char* start)
+ : Queues(queues)
+ , Arg(start)
+ {
+ }
+
+ TQueueType* Queues;
+ char* Arg;
+
+ void* ThreadProc() override {
+ ui64 counters[NUMBER_OF_QUEUES];
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ counters[i] = 0;
+ }
+
+ for (int i = 0; i < 100000; ++i) {
+ size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES;
+ int cookie = counters[rnd]++;
+ Queues[rnd].Push(Arg + cookie);
+ }
+
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ Queues[i].Push((void*)2ULL);
+ }
+
+ return nullptr;
+ }
+ };
+
+ class TPopperThread: public ISimpleThread {
+ public:
+ TPopperThread(TQueueType* theQueue, char* base)
+ : Queue(theQueue)
+ , Base(base)
+ {
+ }
+
+ TQueueType* Queue;
+ char* Base;
+
+ void* ThreadProc() override {
+ ui64 counters[NUMBER_OF_PUSHERS];
+ for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
+ counters[i] = 0;
+ }
+
+ for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) {
+ auto msg = Queue->Pop();
+ if (msg == nullptr) {
+ SpinLockPause();
+ continue;
+ }
+ if (msg == (void*)2ULL) {
+ ++fin;
+ continue;
+ }
+ ui64 shift = (char*)msg - Base;
+ auto pusherNum = shift / 200000000ULL;
+ auto msgNum = shift % 200000000ULL;
+
+ UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum);
+ ++counters[pusherNum];
+ }
+
+ auto pmsg = Queue->Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+
+ return nullptr;
+ }
+ };
+
+ TVector<TAutoPtr<TPopperThread>> poppers;
+ TVector<TAutoPtr<TPusherThread>> pushers;
+
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue));
+ poppers.back()->Start();
+ }
+
+ for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
+ pushers.emplace_back(
+ new TPusherThread(queue, (char*)&queue + 200000000ULL * i));
+ pushers.back()->Start();
+ }
+
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ poppers[i]->Join();
+ }
+
+ for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
+ pushers[i]->Join();
+ }
+ }
+
+ void Threads8_RndPush100K_Threads8_Queues() {
+ ManyRndPush100K_ManyQueues<8, 8>();
+ }
+
+ /*
+ void Threads24_RndPush100K_Threads24_Queues() {
+ ManyRndPush100K_ManyQueues<24, 24>();
+ }
+
+ void Threads24_RndPush100K_Threads8_Queues() {
+ ManyRndPush100K_ManyQueues<24, 8>();
+ }
+
+ void Threads24_RndPush100K_Threads4_Queues() {
+ ManyRndPush100K_ManyQueues<24, 4>();
+ }
+ */
+};
+
+REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestProcs);
diff --git a/library/cpp/threading/queue/tune.h b/library/cpp/threading/queue/tune.h
new file mode 100644
index 0000000000..50fc3dc17c
--- /dev/null
+++ b/library/cpp/threading/queue/tune.h
@@ -0,0 +1,125 @@
+#pragma once
+
+/*
+ Motivation: consider you have a template class with many parameters
+ with default associations
+
+ template <typename A = TDefA,
+ typename B = TDefB,
+ typename C = TDefC,
+ typename D = TDefD>
+ class TExample {
+ };
+
+ consider you would like to provide easy to use interface to tune all
+ these parameters in position independed manner,
+ In that case TTune would be helpful for you.
+
+ How to use:
+ First step: declare a struct with all default associations
+
+ struct TDefaultTune {
+ using TStructA = TDefA;
+ using TStructB = TDefB;
+ using TStructC = TDefC;
+ using TStructD = TDefD;
+ };
+
+ Second step: declare helper names visible to a user
+
+ DeclareTuneTypeParam(TTuneParamA, TStructA);
+ DeclareTuneTypeParam(TTuneParamB, TStructB);
+ DeclareTuneTypeParam(TTuneParamC, TStructC);
+ DeclareTuneTypeParam(TTuneParamD, TStructD);
+
+ Third step: declare TExample this way:
+
+ template <typename...TParams>
+ class TExample {
+ using TMyParams = TTune<TDefaultTune, TParams...>;
+
+ using TActualA = TMyParams::TStructA;
+ using TActualB = TMyParams::TStructB;
+ ...
+ };
+
+ TTune<TDefaultTune, TParams...> is a struct with the default parameteres
+ taken from TDefaultTune and overridden from "TParams...".
+
+ for example: "TTune<TDefaultTune, TTuneParamC<TUserClass>>"
+ will be virtually the same as:
+
+ struct TTunedClass {
+ using TStructA = TDefA;
+ using TStructB = TDefB;
+ using TStructC = TUserClass;
+ using TStructD = TDefD;
+ };
+
+ From now on you can tune your TExample in the following manner:
+
+ using TCustomClass =
+ TExample <TTuneParamA<TUserStruct1>, TTuneParamD<TUserStruct2>>;
+
+ You can also tweak constant expressions in your TDefaultTune.
+ Consider you have:
+
+ struct TDefaultTune {
+ static constexpr ui32 MySize = 42;
+ };
+
+ declare an interface to modify the parameter this way:
+
+ DeclareTuneValueParam(TStructSize, ui32, MySize);
+
+ and tweak your class:
+
+ using TTwiceBigger = TExample<TStructSize<84>>;
+
+ */
+
+#define DeclareTuneTypeParam(TParamName, InternalName) \
+ template <typename TNewType> \
+ struct TParamName { \
+ template <typename TBase> \
+ struct TApply: public TBase { \
+ using InternalName = TNewType; \
+ }; \
+ }
+
+#define DeclareTuneValueParam(TParamName, TValueType, InternalName) \
+ template <TValueType NewValue> \
+ struct TParamName { \
+ template <typename TBase> \
+ struct TApply: public TBase { \
+ static constexpr TValueType InternalName = NewValue; \
+ }; \
+ }
+
+#define DeclareTuneContainer(TParamName, InternalName) \
+ template <template <typename, typename...> class TNewContainer> \
+ struct TParamName { \
+ template <typename TBase> \
+ struct TApply: public TBase { \
+ template <typename TElem, typename... TRest> \
+ using InternalName = TNewContainer<TElem, TRest...>; \
+ }; \
+ }
+
+namespace NTunePrivate {
+ template <typename TBase, typename... TParams>
+ struct TFold;
+
+ template <typename TBase>
+ struct TFold<TBase>: public TBase {
+ };
+
+ template <typename TBase, typename TFirstArg, typename... TRest>
+ struct TFold<TBase, TFirstArg, TRest...>
+ : public TFold<typename TFirstArg::template TApply<TBase>, TRest...> {
+ };
+}
+
+template <typename TDefault, typename... TParams>
+struct TTune: public NTunePrivate::TFold<TDefault, TParams...> {
+};
diff --git a/library/cpp/threading/queue/tune_ut.cpp b/library/cpp/threading/queue/tune_ut.cpp
new file mode 100644
index 0000000000..7e980d3e27
--- /dev/null
+++ b/library/cpp/threading/queue/tune_ut.cpp
@@ -0,0 +1,118 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include "tune.h"
+
+struct TDefaultStructA {
+};
+
+struct TDefaultStructB {
+};
+
+struct TDefaults {
+ using TStructA = TDefaultStructA;
+ using TStructB = TDefaultStructB;
+ static constexpr ui32 Param1 = 42;
+ static constexpr ui32 Param2 = 42;
+};
+
+DeclareTuneTypeParam(TweakStructA, TStructA);
+DeclareTuneTypeParam(TweakStructB, TStructB);
+DeclareTuneValueParam(TweakParam1, ui32, Param1);
+DeclareTuneValueParam(TweakParam2, ui32, Param2);
+
+Y_UNIT_TEST_SUITE(TestTuning) {
+ Y_UNIT_TEST(Defaults) {
+ using TTuned = TTune<TDefaults>;
+ using TunedA = TTuned::TStructA;
+ using TunedB = TTuned::TStructB;
+ auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
+ auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
+ auto param1 = TTuned::Param1;
+ auto param2 = TTuned::Param2;
+
+ UNIT_ASSERT(sameA);
+ UNIT_ASSERT(sameB);
+ UNIT_ASSERT_EQUAL(param1, 42);
+ UNIT_ASSERT_EQUAL(param2, 42);
+ }
+
+ Y_UNIT_TEST(TuneStructA) {
+ struct TMyStruct {
+ };
+
+ using TTuned = TTune<TDefaults, TweakStructA<TMyStruct>>;
+
+ using TunedA = TTuned::TStructA;
+ using TunedB = TTuned::TStructB;
+ //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
+ auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
+ auto param1 = TTuned::Param1;
+ auto param2 = TTuned::Param2;
+
+ auto sameA = std::is_same<TMyStruct, TunedA>::value;
+
+ UNIT_ASSERT(sameA);
+ UNIT_ASSERT(sameB);
+ UNIT_ASSERT_EQUAL(param1, 42);
+ UNIT_ASSERT_EQUAL(param2, 42);
+ }
+
+ Y_UNIT_TEST(TuneParam1) {
+ using TTuned = TTune<TDefaults, TweakParam1<24>>;
+
+ using TunedA = TTuned::TStructA;
+ using TunedB = TTuned::TStructB;
+ auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
+ auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
+ auto param1 = TTuned::Param1;
+ auto param2 = TTuned::Param2;
+
+ UNIT_ASSERT(sameA);
+ UNIT_ASSERT(sameB);
+ UNIT_ASSERT_EQUAL(param1, 24);
+ UNIT_ASSERT_EQUAL(param2, 42);
+ }
+
+ Y_UNIT_TEST(TuneStructAAndParam1) {
+ struct TMyStruct {
+ };
+
+ using TTuned =
+ TTune<TDefaults, TweakStructA<TMyStruct>, TweakParam1<24>>;
+
+ using TunedA = TTuned::TStructA;
+ using TunedB = TTuned::TStructB;
+ //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
+ auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
+ auto param1 = TTuned::Param1;
+ auto param2 = TTuned::Param2;
+
+ auto sameA = std::is_same<TMyStruct, TunedA>::value;
+
+ UNIT_ASSERT(sameA);
+ UNIT_ASSERT(sameB);
+ UNIT_ASSERT_EQUAL(param1, 24);
+ UNIT_ASSERT_EQUAL(param2, 42);
+ }
+
+ Y_UNIT_TEST(TuneParam1AndStructA) {
+ struct TMyStruct {
+ };
+
+ using TTuned =
+ TTune<TDefaults, TweakParam1<24>, TweakStructA<TMyStruct>>;
+
+ using TunedA = TTuned::TStructA;
+ using TunedB = TTuned::TStructB;
+ //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;
+ auto sameB = std::is_same<TDefaultStructB, TunedB>::value;
+ auto param1 = TTuned::Param1;
+ auto param2 = TTuned::Param2;
+
+ auto sameA = std::is_same<TMyStruct, TunedA>::value;
+
+ UNIT_ASSERT(sameA);
+ UNIT_ASSERT(sameB);
+ UNIT_ASSERT_EQUAL(param1, 24);
+ UNIT_ASSERT_EQUAL(param2, 42);
+ }
+}
diff --git a/library/cpp/threading/queue/unordered_ut.cpp b/library/cpp/threading/queue/unordered_ut.cpp
new file mode 100644
index 0000000000..a43b7f520e
--- /dev/null
+++ b/library/cpp/threading/queue/unordered_ut.cpp
@@ -0,0 +1,154 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/system/thread.h>
+#include <algorithm>
+#include <util/generic/vector.h>
+#include <util/random/fast.h>
+
+#include "ut_helpers.h"
+
+template <typename TQueueType>
+class TTestUnorderedQueue: public TTestBase {
+private:
+ using TLink = TIntrusiveLink;
+
+ UNIT_TEST_SUITE_DEMANGLE(TTestUnorderedQueue<TQueueType>);
+ UNIT_TEST(Push1M_Pop1M_Unordered)
+ UNIT_TEST_SUITE_END();
+
+public:
+ void Push1M_Pop1M_Unordered() {
+ constexpr int REPEAT = 1000000;
+ TQueueType queue;
+ TLink msg[REPEAT];
+
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+
+ for (int i = 0; i < REPEAT; ++i) {
+ queue.Push(&msg[i]);
+ }
+
+ TVector<TLink*> popped;
+ popped.reserve(REPEAT);
+ for (int i = 0; i < REPEAT; ++i) {
+ popped.push_back((TLink*)queue.Pop());
+ }
+
+ pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+
+ std::sort(popped.begin(), popped.end());
+ for (int i = 0; i < REPEAT; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(&msg[i], popped[i]);
+ }
+ }
+};
+
+template <typename TQueueType>
+class TTestWeakQueue: public TTestBase {
+private:
+ UNIT_TEST_SUITE_DEMANGLE(TTestWeakQueue<TQueueType>);
+ UNIT_TEST(Threads8_Rnd_Exchange)
+ UNIT_TEST_SUITE_END();
+
+public:
+ template <ui16 COUNT = 48, ui32 MSG_COUNT = 10000>
+ void ManyThreadsRndExchange() {
+ TQueueType queues[COUNT];
+
+ class TWorker: public ISimpleThread {
+ public:
+ TWorker(
+ TQueueType* queues_,
+ ui16 mine,
+ TAtomic* pushDone)
+ : Queues(queues_)
+ , MineQueue(mine)
+ , PushDone(pushDone)
+ {
+ }
+
+ TQueueType* Queues;
+ ui16 MineQueue;
+ TVector<uintptr_t> Received;
+ TAtomic* PushDone;
+
+ void* ThreadProc() override {
+ TReallyFastRng32 rng(GetCycleCount());
+ Received.reserve(MSG_COUNT * 2);
+
+ for (ui32 loop = 1; loop <= MSG_COUNT; ++loop) {
+ for (;;) {
+ auto msg = Queues[MineQueue].Pop();
+ if (msg == nullptr) {
+ break;
+ }
+
+ Received.push_back((uintptr_t)msg);
+ }
+
+ ui16 rnd = rng.GenRand64() % COUNT;
+ ui64 msg = ((ui64)MineQueue << 32) + loop;
+ while (!Queues[rnd].Push((void*)msg)) {
+ }
+ }
+
+ AtomicIncrement(*PushDone);
+
+ for (;;) {
+ bool isItLast = AtomicGet(*PushDone) == COUNT;
+ auto msg = Queues[MineQueue].Pop();
+ if (msg != nullptr) {
+ Received.push_back((uintptr_t)msg);
+ } else {
+ if (isItLast) {
+ break;
+ }
+ SpinLockPause();
+ }
+ }
+
+ for (ui64 last = 0;;) {
+ auto msg = Queues[MineQueue].UnsafeScanningPop(&last);
+ if (msg == nullptr) {
+ break;
+ }
+ Received.push_back((uintptr_t)msg);
+ }
+
+ return nullptr;
+ }
+ };
+
+ TVector<TAutoPtr<TWorker>> workers;
+ TAtomic pushDone = 0;
+
+ for (ui32 i = 0; i < COUNT; ++i) {
+ workers.emplace_back(new TWorker(&queues[0], i, &pushDone));
+ workers.back()->Start();
+ }
+
+ TVector<uintptr_t> all;
+ for (ui32 i = 0; i < COUNT; ++i) {
+ workers[i]->Join();
+ all.insert(all.begin(),
+ workers[i]->Received.begin(), workers[i]->Received.end());
+ }
+
+ std::sort(all.begin(), all.end());
+ auto iter = all.begin();
+ for (ui32 i = 0; i < COUNT; ++i) {
+ for (ui32 k = 1; k <= MSG_COUNT; ++k) {
+ UNIT_ASSERT_VALUES_EQUAL(((ui64)i << 32) + k, *iter);
+ ++iter;
+ }
+ }
+ }
+
+ void Threads8_Rnd_Exchange() {
+ ManyThreadsRndExchange<8>();
+ }
+};
+
+REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TTestUnorderedQueue);
+UNIT_TEST_SUITE_REGISTRATION(TTestWeakQueue<TMPMCUnorderedRing>);
diff --git a/library/cpp/threading/queue/ut/ya.make b/library/cpp/threading/queue/ut/ya.make
new file mode 100644
index 0000000000..8883d9bf69
--- /dev/null
+++ b/library/cpp/threading/queue/ut/ya.make
@@ -0,0 +1,16 @@
+UNITTEST_FOR(library/cpp/threading/queue)
+
+OWNER(agri)
+
+ALLOCATOR(B)
+
+SRCS(
+ basic_ut.cpp
+ queue_ut.cpp
+ tune_ut.cpp
+ unordered_ut.cpp
+ ut_helpers.cpp
+ ut_helpers.h
+)
+
+END()
diff --git a/library/cpp/threading/queue/ut_helpers.cpp b/library/cpp/threading/queue/ut_helpers.cpp
new file mode 100644
index 0000000000..aa3a831441
--- /dev/null
+++ b/library/cpp/threading/queue/ut_helpers.cpp
@@ -0,0 +1 @@
+#include "ut_helpers.h"
diff --git a/library/cpp/threading/queue/ut_helpers.h b/library/cpp/threading/queue/ut_helpers.h
new file mode 100644
index 0000000000..2756b52601
--- /dev/null
+++ b/library/cpp/threading/queue/ut_helpers.h
@@ -0,0 +1,40 @@
+#pragma once
+
+#include "mpsc_read_as_filled.h"
+#include "mpsc_htswap.h"
+#include "mpsc_vinfarr_obstructive.h"
+#include "mpsc_intrusive_unordered.h"
+#include "mpmc_unordered_ring.h"
+
+struct TBasicHTSwap: public NThreading::THTSwapQueue<> {
+};
+
+struct TBasicReadAsFilled: public NThreading::TReadAsFilledQueue<> {
+};
+
+struct TBasicObstructiveConsumer
+ : public NThreading::TObstructiveConsumerQueue<> {
+};
+
+struct TBasicMPSCIntrusiveUnordered
+ : public NThreading::TMPSCIntrusiveUnordered {
+};
+
+struct TIntrusiveLink: public NThreading::TIntrusiveNode {
+};
+
+struct TMPMCUnorderedRing: public NThreading::TMPMCUnorderedRing {
+ TMPMCUnorderedRing()
+ : NThreading::TMPMCUnorderedRing(10000000)
+ {
+ }
+};
+
+#define REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TestTemplate) \
+ UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicHTSwap>); \
+ UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicReadAsFilled>); \
+ UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicObstructiveConsumer>)
+
+#define REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TestTemplate) \
+ UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicMPSCIntrusiveUnordered>); \
+ UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TMPMCUnorderedRing>);
diff --git a/library/cpp/threading/queue/ya.make b/library/cpp/threading/queue/ya.make
new file mode 100644
index 0000000000..6570b38ce5
--- /dev/null
+++ b/library/cpp/threading/queue/ya.make
@@ -0,0 +1,18 @@
+LIBRARY()
+
+OWNER(agri)
+
+SRCS(
+ mpmc_unordered_ring.cpp
+ mpmc_unordered_ring.h
+ mpsc_htswap.cpp
+ mpsc_htswap.h
+ mpsc_intrusive_unordered.cpp
+ mpsc_intrusive_unordered.h
+ mpsc_read_as_filled.cpp
+ mpsc_read_as_filled.h
+ mpsc_vinfarr_obstructive.cpp
+ mpsc_vinfarr_obstructive.h
+)
+
+END()