diff options
author | agri <agri@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
commit | d3530b2692e400bd4d29bd4f07cafaee139164e7 (patch) | |
tree | b7ae636a74490e649a2ed0fdd5361f1bec83b9f9 /library/cpp/threading/queue | |
parent | 0f4c5d1e8c0672bf0a1f2f2d8acac5ba24772435 (diff) | |
download | ydb-d3530b2692e400bd4d29bd4f07cafaee139164e7.tar.gz |
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/queue')
19 files changed, 2215 insertions, 2215 deletions
diff --git a/library/cpp/threading/queue/basic_ut.cpp b/library/cpp/threading/queue/basic_ut.cpp index 5f56f8583e..2db5d6e8e8 100644 --- a/library/cpp/threading/queue/basic_ut.cpp +++ b/library/cpp/threading/queue/basic_ut.cpp @@ -1,92 +1,92 @@ #include <library/cpp/testing/unittest/registar.h> -#include <util/generic/vector.h> -#include <util/system/thread.h> - -#include "ut_helpers.h" - +#include <util/generic/vector.h> +#include <util/system/thread.h> + +#include "ut_helpers.h" + template <typename TQueueType> -class TQueueTestsInSingleThread: public TTestBase { -private: +class TQueueTestsInSingleThread: public TTestBase { +private: using TSelf = TQueueTestsInSingleThread<TQueueType>; - using TLink = TIntrusiveLink; - - UNIT_TEST_SUITE_DEMANGLE(TSelf); - UNIT_TEST(OnePushOnePop) - UNIT_TEST(OnePushOnePop_Repeat1M) - UNIT_TEST(Threads8_Repeat1M_Push1Pop1) - UNIT_TEST_SUITE_END(); - -public: - void OnePushOnePop() { + using TLink = TIntrusiveLink; + + UNIT_TEST_SUITE_DEMANGLE(TSelf); + UNIT_TEST(OnePushOnePop) + UNIT_TEST(OnePushOnePop_Repeat1M) + UNIT_TEST(Threads8_Repeat1M_Push1Pop1) + UNIT_TEST_SUITE_END(); + +public: + void OnePushOnePop() { TQueueType queue; - - auto popped = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); - - TLink msg; - queue.Push(&msg); - popped = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(&msg, popped); - - popped = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); - }; - - void OnePushOnePop_Repeat1M() { + + auto popped = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); + + TLink msg; + queue.Push(&msg); + popped = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(&msg, popped); + + popped = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); + }; + + void OnePushOnePop_Repeat1M() { TQueueType queue; - TLink msg; - - auto popped = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); - - for (int i = 0; i < 1000000; ++i) { - queue.Push(&msg); - popped = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(&msg, popped); - - 
popped = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); - } - } - - template <size_t NUMBER_OF_THREADS> - void RepeatPush1Pop1_InManyThreads() { + TLink msg; + + auto popped = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); + + for (int i = 0; i < 1000000; ++i) { + queue.Push(&msg); + popped = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(&msg, popped); + + popped = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); + } + } + + template <size_t NUMBER_OF_THREADS> + void RepeatPush1Pop1_InManyThreads() { class TCycleThread: public ISimpleThread { - public: - void* ThreadProc() override { + public: + void* ThreadProc() override { TQueueType queue; - TLink msg; - auto popped = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); - - for (size_t i = 0; i < 1000000; ++i) { - queue.Push(&msg); - popped = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(popped, &msg); - - popped = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); - } - return nullptr; - } - }; - + TLink msg; + auto popped = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); + + for (size_t i = 0; i < 1000000; ++i) { + queue.Push(&msg); + popped = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(popped, &msg); + + popped = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); + } + return nullptr; + } + }; + TVector<TAutoPtr<TCycleThread>> cyclers; - - for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) { - cyclers.emplace_back(new TCycleThread); - cyclers.back()->Start(); - } - - for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) { - cyclers[i]->Join(); - } - } - - void Threads8_Repeat1M_Push1Pop1() { - RepeatPush1Pop1_InManyThreads<8>(); - } -}; - -REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestsInSingleThread); -REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TQueueTestsInSingleThread) + + for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) { + cyclers.emplace_back(new TCycleThread); + cyclers.back()->Start(); + } + + for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) { + cyclers[i]->Join(); + } + } 
+ + void Threads8_Repeat1M_Push1Pop1() { + RepeatPush1Pop1_InManyThreads<8>(); + } +}; + +REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestsInSingleThread); +REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TQueueTestsInSingleThread) diff --git a/library/cpp/threading/queue/mpmc_unordered_ring.cpp b/library/cpp/threading/queue/mpmc_unordered_ring.cpp index 160547f594..df48182210 100644 --- a/library/cpp/threading/queue/mpmc_unordered_ring.cpp +++ b/library/cpp/threading/queue/mpmc_unordered_ring.cpp @@ -1,74 +1,74 @@ -#include "mpmc_unordered_ring.h" - -namespace NThreading { - TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) { - Y_VERIFY(size > 0); - RingSize = size; - RingBuffer.Reset(new void*[size]); - memset(&RingBuffer[0], 0, sizeof(void*) * size); - } - - bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept { - if (retryCount == 0) { - StubbornPush(msg); - return true; - } - for (ui16 itry = retryCount; itry-- > 0;) { - if (WeakPush(msg)) { - return true; - } - } - return false; - } - - bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept { - auto pawl = AtomicIncrement(WritePawl); - if (pawl - AtomicGet(ReadFront) >= RingSize) { - // Queue is full - AtomicDecrement(WritePawl); - return false; - } - - auto writeSlot = AtomicGetAndIncrement(WriteFront); - if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) { - return true; - } - // slot is occupied for some reason, retry - return false; - } - - void* TMPMCUnorderedRing::Pop() noexcept { - ui64 readSlot; - - for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) { - auto pawl = AtomicIncrement(ReadPawl); - if (pawl > AtomicGet(WriteFront)) { - // Queue is empty - AtomicDecrement(ReadPawl); - return nullptr; - } - - readSlot = AtomicGetAndIncrement(ReadFront); - - auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr); - if (msg != nullptr) { - return msg; - } - } - - /* got no message in the slot, let's try to rollback readfront */ - AtomicCas(&ReadFront, readSlot - 1, readSlot); - return 
nullptr; - } - - void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept { - for (; *last < RingSize;) { - auto msg = AtomicSwap(&RingBuffer[*last], nullptr); - ++*last; - if (msg != nullptr) { - return msg; - } - } - return nullptr; - } -} +#include "mpmc_unordered_ring.h" + +namespace NThreading { + TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) { + Y_VERIFY(size > 0); + RingSize = size; + RingBuffer.Reset(new void*[size]); + memset(&RingBuffer[0], 0, sizeof(void*) * size); + } + + bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept { + if (retryCount == 0) { + StubbornPush(msg); + return true; + } + for (ui16 itry = retryCount; itry-- > 0;) { + if (WeakPush(msg)) { + return true; + } + } + return false; + } + + bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept { + auto pawl = AtomicIncrement(WritePawl); + if (pawl - AtomicGet(ReadFront) >= RingSize) { + // Queue is full + AtomicDecrement(WritePawl); + return false; + } + + auto writeSlot = AtomicGetAndIncrement(WriteFront); + if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) { + return true; + } + // slot is occupied for some reason, retry + return false; + } + + void* TMPMCUnorderedRing::Pop() noexcept { + ui64 readSlot; + + for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) { + auto pawl = AtomicIncrement(ReadPawl); + if (pawl > AtomicGet(WriteFront)) { + // Queue is empty + AtomicDecrement(ReadPawl); + return nullptr; + } + + readSlot = AtomicGetAndIncrement(ReadFront); + + auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr); + if (msg != nullptr) { + return msg; + } + } + + /* got no message in the slot, let's try to rollback readfront */ + AtomicCas(&ReadFront, readSlot - 1, readSlot); + return nullptr; + } + + void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept { + for (; *last < RingSize;) { + auto msg = AtomicSwap(&RingBuffer[*last], nullptr); + ++*last; + if (msg != nullptr) { + return msg; + } + } + return nullptr; + } +} diff --git 
a/library/cpp/threading/queue/mpmc_unordered_ring.h b/library/cpp/threading/queue/mpmc_unordered_ring.h index 5042f7528e..59758d2c35 100644 --- a/library/cpp/threading/queue/mpmc_unordered_ring.h +++ b/library/cpp/threading/queue/mpmc_unordered_ring.h @@ -1,42 +1,42 @@ -#pragma once - -/* - It's not a general purpose queue. - No order guarantee, but it mostly ordered. - Items may stuck in almost empty queue. - Use UnsafeScanningPop to pop all stuck items. - Almost wait-free for producers and consumers. - */ - -#include <util/system/atomic.h> -#include <util/generic/ptr.h> - -namespace NThreading { - struct TMPMCUnorderedRing { - public: - static constexpr ui16 MAX_PUSH_TRIES = 4; - static constexpr ui16 MAX_POP_TRIES = 4; - - TMPMCUnorderedRing(size_t size); - - bool Push(void* msg, ui16 retryCount = MAX_PUSH_TRIES) noexcept; - void StubbornPush(void* msg) { - while (!WeakPush(msg)) { - } - } - - void* Pop() noexcept; - - void* UnsafeScanningPop(ui64* last) noexcept; - - private: - bool WeakPush(void* msg) noexcept; - - size_t RingSize; - TArrayPtr<void*> RingBuffer; - ui64 WritePawl = 0; - ui64 WriteFront = 0; - ui64 ReadPawl = 0; - ui64 ReadFront = 0; - }; -} +#pragma once + +/* + It's not a general purpose queue. + No order guarantee, but it mostly ordered. + Items may stuck in almost empty queue. + Use UnsafeScanningPop to pop all stuck items. + Almost wait-free for producers and consumers. 
+ */ + +#include <util/system/atomic.h> +#include <util/generic/ptr.h> + +namespace NThreading { + struct TMPMCUnorderedRing { + public: + static constexpr ui16 MAX_PUSH_TRIES = 4; + static constexpr ui16 MAX_POP_TRIES = 4; + + TMPMCUnorderedRing(size_t size); + + bool Push(void* msg, ui16 retryCount = MAX_PUSH_TRIES) noexcept; + void StubbornPush(void* msg) { + while (!WeakPush(msg)) { + } + } + + void* Pop() noexcept; + + void* UnsafeScanningPop(ui64* last) noexcept; + + private: + bool WeakPush(void* msg) noexcept; + + size_t RingSize; + TArrayPtr<void*> RingBuffer; + ui64 WritePawl = 0; + ui64 WriteFront = 0; + ui64 ReadPawl = 0; + ui64 ReadFront = 0; + }; +} diff --git a/library/cpp/threading/queue/mpsc_htswap.cpp b/library/cpp/threading/queue/mpsc_htswap.cpp index 610c8f67f1..d8ab0d4f48 100644 --- a/library/cpp/threading/queue/mpsc_htswap.cpp +++ b/library/cpp/threading/queue/mpsc_htswap.cpp @@ -1 +1 @@ -#include "mpsc_htswap.h" +#include "mpsc_htswap.h" diff --git a/library/cpp/threading/queue/mpsc_htswap.h b/library/cpp/threading/queue/mpsc_htswap.h index c42caa7ac0..2d0bfd1146 100644 --- a/library/cpp/threading/queue/mpsc_htswap.h +++ b/library/cpp/threading/queue/mpsc_htswap.h @@ -1,132 +1,132 @@ -#pragma once - -/* - http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue - - Simple semi-wait-free queue. Many producers - one consumer. - Tracking of allocated memory is not required. - No CAS. Only atomic swap (exchange) operations. - - WARNING: a sleeping producer can stop progress for consumer. - - WARNING: there is no wait&notify mechanic for consumer, - consumer receives nullptr if queue was empty. 
- - WARNING: the algorithm itself is lock-free - but producers and consumer could be blocked by memory allocator - - Reference design: rtmapreduce/libs/threading/lfqueue.h - */ - -#include <util/generic/noncopyable.h> -#include <util/system/types.h> -#include <util/system/atomic.h> - -#include "tune.h" - -namespace NThreading { - namespace NHTSwapPrivate { - template <typename T, typename TTuneup> - struct TNode +#pragma once + +/* + http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue + + Simple semi-wait-free queue. Many producers - one consumer. + Tracking of allocated memory is not required. + No CAS. Only atomic swap (exchange) operations. + + WARNING: a sleeping producer can stop progress for consumer. + + WARNING: there is no wait&notify mechanic for consumer, + consumer receives nullptr if queue was empty. + + WARNING: the algorithm itself is lock-free + but producers and consumer could be blocked by memory allocator + + Reference design: rtmapreduce/libs/threading/lfqueue.h + */ + +#include <util/generic/noncopyable.h> +#include <util/system/types.h> +#include <util/system/atomic.h> + +#include "tune.h" + +namespace NThreading { + namespace NHTSwapPrivate { + template <typename T, typename TTuneup> + struct TNode : public TTuneup::TNodeBase, public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> { - TNode(const T& item) { - this->Next = nullptr; - this->Item = item; - } - - TNode(T&& item) { - this->Next = nullptr; - this->Item = std::move(item); - } - }; - - struct TDefaultTuneup { - struct TNodeBase: private TNonCopyable { - }; - - template <typename TNode, typename T> - struct TNodeLayout { - TNode* Next; - T Item; - }; - - template <typename TNode> - struct TQueueLayout { - TNode* Head; - TNode* Tail; - }; - }; - - template <typename T, typename TTuneup> - class THTSwapQueueImpl + TNode(const T& item) { + this->Next = nullptr; + this->Item = item; + } + + TNode(T&& item) { + this->Next = nullptr; + this->Item = 
std::move(item); + } + }; + + struct TDefaultTuneup { + struct TNodeBase: private TNonCopyable { + }; + + template <typename TNode, typename T> + struct TNodeLayout { + TNode* Next; + T Item; + }; + + template <typename TNode> + struct TQueueLayout { + TNode* Head; + TNode* Tail; + }; + }; + + template <typename T, typename TTuneup> + class THTSwapQueueImpl : protected TTuneup::template TQueueLayout<TNode<T, TTuneup>> { - protected: - using TTunedNode = TNode<T, TTuneup>; - - public: - using TItem = T; - - THTSwapQueueImpl() { - this->Head = new TTunedNode(T()); - this->Tail = this->Head; - } - - ~THTSwapQueueImpl() { - TTunedNode* node = this->Head; - while (node != nullptr) { - TTunedNode* next = node->Next; - delete node; - node = next; - } - } - - template <typename TT> - void Push(TT&& item) { - Enqueue(new TTunedNode(std::forward<TT>(item))); - } - - T Peek() { - TTunedNode* next = AtomicGet(this->Head->Next); - if (next == nullptr) { - return T(); - } - return next->Item; - } - - void Enqueue(TTunedNode* node) { - // our goal is to avoid expensive CAS here, - // but now consumer will be blocked until new tail linked. - // fortunately 'window of inconsistency' is extremely small. 
- TTunedNode* prev = AtomicSwap(&this->Tail, node); - AtomicSet(prev->Next, node); - } - - T Pop() { - TTunedNode* next = AtomicGet(this->Head->Next); - if (next == nullptr) { - return nullptr; - } - auto item = std::move(next->Item); - std::swap(this->Head, next); // no need atomic here - delete next; - return item; - } - - bool IsEmpty() const { - TTunedNode* next = AtomicGet(this->Head->Next); - return (next == nullptr); - } - }; - } - - DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase); - DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout); - DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout); - + protected: + using TTunedNode = TNode<T, TTuneup>; + + public: + using TItem = T; + + THTSwapQueueImpl() { + this->Head = new TTunedNode(T()); + this->Tail = this->Head; + } + + ~THTSwapQueueImpl() { + TTunedNode* node = this->Head; + while (node != nullptr) { + TTunedNode* next = node->Next; + delete node; + node = next; + } + } + + template <typename TT> + void Push(TT&& item) { + Enqueue(new TTunedNode(std::forward<TT>(item))); + } + + T Peek() { + TTunedNode* next = AtomicGet(this->Head->Next); + if (next == nullptr) { + return T(); + } + return next->Item; + } + + void Enqueue(TTunedNode* node) { + // our goal is to avoid expensive CAS here, + // but now consumer will be blocked until new tail linked. + // fortunately 'window of inconsistency' is extremely small. 
+ TTunedNode* prev = AtomicSwap(&this->Tail, node); + AtomicSet(prev->Next, node); + } + + T Pop() { + TTunedNode* next = AtomicGet(this->Head->Next); + if (next == nullptr) { + return nullptr; + } + auto item = std::move(next->Item); + std::swap(this->Head, next); // no need atomic here + delete next; + return item; + } + + bool IsEmpty() const { + TTunedNode* next = AtomicGet(this->Head->Next); + return (next == nullptr); + } + }; + } + + DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase); + DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout); + DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout); + template <typename T = void*, typename... TParams> - class THTSwapQueue + class THTSwapQueue : public NHTSwapPrivate::THTSwapQueueImpl<T, TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> { - }; -} + }; +} diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp index 3bb1a04f7e..a6a2fcef39 100644 --- a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp +++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp @@ -1,79 +1,79 @@ -#include "mpsc_intrusive_unordered.h" -#include <util/system/atomic.h> - -namespace NThreading { - void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept { - auto head = AtomicGet(HeadForCaS); - for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) { - // no ABA here, because Next is exactly head - // it does not matter how many travels head was made/ - node->Next = head; - auto prev = AtomicGetAndCas(&HeadForCaS, node, head); - if (head == prev) { - return; - } - head = prev; - } - // boring of trying to do cas, let's just swap - - // no need for atomic here, because the next is atomic swap - node->Next = 0; - - head = AtomicSwap(&HeadForSwap, node); - if (head != nullptr) { - AtomicSet(node->Next, head); - } else { - // consumer must know if no other thread may access the memory, - // setting Next to node is a way to notify consumer - 
AtomicSet(node->Next, node); - } - } - - TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept { - if (NotReadyChain == nullptr) { - auto head = AtomicSwap(&HeadForSwap, nullptr); - NotReadyChain = head; - } - - if (NotReadyChain != nullptr) { - auto next = AtomicGet(NotReadyChain->Next); - if (next != nullptr) { - auto ready = NotReadyChain; - TIntrusiveNode* cut; - do { - cut = NotReadyChain; - NotReadyChain = next; - next = AtomicGet(NotReadyChain->Next); - if (next == NotReadyChain) { - cut = NotReadyChain; - NotReadyChain = nullptr; - break; - } - } while (next != nullptr); - cut->Next = nullptr; - return ready; - } - } - - if (AtomicGet(HeadForCaS) != nullptr) { - return AtomicSwap(&HeadForCaS, nullptr); - } - return nullptr; - } - - TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept { - if (PopOneQueue != nullptr) { - auto head = PopOneQueue; - PopOneQueue = PopOneQueue->Next; - return head; - } - - PopOneQueue = PopMany(); - if (PopOneQueue != nullptr) { - auto head = PopOneQueue; - PopOneQueue = PopOneQueue->Next; - return head; - } - return nullptr; - } -} +#include "mpsc_intrusive_unordered.h" +#include <util/system/atomic.h> + +namespace NThreading { + void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept { + auto head = AtomicGet(HeadForCaS); + for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) { + // no ABA here, because Next is exactly head + // it does not matter how many travels head was made/ + node->Next = head; + auto prev = AtomicGetAndCas(&HeadForCaS, node, head); + if (head == prev) { + return; + } + head = prev; + } + // boring of trying to do cas, let's just swap + + // no need for atomic here, because the next is atomic swap + node->Next = 0; + + head = AtomicSwap(&HeadForSwap, node); + if (head != nullptr) { + AtomicSet(node->Next, head); + } else { + // consumer must know if no other thread may access the memory, + // setting Next to node is a way to notify consumer + AtomicSet(node->Next, node); + } + } + + 
TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept { + if (NotReadyChain == nullptr) { + auto head = AtomicSwap(&HeadForSwap, nullptr); + NotReadyChain = head; + } + + if (NotReadyChain != nullptr) { + auto next = AtomicGet(NotReadyChain->Next); + if (next != nullptr) { + auto ready = NotReadyChain; + TIntrusiveNode* cut; + do { + cut = NotReadyChain; + NotReadyChain = next; + next = AtomicGet(NotReadyChain->Next); + if (next == NotReadyChain) { + cut = NotReadyChain; + NotReadyChain = nullptr; + break; + } + } while (next != nullptr); + cut->Next = nullptr; + return ready; + } + } + + if (AtomicGet(HeadForCaS) != nullptr) { + return AtomicSwap(&HeadForCaS, nullptr); + } + return nullptr; + } + + TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept { + if (PopOneQueue != nullptr) { + auto head = PopOneQueue; + PopOneQueue = PopOneQueue->Next; + return head; + } + + PopOneQueue = PopMany(); + if (PopOneQueue != nullptr) { + auto head = PopOneQueue; + PopOneQueue = PopOneQueue->Next; + return head; + } + return nullptr; + } +} diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.h b/library/cpp/threading/queue/mpsc_intrusive_unordered.h index 6ac7537ae9..c07cf761f6 100644 --- a/library/cpp/threading/queue/mpsc_intrusive_unordered.h +++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.h @@ -1,35 +1,35 @@ -#pragma once - -/* - Simple almost-wait-free unordered queue for low contention operations. - - It's wait-free for producers. - Hanging producer can hide some items from consumer. 
- */ - -#include <util/system/types.h> - -namespace NThreading { - struct TIntrusiveNode { - TIntrusiveNode* Next; - }; - - class TMPSCIntrusiveUnordered { - public: - static constexpr ui32 NUMBER_OF_TRIES_FOR_CAS = 3; - - void Push(TIntrusiveNode* node) noexcept; - TIntrusiveNode* PopMany() noexcept; - TIntrusiveNode* Pop() noexcept; - - void Push(void* node) noexcept { - Push(reinterpret_cast<TIntrusiveNode*>(node)); - } - - private: - TIntrusiveNode* HeadForCaS = nullptr; - TIntrusiveNode* HeadForSwap = nullptr; - TIntrusiveNode* NotReadyChain = nullptr; - TIntrusiveNode* PopOneQueue = nullptr; - }; -} +#pragma once + +/* + Simple almost-wait-free unordered queue for low contention operations. + + It's wait-free for producers. + Hanging producer can hide some items from consumer. + */ + +#include <util/system/types.h> + +namespace NThreading { + struct TIntrusiveNode { + TIntrusiveNode* Next; + }; + + class TMPSCIntrusiveUnordered { + public: + static constexpr ui32 NUMBER_OF_TRIES_FOR_CAS = 3; + + void Push(TIntrusiveNode* node) noexcept; + TIntrusiveNode* PopMany() noexcept; + TIntrusiveNode* Pop() noexcept; + + void Push(void* node) noexcept { + Push(reinterpret_cast<TIntrusiveNode*>(node)); + } + + private: + TIntrusiveNode* HeadForCaS = nullptr; + TIntrusiveNode* HeadForSwap = nullptr; + TIntrusiveNode* NotReadyChain = nullptr; + TIntrusiveNode* PopOneQueue = nullptr; + }; +} diff --git a/library/cpp/threading/queue/mpsc_read_as_filled.cpp b/library/cpp/threading/queue/mpsc_read_as_filled.cpp index 8b4664a6f3..3b89fb1df6 100644 --- a/library/cpp/threading/queue/mpsc_read_as_filled.cpp +++ b/library/cpp/threading/queue/mpsc_read_as_filled.cpp @@ -1 +1 @@ -#include "mpsc_read_as_filled.h" +#include "mpsc_read_as_filled.h" diff --git a/library/cpp/threading/queue/mpsc_read_as_filled.h b/library/cpp/threading/queue/mpsc_read_as_filled.h index be33ba5a58..4dfdb1fbbf 100644 --- a/library/cpp/threading/queue/mpsc_read_as_filled.h +++ 
b/library/cpp/threading/queue/mpsc_read_as_filled.h @@ -1,611 +1,611 @@ -#pragma once - -/* - Completely wait-free queue, multiple producers - one consumer. Strict order. - The queue algorithm is using concept of virtual infinite array. - +#pragma once + +/* + Completely wait-free queue, multiple producers - one consumer. Strict order. + The queue algorithm is using concept of virtual infinite array. + A producer takes a number from a counter and atomically increments the counter. - The number taken is a number of a slot for the producer to put a new message - into infinite array. - - Then producer constructs a virtual infinite array by bidirectional linked list - of blocks. Each block contains several slots. - + The number taken is a number of a slot for the producer to put a new message + into infinite array. + + Then producer constructs a virtual infinite array by bidirectional linked list + of blocks. Each block contains several slots. + There is a hint pointer which optimistically points to the last block - of the list and never goes backward. - - Consumer exploits the property of the hint pointer always going forward - to free old blocks eventually. Consumer periodically read the hint pointer - and the counter and thus deduce producers which potentially holds the pointer - to a block. Consumer can free the block if all that producers filled their - slots and left the queue. - - No producer can stop the progress for other producers. - - Consumer can't stop the progress for producers. - Consumer can skip not-yet-filled slots and read them later. - Thus no producer can stop the progress for consumer. + of the list and never goes backward. + + Consumer exploits the property of the hint pointer always going forward + to free old blocks eventually. Consumer periodically read the hint pointer + and the counter and thus deduce producers which potentially holds the pointer + to a block. 
Consumer can free the block if all that producers filled their + slots and left the queue. + + No producer can stop the progress for other producers. + + Consumer can't stop the progress for producers. + Consumer can skip not-yet-filled slots and read them later. + Thus no producer can stop the progress for consumer. The algorithm is virtually strictly ordered because it skips slots only - if it is really does not matter in which order the slots were produced and - consumed. - - WARNING: there is no wait&notify mechanic for consumer, - consumer receives nullptr if queue was empty. - - WARNING: though the algorithm itself is completely wait-free - but producers and consumer could be blocked by memory allocator - + if it is really does not matter in which order the slots were produced and + consumed. + + WARNING: there is no wait&notify mechanic for consumer, + consumer receives nullptr if queue was empty. + + WARNING: though the algorithm itself is completely wait-free + but producers and consumer could be blocked by memory allocator + WARNING: copy constructors of the queue are not thread-safe - */ - -#include <util/generic/deque.h> -#include <util/generic/ptr.h> -#include <util/system/atomic.h> -#include <util/system/spinlock.h> - -#include "tune.h" - -namespace NThreading { - namespace NReadAsFilledPrivate { - typedef void* TMsgLink; - - static constexpr ui32 DEFAULT_BUNCH_SIZE = 251; - - struct TEmpty { - }; - - struct TEmptyAux { - TEmptyAux Retrieve() const { - return TEmptyAux(); - } - - void Store(TEmptyAux&) { - } - - static constexpr TEmptyAux Zero() { - return TEmptyAux(); - } - }; - - template <typename TAux> - struct TSlot { - TMsgLink volatile Msg; - TAux AuxiliaryData; - - inline void Store(TAux& aux) { - AuxiliaryData.Store(aux); - } - - inline TAux Retrieve() const { - return AuxiliaryData.Retrieve(); - } - - static TSlot<TAux> NullElem() { - return {nullptr, TAux::Zero()}; - } - - static TSlot<TAux> Pair(TMsgLink msg, TAux aux) { - return {msg, 
std::move(aux)}; - } - }; - - template <> - struct TSlot<TEmptyAux> { - TMsgLink volatile Msg; - - inline void Store(TEmptyAux&) { - } - - inline TEmptyAux Retrieve() const { - return TEmptyAux(); - } - - static TSlot<TEmptyAux> NullElem() { - return {nullptr}; - } - - static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) { - return {msg}; - } - }; - - enum TPushResult { - PUSH_RESULT_OK, - PUSH_RESULT_BACKWARD, - PUSH_RESULT_FORWARD, - }; - - template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE, - typename TBase = TEmpty, - typename TAux = TEmptyAux> - struct TMsgBunch: public TBase { - static constexpr size_t RELEASE_SIZE = BUNCH_SIZE * 2; - - ui64 FirstSlot; - - TSlot<TAux> LinkArray[BUNCH_SIZE]; - - TMsgBunch* volatile NextBunch; - TMsgBunch* volatile BackLink; - - ui64 volatile Token; - TMsgBunch* volatile NextToken; - - /* this push can return PUSH_RESULT_BLOCKED */ + */ + +#include <util/generic/deque.h> +#include <util/generic/ptr.h> +#include <util/system/atomic.h> +#include <util/system/spinlock.h> + +#include "tune.h" + +namespace NThreading { + namespace NReadAsFilledPrivate { + typedef void* TMsgLink; + + static constexpr ui32 DEFAULT_BUNCH_SIZE = 251; + + struct TEmpty { + }; + + struct TEmptyAux { + TEmptyAux Retrieve() const { + return TEmptyAux(); + } + + void Store(TEmptyAux&) { + } + + static constexpr TEmptyAux Zero() { + return TEmptyAux(); + } + }; + + template <typename TAux> + struct TSlot { + TMsgLink volatile Msg; + TAux AuxiliaryData; + + inline void Store(TAux& aux) { + AuxiliaryData.Store(aux); + } + + inline TAux Retrieve() const { + return AuxiliaryData.Retrieve(); + } + + static TSlot<TAux> NullElem() { + return {nullptr, TAux::Zero()}; + } + + static TSlot<TAux> Pair(TMsgLink msg, TAux aux) { + return {msg, std::move(aux)}; + } + }; + + template <> + struct TSlot<TEmptyAux> { + TMsgLink volatile Msg; + + inline void Store(TEmptyAux&) { + } + + inline TEmptyAux Retrieve() const { + return TEmptyAux(); + } + + static TSlot<TEmptyAux> 
NullElem() { + return {nullptr}; + } + + static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) { + return {msg}; + } + }; + + enum TPushResult { + PUSH_RESULT_OK, + PUSH_RESULT_BACKWARD, + PUSH_RESULT_FORWARD, + }; + + template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE, + typename TBase = TEmpty, + typename TAux = TEmptyAux> + struct TMsgBunch: public TBase { + static constexpr size_t RELEASE_SIZE = BUNCH_SIZE * 2; + + ui64 FirstSlot; + + TSlot<TAux> LinkArray[BUNCH_SIZE]; + + TMsgBunch* volatile NextBunch; + TMsgBunch* volatile BackLink; + + ui64 volatile Token; + TMsgBunch* volatile NextToken; + + /* this push can return PUSH_RESULT_BLOCKED */ inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) { - if (Y_UNLIKELY(slot < FirstSlot)) { - return PUSH_RESULT_BACKWARD; - } - - if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) { - return PUSH_RESULT_FORWARD; - } - - LinkArray[slot - FirstSlot].Store(auxiliary); - - AtomicSet(LinkArray[slot - FirstSlot].Msg, msg); - return PUSH_RESULT_OK; - } - - inline bool IsSlotHere(ui64 slot) { - return slot < FirstSlot + BUNCH_SIZE; - } - - inline TMsgLink GetSlot(ui64 slot) const { - return AtomicGet(LinkArray[slot - FirstSlot].Msg); - } - - inline TSlot<TAux> GetSlotAux(ui64 slot) const { - auto msg = GetSlot(slot); - auto aux = LinkArray[slot - FirstSlot].Retrieve(); - return TSlot<TAux>::Pair(msg, aux); - } - - inline TMsgBunch* GetNextBunch() const { - return AtomicGet(NextBunch); - } - - inline bool SetNextBunch(TMsgBunch* ptr) { - return AtomicCas(&NextBunch, ptr, nullptr); - } - - inline TMsgBunch* GetBackLink() const { - return AtomicGet(BackLink); - } - - inline TMsgBunch* GetToken(ui64 slot) { - return reinterpret_cast<TMsgBunch*>( - LinkArray[slot - FirstSlot].Msg); - } - - inline void IncrementToken() { - AtomicIncrement(Token); - } - - // the object could be destroyed after this method - inline void DecrementToken() { - if (Y_UNLIKELY(AtomicDecrement(Token) == RELEASE_SIZE)) { - Release(this); - 
AtomicGet(NextToken)->DecrementToken(); - // this could be invalid here - } - } - - // the object could be destroyed after this method - inline void SetNextToken(TMsgBunch* next) { - AtomicSet(NextToken, next); + if (Y_UNLIKELY(slot < FirstSlot)) { + return PUSH_RESULT_BACKWARD; + } + + if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) { + return PUSH_RESULT_FORWARD; + } + + LinkArray[slot - FirstSlot].Store(auxiliary); + + AtomicSet(LinkArray[slot - FirstSlot].Msg, msg); + return PUSH_RESULT_OK; + } + + inline bool IsSlotHere(ui64 slot) { + return slot < FirstSlot + BUNCH_SIZE; + } + + inline TMsgLink GetSlot(ui64 slot) const { + return AtomicGet(LinkArray[slot - FirstSlot].Msg); + } + + inline TSlot<TAux> GetSlotAux(ui64 slot) const { + auto msg = GetSlot(slot); + auto aux = LinkArray[slot - FirstSlot].Retrieve(); + return TSlot<TAux>::Pair(msg, aux); + } + + inline TMsgBunch* GetNextBunch() const { + return AtomicGet(NextBunch); + } + + inline bool SetNextBunch(TMsgBunch* ptr) { + return AtomicCas(&NextBunch, ptr, nullptr); + } + + inline TMsgBunch* GetBackLink() const { + return AtomicGet(BackLink); + } + + inline TMsgBunch* GetToken(ui64 slot) { + return reinterpret_cast<TMsgBunch*>( + LinkArray[slot - FirstSlot].Msg); + } + + inline void IncrementToken() { + AtomicIncrement(Token); + } + + // the object could be destroyed after this method + inline void DecrementToken() { + if (Y_UNLIKELY(AtomicDecrement(Token) == RELEASE_SIZE)) { + Release(this); + AtomicGet(NextToken)->DecrementToken(); + // this could be invalid here + } + } + + // the object could be destroyed after this method + inline void SetNextToken(TMsgBunch* next) { + AtomicSet(NextToken, next); if (Y_UNLIKELY(AtomicAdd(Token, RELEASE_SIZE) == RELEASE_SIZE)) { - Release(this); - next->DecrementToken(); - } - // this could be invalid here - } - - TMsgBunch(ui64 start, TMsgBunch* backLink) { - AtomicSet(FirstSlot, start); - memset(&LinkArray, 0, sizeof(LinkArray)); - AtomicSet(NextBunch, nullptr); - 
AtomicSet(BackLink, backLink); - - AtomicSet(Token, 1); - AtomicSet(NextToken, nullptr); - } - - static void Release(TMsgBunch* block) { - auto backLink = AtomicGet(block->BackLink); - if (backLink == nullptr) { - return; - } - AtomicSet(block->BackLink, nullptr); - - do { - auto bbackLink = backLink->BackLink; - delete backLink; - backLink = bbackLink; - } while (backLink != nullptr); - } - - void Destroy() { - for (auto tail = BackLink; tail != nullptr;) { - auto next = tail->BackLink; - delete tail; - tail = next; - } - - for (auto next = this; next != nullptr;) { - auto nnext = next->NextBunch; - delete next; - next = nnext; - } - } - }; - - template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE, - typename TBunchBase = NReadAsFilledPrivate::TEmpty, - typename TAux = TEmptyAux> - class TWriteBucket { - public: - using TUsingAux = TAux; // for TReadBucket binding - using TBunch = TMsgBunch<BUNCH_SIZE, TBunchBase, TAux>; - - TWriteBucket(TBunch* bunch = new TBunch(0, nullptr)) { - AtomicSet(LastBunch, bunch); - AtomicSet(SlotCounter, 0); - } - - TWriteBucket(TWriteBucket&& move) - : LastBunch(move.LastBunch) - , SlotCounter(move.SlotCounter) - { - move.LastBunch = nullptr; - } - - ~TWriteBucket() { - if (LastBunch != nullptr) { - LastBunch->Destroy(); - } - } - - inline void Push(TMsgLink msg, TAux aux) { - ui64 pushSlot = AtomicGetAndIncrement(SlotCounter); - TBunch* hintBunch = GetLastBunch(); - - for (;;) { - auto hint = hintBunch->Push(msg, pushSlot, aux); - if (Y_LIKELY(hint == PUSH_RESULT_OK)) { - return; - } - HandleHint(hintBunch, hint); - } - } - - protected: - template <typename, template <typename, typename...> class> - friend class TReadBucket; - - TBunch* volatile LastBunch; // Hint - volatile ui64 SlotCounter; - - inline TBunch* GetLastBunch() const { - return AtomicGet(LastBunch); - } - - void HandleHint(TBunch*& hintBunch, TPushResult hint) { - if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) { - hintBunch = hintBunch->GetBackLink(); - return; - } - - // 
PUSH_RESULT_FORWARD - auto nextBunch = hintBunch->GetNextBunch(); - - if (nextBunch == nullptr) { - auto first = hintBunch->FirstSlot + BUNCH_SIZE; - nextBunch = new TBunch(first, hintBunch); - if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) { - delete nextBunch; - nextBunch = hintBunch->GetNextBunch(); - } - } - - // hintBunch could not be freed here so it cannot be reused - // it's alright if this CAS was not succeeded, - // it means that other thread did that recently - AtomicCas(&LastBunch, nextBunch, hintBunch); - - hintBunch = nextBunch; - } - }; - + Release(this); + next->DecrementToken(); + } + // this could be invalid here + } + + TMsgBunch(ui64 start, TMsgBunch* backLink) { + AtomicSet(FirstSlot, start); + memset(&LinkArray, 0, sizeof(LinkArray)); + AtomicSet(NextBunch, nullptr); + AtomicSet(BackLink, backLink); + + AtomicSet(Token, 1); + AtomicSet(NextToken, nullptr); + } + + static void Release(TMsgBunch* block) { + auto backLink = AtomicGet(block->BackLink); + if (backLink == nullptr) { + return; + } + AtomicSet(block->BackLink, nullptr); + + do { + auto bbackLink = backLink->BackLink; + delete backLink; + backLink = bbackLink; + } while (backLink != nullptr); + } + + void Destroy() { + for (auto tail = BackLink; tail != nullptr;) { + auto next = tail->BackLink; + delete tail; + tail = next; + } + + for (auto next = this; next != nullptr;) { + auto nnext = next->NextBunch; + delete next; + next = nnext; + } + } + }; + + template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE, + typename TBunchBase = NReadAsFilledPrivate::TEmpty, + typename TAux = TEmptyAux> + class TWriteBucket { + public: + using TUsingAux = TAux; // for TReadBucket binding + using TBunch = TMsgBunch<BUNCH_SIZE, TBunchBase, TAux>; + + TWriteBucket(TBunch* bunch = new TBunch(0, nullptr)) { + AtomicSet(LastBunch, bunch); + AtomicSet(SlotCounter, 0); + } + + TWriteBucket(TWriteBucket&& move) + : LastBunch(move.LastBunch) + , SlotCounter(move.SlotCounter) + { + move.LastBunch = nullptr; + } + 
+ ~TWriteBucket() { + if (LastBunch != nullptr) { + LastBunch->Destroy(); + } + } + + inline void Push(TMsgLink msg, TAux aux) { + ui64 pushSlot = AtomicGetAndIncrement(SlotCounter); + TBunch* hintBunch = GetLastBunch(); + + for (;;) { + auto hint = hintBunch->Push(msg, pushSlot, aux); + if (Y_LIKELY(hint == PUSH_RESULT_OK)) { + return; + } + HandleHint(hintBunch, hint); + } + } + + protected: + template <typename, template <typename, typename...> class> + friend class TReadBucket; + + TBunch* volatile LastBunch; // Hint + volatile ui64 SlotCounter; + + inline TBunch* GetLastBunch() const { + return AtomicGet(LastBunch); + } + + void HandleHint(TBunch*& hintBunch, TPushResult hint) { + if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) { + hintBunch = hintBunch->GetBackLink(); + return; + } + + // PUSH_RESULT_FORWARD + auto nextBunch = hintBunch->GetNextBunch(); + + if (nextBunch == nullptr) { + auto first = hintBunch->FirstSlot + BUNCH_SIZE; + nextBunch = new TBunch(first, hintBunch); + if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) { + delete nextBunch; + nextBunch = hintBunch->GetNextBunch(); + } + } + + // hintBunch could not be freed here so it cannot be reused + // it's alright if this CAS was not succeeded, + // it means that other thread did that recently + AtomicCas(&LastBunch, nextBunch, hintBunch); + + hintBunch = nextBunch; + } + }; + template <typename TWBucket = TWriteBucket<>, template <typename, typename...> class TContainer = TDeque> - class TReadBucket { - public: - using TAux = typename TWBucket::TUsingAux; - using TBunch = typename TWBucket::TBunch; - - static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 5; - - TReadBucket(TWBucket* writer) - : Writer(writer) - , ReadBunch(writer->GetLastBunch()) - , LastKnownPushBunch(writer->GetLastBunch()) - { - ReadBunch->DecrementToken(); // no previous token - } - - TReadBucket(TReadBucket toCopy, TWBucket* writer) - : TReadBucket(std::move(toCopy)) - { - Writer = writer; - } - - ui64 ReadyCount() const { 
- return AtomicGet(Writer->SlotCounter) - ReadSlot; - } - - TMsgLink Pop() { - return PopAux().Msg; - } - - TMsgLink Peek() { - return PeekAux().Msg; - } - - TSlot<TAux> PopAux() { - for (;;) { - if (Y_UNLIKELY(ReadNow.size() != 0)) { - auto result = PopSkipped(); - if (Y_LIKELY(result.Msg != nullptr)) { - return result; - } - } - - if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) { - if (Y_LIKELY(!RereadPushSlot())) { - return TSlot<TAux>::NullElem(); - } - continue; - } - - if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) { - if (Y_UNLIKELY(!SwitchToNextBunch())) { - return TSlot<TAux>::NullElem(); - } - } - - auto result = ReadBunch->GetSlotAux(ReadSlot); - if (Y_LIKELY(result.Msg != nullptr)) { - ++ReadSlot; - return result; - } - - result = StubbornPop(); - if (Y_LIKELY(result.Msg != nullptr)) { - return result; - } - } - } - - TSlot<TAux> PeekAux() { - for (;;) { - if (Y_UNLIKELY(ReadNow.size() != 0)) { - auto result = PeekSkipped(); - if (Y_LIKELY(result.Msg != nullptr)) { - return result; - } - } - - if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) { - if (Y_LIKELY(!RereadPushSlot())) { - return TSlot<TAux>::NullElem(); - } - continue; - } - - if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) { - if (Y_UNLIKELY(!SwitchToNextBunch())) { - return TSlot<TAux>::NullElem(); - } - } - - auto result = ReadBunch->GetSlotAux(ReadSlot); - if (Y_LIKELY(result.Msg != nullptr)) { - return result; - } - - result = StubbornPeek(); - if (Y_LIKELY(result.Msg != nullptr)) { - return result; - } - } - } - - private: - TWBucket* Writer; - TBunch* ReadBunch; - ui64 ReadSlot = 0; - TBunch* LastKnownPushBunch; - ui64 LastKnownPushSlot = 0; - - struct TSkipItem { - TBunch* Bunch; - ui64 Slot; - TBunch* Token; - }; - - TContainer<TSkipItem> ReadNow; - TContainer<TSkipItem> ReadLater; - - void AddToReadLater() { - ReadLater.push_back({ReadBunch, ReadSlot, LastKnownPushBunch}); - LastKnownPushBunch->IncrementToken(); - ++ReadSlot; - } - - // MUST BE: ReadSlot == LastKnownPushSlot - bool 
RereadPushSlot() { - ReadNow = std::move(ReadLater); - ReadLater.clear(); - - auto oldSlot = LastKnownPushSlot; - - auto currentPushBunch = Writer->GetLastBunch(); - auto currentPushSlot = AtomicGet(Writer->SlotCounter); - - if (currentPushBunch != LastKnownPushBunch) { - // LastKnownPushBunch could be invalid after this line - LastKnownPushBunch->SetNextToken(currentPushBunch); - } - - LastKnownPushBunch = currentPushBunch; - LastKnownPushSlot = currentPushSlot; - - return oldSlot != LastKnownPushSlot; - } - - bool SwitchToNextBunch() { - for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { - auto next = ReadBunch->GetNextBunch(); - if (next != nullptr) { - ReadBunch = next; - return true; - } - SpinLockPause(); - } - return false; - } - - TSlot<TAux> StubbornPop() { - for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { - auto result = ReadBunch->GetSlotAux(ReadSlot); - if (Y_LIKELY(result.Msg != nullptr)) { - ++ReadSlot; - return result; - } - SpinLockPause(); - } - - AddToReadLater(); - return TSlot<TAux>::NullElem(); - } - - TSlot<TAux> StubbornPeek() { - for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { - auto result = ReadBunch->GetSlotAux(ReadSlot); - if (Y_LIKELY(result.Msg != nullptr)) { - return result; - } - SpinLockPause(); - } - - AddToReadLater(); - return TSlot<TAux>::NullElem(); - } - - TSlot<TAux> PopSkipped() { - do { - auto elem = ReadNow.front(); - ReadNow.pop_front(); - - auto result = elem.Bunch->GetSlotAux(elem.Slot); - if (Y_LIKELY(result.Msg != nullptr)) { - elem.Token->DecrementToken(); - return result; - } - - ReadLater.emplace_back(elem); - - } while (ReadNow.size() > 0); - - return TSlot<TAux>::NullElem(); - } - - TSlot<TAux> PeekSkipped() { - do { - auto elem = ReadNow.front(); - - auto result = elem.Bunch->GetSlotAux(elem.Slot); - if (Y_LIKELY(result.Msg != nullptr)) { - return result; - } - - ReadNow.pop_front(); - ReadLater.emplace_back(elem); - - } while (ReadNow.size() > 0); - - return TSlot<TAux>::NullElem(); - } - 
}; - - struct TDefaultParams { - static constexpr ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE; - using TBunchBase = TEmpty; - + class TReadBucket { + public: + using TAux = typename TWBucket::TUsingAux; + using TBunch = typename TWBucket::TBunch; + + static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 5; + + TReadBucket(TWBucket* writer) + : Writer(writer) + , ReadBunch(writer->GetLastBunch()) + , LastKnownPushBunch(writer->GetLastBunch()) + { + ReadBunch->DecrementToken(); // no previous token + } + + TReadBucket(TReadBucket toCopy, TWBucket* writer) + : TReadBucket(std::move(toCopy)) + { + Writer = writer; + } + + ui64 ReadyCount() const { + return AtomicGet(Writer->SlotCounter) - ReadSlot; + } + + TMsgLink Pop() { + return PopAux().Msg; + } + + TMsgLink Peek() { + return PeekAux().Msg; + } + + TSlot<TAux> PopAux() { + for (;;) { + if (Y_UNLIKELY(ReadNow.size() != 0)) { + auto result = PopSkipped(); + if (Y_LIKELY(result.Msg != nullptr)) { + return result; + } + } + + if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) { + if (Y_LIKELY(!RereadPushSlot())) { + return TSlot<TAux>::NullElem(); + } + continue; + } + + if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) { + if (Y_UNLIKELY(!SwitchToNextBunch())) { + return TSlot<TAux>::NullElem(); + } + } + + auto result = ReadBunch->GetSlotAux(ReadSlot); + if (Y_LIKELY(result.Msg != nullptr)) { + ++ReadSlot; + return result; + } + + result = StubbornPop(); + if (Y_LIKELY(result.Msg != nullptr)) { + return result; + } + } + } + + TSlot<TAux> PeekAux() { + for (;;) { + if (Y_UNLIKELY(ReadNow.size() != 0)) { + auto result = PeekSkipped(); + if (Y_LIKELY(result.Msg != nullptr)) { + return result; + } + } + + if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) { + if (Y_LIKELY(!RereadPushSlot())) { + return TSlot<TAux>::NullElem(); + } + continue; + } + + if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) { + if (Y_UNLIKELY(!SwitchToNextBunch())) { + return TSlot<TAux>::NullElem(); + } + } + + auto result = ReadBunch->GetSlotAux(ReadSlot); + 
if (Y_LIKELY(result.Msg != nullptr)) { + return result; + } + + result = StubbornPeek(); + if (Y_LIKELY(result.Msg != nullptr)) { + return result; + } + } + } + + private: + TWBucket* Writer; + TBunch* ReadBunch; + ui64 ReadSlot = 0; + TBunch* LastKnownPushBunch; + ui64 LastKnownPushSlot = 0; + + struct TSkipItem { + TBunch* Bunch; + ui64 Slot; + TBunch* Token; + }; + + TContainer<TSkipItem> ReadNow; + TContainer<TSkipItem> ReadLater; + + void AddToReadLater() { + ReadLater.push_back({ReadBunch, ReadSlot, LastKnownPushBunch}); + LastKnownPushBunch->IncrementToken(); + ++ReadSlot; + } + + // MUST BE: ReadSlot == LastKnownPushSlot + bool RereadPushSlot() { + ReadNow = std::move(ReadLater); + ReadLater.clear(); + + auto oldSlot = LastKnownPushSlot; + + auto currentPushBunch = Writer->GetLastBunch(); + auto currentPushSlot = AtomicGet(Writer->SlotCounter); + + if (currentPushBunch != LastKnownPushBunch) { + // LastKnownPushBunch could be invalid after this line + LastKnownPushBunch->SetNextToken(currentPushBunch); + } + + LastKnownPushBunch = currentPushBunch; + LastKnownPushSlot = currentPushSlot; + + return oldSlot != LastKnownPushSlot; + } + + bool SwitchToNextBunch() { + for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { + auto next = ReadBunch->GetNextBunch(); + if (next != nullptr) { + ReadBunch = next; + return true; + } + SpinLockPause(); + } + return false; + } + + TSlot<TAux> StubbornPop() { + for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { + auto result = ReadBunch->GetSlotAux(ReadSlot); + if (Y_LIKELY(result.Msg != nullptr)) { + ++ReadSlot; + return result; + } + SpinLockPause(); + } + + AddToReadLater(); + return TSlot<TAux>::NullElem(); + } + + TSlot<TAux> StubbornPeek() { + for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { + auto result = ReadBunch->GetSlotAux(ReadSlot); + if (Y_LIKELY(result.Msg != nullptr)) { + return result; + } + SpinLockPause(); + } + + AddToReadLater(); + return TSlot<TAux>::NullElem(); + } + + TSlot<TAux> 
PopSkipped() { + do { + auto elem = ReadNow.front(); + ReadNow.pop_front(); + + auto result = elem.Bunch->GetSlotAux(elem.Slot); + if (Y_LIKELY(result.Msg != nullptr)) { + elem.Token->DecrementToken(); + return result; + } + + ReadLater.emplace_back(elem); + + } while (ReadNow.size() > 0); + + return TSlot<TAux>::NullElem(); + } + + TSlot<TAux> PeekSkipped() { + do { + auto elem = ReadNow.front(); + + auto result = elem.Bunch->GetSlotAux(elem.Slot); + if (Y_LIKELY(result.Msg != nullptr)) { + return result; + } + + ReadNow.pop_front(); + ReadLater.emplace_back(elem); + + } while (ReadNow.size() > 0); + + return TSlot<TAux>::NullElem(); + } + }; + + struct TDefaultParams { + static constexpr ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE; + using TBunchBase = TEmpty; + template <typename TElem, typename... TRest> using TContainer = TDeque<TElem, TRest...>; - - static constexpr bool DeleteItems = true; - }; - - } //namespace NReadAsFilledPrivate - - DeclareTuneValueParam(TRaFQueueBunchSize, ui32, BUNCH_SIZE); - DeclareTuneTypeParam(TRaFQueueBunchBase, TBunchBase); - DeclareTuneContainer(TRaFQueueSkipContainer, TContainer); - DeclareTuneValueParam(TRaFQueueDeleteItems, bool, DeleteItems); - + + static constexpr bool DeleteItems = true; + }; + + } //namespace NReadAsFilledPrivate + + DeclareTuneValueParam(TRaFQueueBunchSize, ui32, BUNCH_SIZE); + DeclareTuneTypeParam(TRaFQueueBunchBase, TBunchBase); + DeclareTuneContainer(TRaFQueueSkipContainer, TContainer); + DeclareTuneValueParam(TRaFQueueDeleteItems, bool, DeleteItems); + template <typename TItem = void, typename... 
TParams> - class TReadAsFilledQueue { - private: - using TTuned = TTune<NReadAsFilledPrivate::TDefaultParams, TParams...>; - - static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE; - - using TBunchBase = typename TTuned::TBunchBase; - + class TReadAsFilledQueue { + private: + using TTuned = TTune<NReadAsFilledPrivate::TDefaultParams, TParams...>; + + static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE; + + using TBunchBase = typename TTuned::TBunchBase; + template <typename TElem, typename... TRest> - using TContainer = - typename TTuned::template TContainer<TElem, TRest...>; - - using TWriteBucket = - NReadAsFilledPrivate::TWriteBucket<BUNCH_SIZE, TBunchBase>; - using TReadBucket = - NReadAsFilledPrivate::TReadBucket<TWriteBucket, TContainer>; - - public: - TReadAsFilledQueue() - : RBucket(&WBucket) - { - } - - ~TReadAsFilledQueue() { - if (TTuned::DeleteItems) { - for (;;) { - auto msg = Pop(); - if (msg == nullptr) { - break; - } - TDelete::Destroy(msg); - } - } - } - - void Push(TItem* msg) { - WBucket.Push((void*)msg, NReadAsFilledPrivate::TEmptyAux()); - } - - TItem* Pop() { - return (TItem*)RBucket.Pop(); - } - - TItem* Peek() { - return (TItem*)RBucket.Peek(); - } - - protected: - TWriteBucket WBucket; - TReadBucket RBucket; - }; -} + using TContainer = + typename TTuned::template TContainer<TElem, TRest...>; + + using TWriteBucket = + NReadAsFilledPrivate::TWriteBucket<BUNCH_SIZE, TBunchBase>; + using TReadBucket = + NReadAsFilledPrivate::TReadBucket<TWriteBucket, TContainer>; + + public: + TReadAsFilledQueue() + : RBucket(&WBucket) + { + } + + ~TReadAsFilledQueue() { + if (TTuned::DeleteItems) { + for (;;) { + auto msg = Pop(); + if (msg == nullptr) { + break; + } + TDelete::Destroy(msg); + } + } + } + + void Push(TItem* msg) { + WBucket.Push((void*)msg, NReadAsFilledPrivate::TEmptyAux()); + } + + TItem* Pop() { + return (TItem*)RBucket.Pop(); + } + + TItem* Peek() { + return (TItem*)RBucket.Peek(); + } + + protected: + TWriteBucket WBucket; + 
TReadBucket RBucket; + }; +} diff --git a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp index 2bd0c29821..00dbfeaa64 100644 --- a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp +++ b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp @@ -1 +1 @@ -#include "mpsc_vinfarr_obstructive.h" +#include "mpsc_vinfarr_obstructive.h" diff --git a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h index 5f91f1b5a8..3e1ae92342 100644 --- a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h +++ b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h @@ -1,528 +1,528 @@ -#pragma once - -/* - Semi-wait-free queue, multiple producers - one consumer. Strict order. - The queue algorithm is using concept of virtual infinite array. - - A producer takes a number from a counter and atomicaly increments the counter. - The number taken is a number of a slot for the producer to put a new message - into infinite array. - - Then producer constructs a virtual infinite array by bidirectional linked list - of blocks. Each block contains several slots. - - There is a hint pointer which optimisticly points to the last block - of the list and never goes backward. - - Consumer exploits the property of the hint pointer always going forward - to free old blocks eventually. Consumer periodically read the hint pointer - and the counter and thus deduce producers which potentially holds the pointer - to a block. Consumer can free the block if all that producers filled their - slots and left the queue. - - No producer can stop the progress for other producers. - - Consumer can obstruct a slot of a delayed producer by putting special mark. - Thus no producer can stop the progress for consumer. - But a slow producer may be forced to retry unlimited number of times. - Though it's very unlikely for a non-preempted producer to be obstructed. 
- That's why the algorithm is semi-wait-free. - - WARNING: there is no wait¬ify mechanic for consumer, - consumer receives nullptr if queue was empty. - - WARNING: though the algorithm itself is lock-free - but producers and consumer could be blocked by memory allocator - - WARNING: copy constructers of the queue are not thread-safe - */ - -#include <util/generic/noncopyable.h> -#include <util/generic/ptr.h> -#include <util/system/atomic.h> -#include <util/system/spinlock.h> - -#include "tune.h" - -namespace NThreading { - namespace NObstructiveQueuePrivate { - typedef void* TMsgLink; - - struct TEmpty { - }; - - struct TEmptyAux { - TEmptyAux Retrieve() const { - return TEmptyAux(); - } - void Store(TEmptyAux&) { - } - static constexpr TEmptyAux Zero() { - return TEmptyAux(); - } - }; - - template <typename TAux> - struct TSlot { - TMsgLink volatile Msg; - TAux AuxiliaryData; - - inline void Store(TAux& aux) { - AuxiliaryData.Store(aux); - } - - inline TAux Retrieve() const { - return AuxiliaryData.Retrieve(); - } - - static TSlot<TAux> NullElem() { - return {nullptr, TAux::Zero()}; - } - - static TSlot<TAux> Pair(TMsgLink msg, TAux aux) { - return {msg, std::move(aux)}; - } - }; - - template <> - struct TSlot<TEmptyAux> { - TMsgLink volatile Msg; - inline void Store(TEmptyAux&) { - } - inline TEmptyAux Retrieve() const { - return TEmptyAux(); - } - - static TSlot<TEmptyAux> NullElem() { - return {nullptr}; - } - - static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) { - return {msg}; - } - }; - - enum TPushResult { - PUSH_RESULT_OK, - PUSH_RESULT_BACKWARD, - PUSH_RESULT_FORWARD, - PUSH_RESULT_BLOCKED, - }; - - template <typename TAux, ui32 BUNCH_SIZE, typename TBase = TEmpty> - struct TMsgBunch: public TBase { - ui64 FirstSlot; - - TSlot<TAux> LinkArray[BUNCH_SIZE]; - - TMsgBunch* volatile NextBunch; - TMsgBunch* volatile BackLink; - - ui64 volatile Token; - TMsgBunch* volatile NextToken; - - /* this push can return PUSH_RESULT_BLOCKED */ - inline TPushResult 
Push(TMsgLink msg, ui64 slot, TAux auxiliary) { - if (Y_UNLIKELY(slot < FirstSlot)) { - return PUSH_RESULT_BACKWARD; - } - - if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) { - return PUSH_RESULT_FORWARD; - } - - LinkArray[slot - FirstSlot].Store(auxiliary); - - auto oldValue = AtomicSwap(&LinkArray[slot - FirstSlot].Msg, msg); - - if (Y_LIKELY(oldValue == nullptr)) { - return PUSH_RESULT_OK; - } else { - LeaveBlocked(oldValue); - return PUSH_RESULT_BLOCKED; - } - } - - inline bool IsSlotHere(ui64 slot) { - return slot < FirstSlot + BUNCH_SIZE; - } - - inline TMsgLink GetSlot(ui64 slot) const { - return AtomicGet(LinkArray[slot - FirstSlot].Msg); - } - - inline TSlot<TAux> GetSlotAux(ui64 slot) const { - auto msg = GetSlot(slot); - auto aux = LinkArray[slot - FirstSlot].Retrieve(); - return TSlot<TAux>::Pair(msg, aux); - } - - void LeaveBlocked(ui64 slot) { - auto token = GetToken(slot); - token->DecrementToken(); - } - - void LeaveBlocked(TMsgLink msg) { - auto token = reinterpret_cast<TMsgBunch*>(msg); - token->DecrementToken(); - } - - TSlot<TAux> BlockSlotAux(ui64 slot, TMsgBunch* token) { - auto old = - AtomicSwap(&LinkArray[slot - FirstSlot].Msg, (TMsgLink)token); - if (old == nullptr) { - // It's valid to increment after AtomicCas - // because token will release data only after SetNextToken - token->IncrementToken(); - return TSlot<TAux>::NullElem(); - } - return TSlot<TAux>::Pair(old, LinkArray[slot - FirstSlot].Retrieve()); - } - - inline TMsgBunch* GetNextBunch() const { - return AtomicGet(NextBunch); - } - - inline bool SetNextBunch(TMsgBunch* ptr) { - return AtomicCas(&NextBunch, ptr, nullptr); - } - - inline TMsgBunch* GetBackLink() const { - return AtomicGet(BackLink); - } - - inline TMsgBunch* GetToken(ui64 slot) { - return reinterpret_cast<TMsgBunch*>(LinkArray[slot - FirstSlot].Msg); - } - - inline void IncrementToken() { - AtomicIncrement(Token); - } - - // the object could be destroyed after this method - inline void DecrementToken() { - if 
(Y_UNLIKELY(AtomicDecrement(Token) == BUNCH_SIZE)) { - Release(this); - AtomicGet(NextToken)->DecrementToken(); - // this could be invalid here - } - } - - // the object could be destroyed after this method - inline void SetNextToken(TMsgBunch* next) { - AtomicSet(NextToken, next); - if (Y_UNLIKELY(AtomicAdd(Token, BUNCH_SIZE) == BUNCH_SIZE)) { - Release(this); - next->DecrementToken(); - } - // this could be invalid here - } - - TMsgBunch(ui64 start, TMsgBunch* backLink) { - AtomicSet(FirstSlot, start); - memset(&LinkArray, 0, sizeof(LinkArray)); - AtomicSet(NextBunch, nullptr); - AtomicSet(BackLink, backLink); - - AtomicSet(Token, 1); - AtomicSet(NextToken, nullptr); - } - - static void Release(TMsgBunch* bunch) { - auto backLink = AtomicGet(bunch->BackLink); - if (backLink == nullptr) { - return; - } - AtomicSet(bunch->BackLink, nullptr); - - do { - auto bbackLink = backLink->BackLink; - delete backLink; - backLink = bbackLink; - } while (backLink != nullptr); - } - - void Destroy() { - for (auto tail = BackLink; tail != nullptr;) { - auto next = tail->BackLink; - delete tail; - tail = next; - } - - for (auto next = this; next != nullptr;) { - auto nnext = next->NextBunch; - delete next; - next = nnext; - } - } - }; - - template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase = TEmpty> - class TWriteBucket { - public: - static const ui64 GROSS_SIZE; - - using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>; - - TWriteBucket(TBunch* bunch = new TBunch(0, nullptr)) - : LastBunch(bunch) - , SlotCounter(0) - { - } - - TWriteBucket(TWriteBucket&& move) - : LastBunch(move.LastBunch) - , SlotCounter(move.SlotCounter) - { - move.LastBunch = nullptr; - } - - ~TWriteBucket() { - if (LastBunch != nullptr) { - LastBunch->Destroy(); - } - } - - inline bool Push(TMsgLink msg, TAux aux) { - ui64 pushSlot = AtomicGetAndIncrement(SlotCounter); - TBunch* hintBunch = GetLastBunch(); - - for (;;) { - auto hint = hintBunch->Push(msg, pushSlot, aux); - if (Y_LIKELY(hint == 
PUSH_RESULT_OK)) { - return true; - } - bool hhResult = HandleHint(hintBunch, hint); - if (Y_UNLIKELY(!hhResult)) { - return false; - } - } - } - - protected: - template <typename, ui32, typename> - friend class TReadBucket; - - TBunch* volatile LastBunch; // Hint - volatile ui64 SlotCounter; - - inline TBunch* GetLastBunch() const { - return AtomicGet(LastBunch); - } - - bool HandleHint(TBunch*& hintBunch, TPushResult hint) { - if (Y_UNLIKELY(hint == PUSH_RESULT_BLOCKED)) { - return false; - } - - if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) { - hintBunch = hintBunch->GetBackLink(); - return true; - } - - // PUSH_RESULT_FORWARD - auto nextBunch = hintBunch->GetNextBunch(); - - if (nextBunch == nullptr) { - auto first = hintBunch->FirstSlot + BUNCH_SIZE; - nextBunch = new TBunch(first, hintBunch); - if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) { - delete nextBunch; - nextBunch = hintBunch->GetNextBunch(); - } - } - - // hintBunch could not be freed here so it cannot be reused - // it's alright if this CAS was not succeeded, - // it means that other thread did that recently - AtomicCas(&LastBunch, nextBunch, hintBunch); - - hintBunch = nextBunch; - return true; - } - }; - - template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase> - class TReadBucket { - public: - static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 20; - - using TWBucket = TWriteBucket<TAux, BUNCH_SIZE, TBunchBase>; - using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>; - - TReadBucket(TWBucket* writer) - : Writer(writer) - , ReadBunch(writer->GetLastBunch()) - , LastKnownPushBunch(writer->GetLastBunch()) - { - ReadBunch->DecrementToken(); // no previous token - } - - TReadBucket(TReadBucket toCopy, TWBucket* writer) - : TReadBucket(std::move(toCopy)) - { - Writer = writer; - } - - ui64 ReadyCount() const { - return AtomicGet(Writer->SlotCounter) - ReadSlot; - } - - inline TMsgLink Pop() { - return PopAux().Msg; - } - - inline TSlot<TAux> PopAux() { - for (;;) { - if 
(Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) { - if (Y_LIKELY(!RereadPushSlot())) { - return TSlot<TAux>::NullElem(); - } - } - - if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) { - if (Y_UNLIKELY(!SwitchToNextBunch())) { - return TSlot<TAux>::NullElem(); - } - } - - auto result = ReadBunch->GetSlotAux(ReadSlot); - if (Y_LIKELY(result.Msg != nullptr)) { - ++ReadSlot; - return result; - } - - if (ReadSlot + 1 == AtomicGet(Writer->SlotCounter)) { - return TSlot<TAux>::NullElem(); - } - - result = StubbornPopAux(); - - if (result.Msg != nullptr) { - return result; - } - } - } - - private: - TWBucket* Writer; - TBunch* ReadBunch; - ui64 ReadSlot = 0; - TBunch* LastKnownPushBunch; - ui64 LastKnownPushSlot = 0; - - // MUST BE: ReadSlot == LastKnownPushSlot - bool RereadPushSlot() { - auto oldSlot = LastKnownPushSlot; - - auto currentPushBunch = Writer->GetLastBunch(); - auto currentPushSlot = AtomicGet(Writer->SlotCounter); - - if (currentPushBunch != LastKnownPushBunch) { - // LastKnownPushBunch could be invalid after this line - LastKnownPushBunch->SetNextToken(currentPushBunch); - } - - LastKnownPushBunch = currentPushBunch; - LastKnownPushSlot = currentPushSlot; - - return oldSlot != LastKnownPushSlot; - } - - bool SwitchToNextBunch() { - for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { - auto next = ReadBunch->GetNextBunch(); - if (next != nullptr) { - ReadBunch = next; - return true; - } - SpinLockPause(); - } - return false; - } - - TSlot<TAux> StubbornPopAux() { - for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { - auto result = ReadBunch->GetSlotAux(ReadSlot); - if (Y_LIKELY(result.Msg != nullptr)) { - ++ReadSlot; - return result; - } - SpinLockPause(); - } - - return ReadBunch->BlockSlotAux(ReadSlot++, LastKnownPushBunch); - } - }; - - struct TDefaultParams { - static constexpr bool DeleteItems = true; - using TAux = NObstructiveQueuePrivate::TEmptyAux; - using TBunchBase = NObstructiveQueuePrivate::TEmpty; - static constexpr ui32 BUNCH_SIZE = 
251; - }; - - } //namespace NObstructiveQueuePrivate - - DeclareTuneValueParam(TObstructiveQueueBunchSize, ui32, BUNCH_SIZE); - DeclareTuneValueParam(TObstructiveQueueDeleteItems, bool, DeleteItems); - DeclareTuneTypeParam(TObstructiveQueueBunchBase, TBunchBase); - DeclareTuneTypeParam(TObstructiveQueueAux, TAux); - +#pragma once + +/* + Semi-wait-free queue, multiple producers - one consumer. Strict order. + The queue algorithm is using concept of virtual infinite array. + + A producer takes a number from a counter and atomicaly increments the counter. + The number taken is a number of a slot for the producer to put a new message + into infinite array. + + Then producer constructs a virtual infinite array by bidirectional linked list + of blocks. Each block contains several slots. + + There is a hint pointer which optimisticly points to the last block + of the list and never goes backward. + + Consumer exploits the property of the hint pointer always going forward + to free old blocks eventually. Consumer periodically read the hint pointer + and the counter and thus deduce producers which potentially holds the pointer + to a block. Consumer can free the block if all that producers filled their + slots and left the queue. + + No producer can stop the progress for other producers. + + Consumer can obstruct a slot of a delayed producer by putting special mark. + Thus no producer can stop the progress for consumer. + But a slow producer may be forced to retry unlimited number of times. + Though it's very unlikely for a non-preempted producer to be obstructed. + That's why the algorithm is semi-wait-free. + + WARNING: there is no wait¬ify mechanic for consumer, + consumer receives nullptr if queue was empty. 
+ + WARNING: though the algorithm itself is lock-free + but producers and consumer could be blocked by memory allocator + + WARNING: copy constructers of the queue are not thread-safe + */ + +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/system/atomic.h> +#include <util/system/spinlock.h> + +#include "tune.h" + +namespace NThreading { + namespace NObstructiveQueuePrivate { + typedef void* TMsgLink; + + struct TEmpty { + }; + + struct TEmptyAux { + TEmptyAux Retrieve() const { + return TEmptyAux(); + } + void Store(TEmptyAux&) { + } + static constexpr TEmptyAux Zero() { + return TEmptyAux(); + } + }; + + template <typename TAux> + struct TSlot { + TMsgLink volatile Msg; + TAux AuxiliaryData; + + inline void Store(TAux& aux) { + AuxiliaryData.Store(aux); + } + + inline TAux Retrieve() const { + return AuxiliaryData.Retrieve(); + } + + static TSlot<TAux> NullElem() { + return {nullptr, TAux::Zero()}; + } + + static TSlot<TAux> Pair(TMsgLink msg, TAux aux) { + return {msg, std::move(aux)}; + } + }; + + template <> + struct TSlot<TEmptyAux> { + TMsgLink volatile Msg; + inline void Store(TEmptyAux&) { + } + inline TEmptyAux Retrieve() const { + return TEmptyAux(); + } + + static TSlot<TEmptyAux> NullElem() { + return {nullptr}; + } + + static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) { + return {msg}; + } + }; + + enum TPushResult { + PUSH_RESULT_OK, + PUSH_RESULT_BACKWARD, + PUSH_RESULT_FORWARD, + PUSH_RESULT_BLOCKED, + }; + + template <typename TAux, ui32 BUNCH_SIZE, typename TBase = TEmpty> + struct TMsgBunch: public TBase { + ui64 FirstSlot; + + TSlot<TAux> LinkArray[BUNCH_SIZE]; + + TMsgBunch* volatile NextBunch; + TMsgBunch* volatile BackLink; + + ui64 volatile Token; + TMsgBunch* volatile NextToken; + + /* this push can return PUSH_RESULT_BLOCKED */ + inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) { + if (Y_UNLIKELY(slot < FirstSlot)) { + return PUSH_RESULT_BACKWARD; + } + + if (Y_UNLIKELY(slot >= 
FirstSlot + BUNCH_SIZE)) { + return PUSH_RESULT_FORWARD; + } + + LinkArray[slot - FirstSlot].Store(auxiliary); + + auto oldValue = AtomicSwap(&LinkArray[slot - FirstSlot].Msg, msg); + + if (Y_LIKELY(oldValue == nullptr)) { + return PUSH_RESULT_OK; + } else { + LeaveBlocked(oldValue); + return PUSH_RESULT_BLOCKED; + } + } + + inline bool IsSlotHere(ui64 slot) { + return slot < FirstSlot + BUNCH_SIZE; + } + + inline TMsgLink GetSlot(ui64 slot) const { + return AtomicGet(LinkArray[slot - FirstSlot].Msg); + } + + inline TSlot<TAux> GetSlotAux(ui64 slot) const { + auto msg = GetSlot(slot); + auto aux = LinkArray[slot - FirstSlot].Retrieve(); + return TSlot<TAux>::Pair(msg, aux); + } + + void LeaveBlocked(ui64 slot) { + auto token = GetToken(slot); + token->DecrementToken(); + } + + void LeaveBlocked(TMsgLink msg) { + auto token = reinterpret_cast<TMsgBunch*>(msg); + token->DecrementToken(); + } + + TSlot<TAux> BlockSlotAux(ui64 slot, TMsgBunch* token) { + auto old = + AtomicSwap(&LinkArray[slot - FirstSlot].Msg, (TMsgLink)token); + if (old == nullptr) { + // It's valid to increment after AtomicCas + // because token will release data only after SetNextToken + token->IncrementToken(); + return TSlot<TAux>::NullElem(); + } + return TSlot<TAux>::Pair(old, LinkArray[slot - FirstSlot].Retrieve()); + } + + inline TMsgBunch* GetNextBunch() const { + return AtomicGet(NextBunch); + } + + inline bool SetNextBunch(TMsgBunch* ptr) { + return AtomicCas(&NextBunch, ptr, nullptr); + } + + inline TMsgBunch* GetBackLink() const { + return AtomicGet(BackLink); + } + + inline TMsgBunch* GetToken(ui64 slot) { + return reinterpret_cast<TMsgBunch*>(LinkArray[slot - FirstSlot].Msg); + } + + inline void IncrementToken() { + AtomicIncrement(Token); + } + + // the object could be destroyed after this method + inline void DecrementToken() { + if (Y_UNLIKELY(AtomicDecrement(Token) == BUNCH_SIZE)) { + Release(this); + AtomicGet(NextToken)->DecrementToken(); + // this could be invalid here + } + } + 
+ // the object could be destroyed after this method + inline void SetNextToken(TMsgBunch* next) { + AtomicSet(NextToken, next); + if (Y_UNLIKELY(AtomicAdd(Token, BUNCH_SIZE) == BUNCH_SIZE)) { + Release(this); + next->DecrementToken(); + } + // this could be invalid here + } + + TMsgBunch(ui64 start, TMsgBunch* backLink) { + AtomicSet(FirstSlot, start); + memset(&LinkArray, 0, sizeof(LinkArray)); + AtomicSet(NextBunch, nullptr); + AtomicSet(BackLink, backLink); + + AtomicSet(Token, 1); + AtomicSet(NextToken, nullptr); + } + + static void Release(TMsgBunch* bunch) { + auto backLink = AtomicGet(bunch->BackLink); + if (backLink == nullptr) { + return; + } + AtomicSet(bunch->BackLink, nullptr); + + do { + auto bbackLink = backLink->BackLink; + delete backLink; + backLink = bbackLink; + } while (backLink != nullptr); + } + + void Destroy() { + for (auto tail = BackLink; tail != nullptr;) { + auto next = tail->BackLink; + delete tail; + tail = next; + } + + for (auto next = this; next != nullptr;) { + auto nnext = next->NextBunch; + delete next; + next = nnext; + } + } + }; + + template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase = TEmpty> + class TWriteBucket { + public: + static const ui64 GROSS_SIZE; + + using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>; + + TWriteBucket(TBunch* bunch = new TBunch(0, nullptr)) + : LastBunch(bunch) + , SlotCounter(0) + { + } + + TWriteBucket(TWriteBucket&& move) + : LastBunch(move.LastBunch) + , SlotCounter(move.SlotCounter) + { + move.LastBunch = nullptr; + } + + ~TWriteBucket() { + if (LastBunch != nullptr) { + LastBunch->Destroy(); + } + } + + inline bool Push(TMsgLink msg, TAux aux) { + ui64 pushSlot = AtomicGetAndIncrement(SlotCounter); + TBunch* hintBunch = GetLastBunch(); + + for (;;) { + auto hint = hintBunch->Push(msg, pushSlot, aux); + if (Y_LIKELY(hint == PUSH_RESULT_OK)) { + return true; + } + bool hhResult = HandleHint(hintBunch, hint); + if (Y_UNLIKELY(!hhResult)) { + return false; + } + } + } + + protected: 
+ template <typename, ui32, typename> + friend class TReadBucket; + + TBunch* volatile LastBunch; // Hint + volatile ui64 SlotCounter; + + inline TBunch* GetLastBunch() const { + return AtomicGet(LastBunch); + } + + bool HandleHint(TBunch*& hintBunch, TPushResult hint) { + if (Y_UNLIKELY(hint == PUSH_RESULT_BLOCKED)) { + return false; + } + + if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) { + hintBunch = hintBunch->GetBackLink(); + return true; + } + + // PUSH_RESULT_FORWARD + auto nextBunch = hintBunch->GetNextBunch(); + + if (nextBunch == nullptr) { + auto first = hintBunch->FirstSlot + BUNCH_SIZE; + nextBunch = new TBunch(first, hintBunch); + if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) { + delete nextBunch; + nextBunch = hintBunch->GetNextBunch(); + } + } + + // hintBunch could not be freed here so it cannot be reused + // it's alright if this CAS was not succeeded, + // it means that other thread did that recently + AtomicCas(&LastBunch, nextBunch, hintBunch); + + hintBunch = nextBunch; + return true; + } + }; + + template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase> + class TReadBucket { + public: + static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 20; + + using TWBucket = TWriteBucket<TAux, BUNCH_SIZE, TBunchBase>; + using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>; + + TReadBucket(TWBucket* writer) + : Writer(writer) + , ReadBunch(writer->GetLastBunch()) + , LastKnownPushBunch(writer->GetLastBunch()) + { + ReadBunch->DecrementToken(); // no previous token + } + + TReadBucket(TReadBucket toCopy, TWBucket* writer) + : TReadBucket(std::move(toCopy)) + { + Writer = writer; + } + + ui64 ReadyCount() const { + return AtomicGet(Writer->SlotCounter) - ReadSlot; + } + + inline TMsgLink Pop() { + return PopAux().Msg; + } + + inline TSlot<TAux> PopAux() { + for (;;) { + if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) { + if (Y_LIKELY(!RereadPushSlot())) { + return TSlot<TAux>::NullElem(); + } + } + + if 
(Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) { + if (Y_UNLIKELY(!SwitchToNextBunch())) { + return TSlot<TAux>::NullElem(); + } + } + + auto result = ReadBunch->GetSlotAux(ReadSlot); + if (Y_LIKELY(result.Msg != nullptr)) { + ++ReadSlot; + return result; + } + + if (ReadSlot + 1 == AtomicGet(Writer->SlotCounter)) { + return TSlot<TAux>::NullElem(); + } + + result = StubbornPopAux(); + + if (result.Msg != nullptr) { + return result; + } + } + } + + private: + TWBucket* Writer; + TBunch* ReadBunch; + ui64 ReadSlot = 0; + TBunch* LastKnownPushBunch; + ui64 LastKnownPushSlot = 0; + + // MUST BE: ReadSlot == LastKnownPushSlot + bool RereadPushSlot() { + auto oldSlot = LastKnownPushSlot; + + auto currentPushBunch = Writer->GetLastBunch(); + auto currentPushSlot = AtomicGet(Writer->SlotCounter); + + if (currentPushBunch != LastKnownPushBunch) { + // LastKnownPushBunch could be invalid after this line + LastKnownPushBunch->SetNextToken(currentPushBunch); + } + + LastKnownPushBunch = currentPushBunch; + LastKnownPushSlot = currentPushSlot; + + return oldSlot != LastKnownPushSlot; + } + + bool SwitchToNextBunch() { + for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { + auto next = ReadBunch->GetNextBunch(); + if (next != nullptr) { + ReadBunch = next; + return true; + } + SpinLockPause(); + } + return false; + } + + TSlot<TAux> StubbornPopAux() { + for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { + auto result = ReadBunch->GetSlotAux(ReadSlot); + if (Y_LIKELY(result.Msg != nullptr)) { + ++ReadSlot; + return result; + } + SpinLockPause(); + } + + return ReadBunch->BlockSlotAux(ReadSlot++, LastKnownPushBunch); + } + }; + + struct TDefaultParams { + static constexpr bool DeleteItems = true; + using TAux = NObstructiveQueuePrivate::TEmptyAux; + using TBunchBase = NObstructiveQueuePrivate::TEmpty; + static constexpr ui32 BUNCH_SIZE = 251; + }; + + } //namespace NObstructiveQueuePrivate + + DeclareTuneValueParam(TObstructiveQueueBunchSize, ui32, BUNCH_SIZE); + 
DeclareTuneValueParam(TObstructiveQueueDeleteItems, bool, DeleteItems); + DeclareTuneTypeParam(TObstructiveQueueBunchBase, TBunchBase); + DeclareTuneTypeParam(TObstructiveQueueAux, TAux); + template <typename TItem = void, typename... TParams> - class TObstructiveConsumerAuxQueue { - private: - using TTuned = - TTune<NObstructiveQueuePrivate::TDefaultParams, TParams...>; - - using TAux = typename TTuned::TAux; - using TSlot = NObstructiveQueuePrivate::TSlot<TAux>; - using TMsgLink = NObstructiveQueuePrivate::TMsgLink; - using TBunchBase = typename TTuned::TBunchBase; - static constexpr bool DeleteItems = TTuned::DeleteItems; - static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE; - - public: - TObstructiveConsumerAuxQueue() - : RBuckets(&WBucket) - { - } - - ~TObstructiveConsumerAuxQueue() { - if (DeleteItems) { - for (;;) { - auto msg = Pop(); - if (msg == nullptr) { - break; - } - TDelete::Destroy(msg); - } - } - } - - void Push(TItem* msg) { - while (!WBucket.Push(reinterpret_cast<TMsgLink>(msg), TAux())) { - } - } - - TItem* Pop() { - return reinterpret_cast<TItem*>(RBuckets.Pop()); - } - - TSlot PopAux() { - return RBuckets.PopAux(); - } - - private: - NObstructiveQueuePrivate::TWriteBucket<TAux, BUNCH_SIZE, TBunchBase> - WBucket; - NObstructiveQueuePrivate::TReadBucket<TAux, BUNCH_SIZE, TBunchBase> - RBuckets; - }; - - template <typename TItem = void, bool DeleteItems = true> - class TObstructiveConsumerQueue + class TObstructiveConsumerAuxQueue { + private: + using TTuned = + TTune<NObstructiveQueuePrivate::TDefaultParams, TParams...>; + + using TAux = typename TTuned::TAux; + using TSlot = NObstructiveQueuePrivate::TSlot<TAux>; + using TMsgLink = NObstructiveQueuePrivate::TMsgLink; + using TBunchBase = typename TTuned::TBunchBase; + static constexpr bool DeleteItems = TTuned::DeleteItems; + static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE; + + public: + TObstructiveConsumerAuxQueue() + : RBuckets(&WBucket) + { + } + + ~TObstructiveConsumerAuxQueue() { 
+ if (DeleteItems) { + for (;;) { + auto msg = Pop(); + if (msg == nullptr) { + break; + } + TDelete::Destroy(msg); + } + } + } + + void Push(TItem* msg) { + while (!WBucket.Push(reinterpret_cast<TMsgLink>(msg), TAux())) { + } + } + + TItem* Pop() { + return reinterpret_cast<TItem*>(RBuckets.Pop()); + } + + TSlot PopAux() { + return RBuckets.PopAux(); + } + + private: + NObstructiveQueuePrivate::TWriteBucket<TAux, BUNCH_SIZE, TBunchBase> + WBucket; + NObstructiveQueuePrivate::TReadBucket<TAux, BUNCH_SIZE, TBunchBase> + RBuckets; + }; + + template <typename TItem = void, bool DeleteItems = true> + class TObstructiveConsumerQueue : public TObstructiveConsumerAuxQueue<TItem, TObstructiveQueueDeleteItems<DeleteItems>> { - }; + }; } diff --git a/library/cpp/threading/queue/queue_ut.cpp b/library/cpp/threading/queue/queue_ut.cpp index 80eca147da..8b36437034 100644 --- a/library/cpp/threading/queue/queue_ut.cpp +++ b/library/cpp/threading/queue/queue_ut.cpp @@ -1,242 +1,242 @@ #include <library/cpp/testing/unittest/registar.h> -#include <util/system/thread.h> - -#include "ut_helpers.h" - -typedef void* TMsgLink; - +#include <util/system/thread.h> + +#include "ut_helpers.h" + +typedef void* TMsgLink; + template <typename TQueueType> -class TQueueTestProcs: public TTestBase { -private: +class TQueueTestProcs: public TTestBase { +private: UNIT_TEST_SUITE_DEMANGLE(TQueueTestProcs<TQueueType>); - UNIT_TEST(Threads2_Push1M_Threads1_Pop2M) - UNIT_TEST(Threads4_Push1M_Threads1_Pop4M) - UNIT_TEST(Threads8_RndPush100K_Threads8_Queues) + UNIT_TEST(Threads2_Push1M_Threads1_Pop2M) + UNIT_TEST(Threads4_Push1M_Threads1_Pop4M) + UNIT_TEST(Threads8_RndPush100K_Threads8_Queues) /* - UNIT_TEST(Threads24_RndPush100K_Threads24_Queues) - UNIT_TEST(Threads24_RndPush100K_Threads8_Queues) - UNIT_TEST(Threads24_RndPush100K_Threads4_Queues) -*/ - UNIT_TEST_SUITE_END(); - -public: - void Push1M_Pop1M() { + UNIT_TEST(Threads24_RndPush100K_Threads24_Queues) + 
UNIT_TEST(Threads24_RndPush100K_Threads8_Queues) + UNIT_TEST(Threads24_RndPush100K_Threads4_Queues) +*/ + UNIT_TEST_SUITE_END(); + +public: + void Push1M_Pop1M() { TQueueType queue; - TMsgLink msg = &queue; - - auto pmsg = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - - for (int i = 0; i < 1000000; ++i) { - queue.Push((char*)msg + i); - } - - for (int i = 0; i < 1000000; ++i) { - auto popped = queue.Pop(); - UNIT_ASSERT_EQUAL((char*)msg + i, popped); - } - - pmsg = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - } - - void Threads2_Push1M_Threads1_Pop2M() { + TMsgLink msg = &queue; + + auto pmsg = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + + for (int i = 0; i < 1000000; ++i) { + queue.Push((char*)msg + i); + } + + for (int i = 0; i < 1000000; ++i) { + auto popped = queue.Pop(); + UNIT_ASSERT_EQUAL((char*)msg + i, popped); + } + + pmsg = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + } + + void Threads2_Push1M_Threads1_Pop2M() { TQueueType queue; - + class TPusherThread: public ISimpleThread { - public: + public: TPusherThread(TQueueType& theQueue, char* start) : Queue(theQueue) - , Arg(start) - { - } - + , Arg(start) + { + } + TQueueType& Queue; - char* Arg; - - void* ThreadProc() override { - for (int i = 0; i < 1000000; ++i) { - Queue.Push(Arg + i); - } - return nullptr; - } - }; - - TPusherThread pusher1(queue, (char*)&queue); - TPusherThread pusher2(queue, (char*)&queue + 2000000); - - pusher1.Start(); - pusher2.Start(); - - for (int i = 0; i < 2000000; ++i) { - while (queue.Pop() == nullptr) { - SpinLockPause(); - } - } - - auto pmsg = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - } - - void Threads4_Push1M_Threads1_Pop4M() { + char* Arg; + + void* ThreadProc() override { + for (int i = 0; i < 1000000; ++i) { + Queue.Push(Arg + i); + } + return nullptr; + } + }; + + TPusherThread pusher1(queue, (char*)&queue); + TPusherThread pusher2(queue, (char*)&queue + 2000000); + + pusher1.Start(); + 
pusher2.Start(); + + for (int i = 0; i < 2000000; ++i) { + while (queue.Pop() == nullptr) { + SpinLockPause(); + } + } + + auto pmsg = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + } + + void Threads4_Push1M_Threads1_Pop4M() { TQueueType queue; - + class TPusherThread: public ISimpleThread { - public: + public: TPusherThread(TQueueType& theQueue, char* start) : Queue(theQueue) - , Arg(start) - { - } - + , Arg(start) + { + } + TQueueType& Queue; - char* Arg; - - void* ThreadProc() override { - for (int i = 0; i < 1000000; ++i) { - Queue.Push(Arg + i); - } - return nullptr; - } - }; - - TPusherThread pusher1(queue, (char*)&queue); - TPusherThread pusher2(queue, (char*)&queue + 2000000); - TPusherThread pusher3(queue, (char*)&queue + 4000000); - TPusherThread pusher4(queue, (char*)&queue + 6000000); - - pusher1.Start(); - pusher2.Start(); - pusher3.Start(); - pusher4.Start(); - - for (int i = 0; i < 4000000; ++i) { - while (queue.Pop() == nullptr) { - SpinLockPause(); - } - } - - auto pmsg = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - } - - template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES> - void ManyRndPush100K_ManyQueues() { + char* Arg; + + void* ThreadProc() override { + for (int i = 0; i < 1000000; ++i) { + Queue.Push(Arg + i); + } + return nullptr; + } + }; + + TPusherThread pusher1(queue, (char*)&queue); + TPusherThread pusher2(queue, (char*)&queue + 2000000); + TPusherThread pusher3(queue, (char*)&queue + 4000000); + TPusherThread pusher4(queue, (char*)&queue + 6000000); + + pusher1.Start(); + pusher2.Start(); + pusher3.Start(); + pusher4.Start(); + + for (int i = 0; i < 4000000; ++i) { + while (queue.Pop() == nullptr) { + SpinLockPause(); + } + } + + auto pmsg = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + } + + template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES> + void ManyRndPush100K_ManyQueues() { TQueueType queue[NUMBER_OF_QUEUES]; - + class TPusherThread: public ISimpleThread { - public: + 
public: TPusherThread(TQueueType* queues, char* start) - : Queues(queues) - , Arg(start) - { - } - + : Queues(queues) + , Arg(start) + { + } + TQueueType* Queues; - char* Arg; - - void* ThreadProc() override { - ui64 counters[NUMBER_OF_QUEUES]; - for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { - counters[i] = 0; - } - - for (int i = 0; i < 100000; ++i) { - size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES; - int cookie = counters[rnd]++; - Queues[rnd].Push(Arg + cookie); - } - - for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { - Queues[i].Push((void*)2ULL); - } - - return nullptr; - } - }; - + char* Arg; + + void* ThreadProc() override { + ui64 counters[NUMBER_OF_QUEUES]; + for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { + counters[i] = 0; + } + + for (int i = 0; i < 100000; ++i) { + size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES; + int cookie = counters[rnd]++; + Queues[rnd].Push(Arg + cookie); + } + + for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { + Queues[i].Push((void*)2ULL); + } + + return nullptr; + } + }; + class TPopperThread: public ISimpleThread { - public: + public: TPopperThread(TQueueType* theQueue, char* base) : Queue(theQueue) - , Base(base) - { - } - + , Base(base) + { + } + TQueueType* Queue; - char* Base; - - void* ThreadProc() override { - ui64 counters[NUMBER_OF_PUSHERS]; - for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { - counters[i] = 0; - } - - for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) { - auto msg = Queue->Pop(); - if (msg == nullptr) { - SpinLockPause(); - continue; - } - if (msg == (void*)2ULL) { - ++fin; - continue; - } - ui64 shift = (char*)msg - Base; - auto pusherNum = shift / 200000000ULL; - auto msgNum = shift % 200000000ULL; - - UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum); - ++counters[pusherNum]; - } - - auto pmsg = Queue->Pop(); - UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - - return nullptr; - } - }; - + char* Base; + + void* ThreadProc() override { + ui64 counters[NUMBER_OF_PUSHERS]; + for (size_t i = 0; i < 
NUMBER_OF_PUSHERS; ++i) { + counters[i] = 0; + } + + for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) { + auto msg = Queue->Pop(); + if (msg == nullptr) { + SpinLockPause(); + continue; + } + if (msg == (void*)2ULL) { + ++fin; + continue; + } + ui64 shift = (char*)msg - Base; + auto pusherNum = shift / 200000000ULL; + auto msgNum = shift % 200000000ULL; + + UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum); + ++counters[pusherNum]; + } + + auto pmsg = Queue->Pop(); + UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + + return nullptr; + } + }; + TVector<TAutoPtr<TPopperThread>> poppers; TVector<TAutoPtr<TPusherThread>> pushers; - - for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { - poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue)); - poppers.back()->Start(); - } - - for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { - pushers.emplace_back( - new TPusherThread(queue, (char*)&queue + 200000000ULL * i)); - pushers.back()->Start(); - } - - for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { - poppers[i]->Join(); - } - - for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { - pushers[i]->Join(); - } - } - - void Threads8_RndPush100K_Threads8_Queues() { - ManyRndPush100K_ManyQueues<8, 8>(); - } - - /* - void Threads24_RndPush100K_Threads24_Queues() { - ManyRndPush100K_ManyQueues<24, 24>(); - } - - void Threads24_RndPush100K_Threads8_Queues() { - ManyRndPush100K_ManyQueues<24, 8>(); - } - - void Threads24_RndPush100K_Threads4_Queues() { - ManyRndPush100K_ManyQueues<24, 4>(); - } - */ -}; - -REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestProcs); + + for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { + poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue)); + poppers.back()->Start(); + } + + for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { + pushers.emplace_back( + new TPusherThread(queue, (char*)&queue + 200000000ULL * i)); + pushers.back()->Start(); + } + + for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { + poppers[i]->Join(); + } + + for (size_t i = 0; i < 
NUMBER_OF_PUSHERS; ++i) { + pushers[i]->Join(); + } + } + + void Threads8_RndPush100K_Threads8_Queues() { + ManyRndPush100K_ManyQueues<8, 8>(); + } + + /* + void Threads24_RndPush100K_Threads24_Queues() { + ManyRndPush100K_ManyQueues<24, 24>(); + } + + void Threads24_RndPush100K_Threads8_Queues() { + ManyRndPush100K_ManyQueues<24, 8>(); + } + + void Threads24_RndPush100K_Threads4_Queues() { + ManyRndPush100K_ManyQueues<24, 4>(); + } + */ +}; + +REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestProcs); diff --git a/library/cpp/threading/queue/tune.h b/library/cpp/threading/queue/tune.h index 50fc3dc17c..43ad5efe3e 100644 --- a/library/cpp/threading/queue/tune.h +++ b/library/cpp/threading/queue/tune.h @@ -1,101 +1,101 @@ -#pragma once - -/* - Motivation: consider you have a template class with many parameters - with default associations - - template <typename A = TDefA, - typename B = TDefB, - typename C = TDefC, - typename D = TDefD> - class TExample { - }; - - consider you would like to provide easy to use interface to tune all - these parameters in position independed manner, - In that case TTune would be helpful for you. - - How to use: - First step: declare a struct with all default associations - - struct TDefaultTune { - using TStructA = TDefA; - using TStructB = TDefB; - using TStructC = TDefC; - using TStructD = TDefD; - }; - - Second step: declare helper names visible to a user - - DeclareTuneTypeParam(TTuneParamA, TStructA); - DeclareTuneTypeParam(TTuneParamB, TStructB); - DeclareTuneTypeParam(TTuneParamC, TStructC); - DeclareTuneTypeParam(TTuneParamD, TStructD); - - Third step: declare TExample this way: - - template <typename...TParams> - class TExample { - using TMyParams = TTune<TDefaultTune, TParams...>; - - using TActualA = TMyParams::TStructA; - using TActualB = TMyParams::TStructB; - ... - }; - - TTune<TDefaultTune, TParams...> is a struct with the default parameteres - taken from TDefaultTune and overridden from "TParams...". 
- - for example: "TTune<TDefaultTune, TTuneParamC<TUserClass>>" - will be virtually the same as: - - struct TTunedClass { - using TStructA = TDefA; - using TStructB = TDefB; - using TStructC = TUserClass; - using TStructD = TDefD; - }; - - From now on you can tune your TExample in the following manner: - - using TCustomClass = - TExample <TTuneParamA<TUserStruct1>, TTuneParamD<TUserStruct2>>; - - You can also tweak constant expressions in your TDefaultTune. - Consider you have: - - struct TDefaultTune { - static constexpr ui32 MySize = 42; - }; - - declare an interface to modify the parameter this way: - - DeclareTuneValueParam(TStructSize, ui32, MySize); - - and tweak your class: - - using TTwiceBigger = TExample<TStructSize<84>>; - - */ - -#define DeclareTuneTypeParam(TParamName, InternalName) \ - template <typename TNewType> \ - struct TParamName { \ - template <typename TBase> \ - struct TApply: public TBase { \ - using InternalName = TNewType; \ - }; \ - } - -#define DeclareTuneValueParam(TParamName, TValueType, InternalName) \ - template <TValueType NewValue> \ - struct TParamName { \ - template <typename TBase> \ - struct TApply: public TBase { \ - static constexpr TValueType InternalName = NewValue; \ - }; \ - } - +#pragma once + +/* + Motivation: consider you have a template class with many parameters + with default associations + + template <typename A = TDefA, + typename B = TDefB, + typename C = TDefC, + typename D = TDefD> + class TExample { + }; + + consider you would like to provide easy to use interface to tune all + these parameters in position independed manner, + In that case TTune would be helpful for you. 
+ + How to use: + First step: declare a struct with all default associations + + struct TDefaultTune { + using TStructA = TDefA; + using TStructB = TDefB; + using TStructC = TDefC; + using TStructD = TDefD; + }; + + Second step: declare helper names visible to a user + + DeclareTuneTypeParam(TTuneParamA, TStructA); + DeclareTuneTypeParam(TTuneParamB, TStructB); + DeclareTuneTypeParam(TTuneParamC, TStructC); + DeclareTuneTypeParam(TTuneParamD, TStructD); + + Third step: declare TExample this way: + + template <typename...TParams> + class TExample { + using TMyParams = TTune<TDefaultTune, TParams...>; + + using TActualA = TMyParams::TStructA; + using TActualB = TMyParams::TStructB; + ... + }; + + TTune<TDefaultTune, TParams...> is a struct with the default parameteres + taken from TDefaultTune and overridden from "TParams...". + + for example: "TTune<TDefaultTune, TTuneParamC<TUserClass>>" + will be virtually the same as: + + struct TTunedClass { + using TStructA = TDefA; + using TStructB = TDefB; + using TStructC = TUserClass; + using TStructD = TDefD; + }; + + From now on you can tune your TExample in the following manner: + + using TCustomClass = + TExample <TTuneParamA<TUserStruct1>, TTuneParamD<TUserStruct2>>; + + You can also tweak constant expressions in your TDefaultTune. 
+ Consider you have: + + struct TDefaultTune { + static constexpr ui32 MySize = 42; + }; + + declare an interface to modify the parameter this way: + + DeclareTuneValueParam(TStructSize, ui32, MySize); + + and tweak your class: + + using TTwiceBigger = TExample<TStructSize<84>>; + + */ + +#define DeclareTuneTypeParam(TParamName, InternalName) \ + template <typename TNewType> \ + struct TParamName { \ + template <typename TBase> \ + struct TApply: public TBase { \ + using InternalName = TNewType; \ + }; \ + } + +#define DeclareTuneValueParam(TParamName, TValueType, InternalName) \ + template <TValueType NewValue> \ + struct TParamName { \ + template <typename TBase> \ + struct TApply: public TBase { \ + static constexpr TValueType InternalName = NewValue; \ + }; \ + } + #define DeclareTuneContainer(TParamName, InternalName) \ template <template <typename, typename...> class TNewContainer> \ struct TParamName { \ @@ -104,22 +104,22 @@ template <typename TElem, typename... TRest> \ using InternalName = TNewContainer<TElem, TRest...>; \ }; \ - } - -namespace NTunePrivate { - template <typename TBase, typename... TParams> - struct TFold; - - template <typename TBase> - struct TFold<TBase>: public TBase { - }; - - template <typename TBase, typename TFirstArg, typename... TRest> - struct TFold<TBase, TFirstArg, TRest...> - : public TFold<typename TFirstArg::template TApply<TBase>, TRest...> { - }; -} - -template <typename TDefault, typename... TParams> -struct TTune: public NTunePrivate::TFold<TDefault, TParams...> { -}; + } + +namespace NTunePrivate { + template <typename TBase, typename... TParams> + struct TFold; + + template <typename TBase> + struct TFold<TBase>: public TBase { + }; + + template <typename TBase, typename TFirstArg, typename... TRest> + struct TFold<TBase, TFirstArg, TRest...> + : public TFold<typename TFirstArg::template TApply<TBase>, TRest...> { + }; +} + +template <typename TDefault, typename... 
TParams> +struct TTune: public NTunePrivate::TFold<TDefault, TParams...> { +}; diff --git a/library/cpp/threading/queue/tune_ut.cpp b/library/cpp/threading/queue/tune_ut.cpp index 7e980d3e27..64bc8fd427 100644 --- a/library/cpp/threading/queue/tune_ut.cpp +++ b/library/cpp/threading/queue/tune_ut.cpp @@ -1,118 +1,118 @@ #include <library/cpp/testing/unittest/registar.h> -#include "tune.h" - -struct TDefaultStructA { -}; - -struct TDefaultStructB { -}; - -struct TDefaults { - using TStructA = TDefaultStructA; - using TStructB = TDefaultStructB; - static constexpr ui32 Param1 = 42; - static constexpr ui32 Param2 = 42; -}; - -DeclareTuneTypeParam(TweakStructA, TStructA); -DeclareTuneTypeParam(TweakStructB, TStructB); -DeclareTuneValueParam(TweakParam1, ui32, Param1); -DeclareTuneValueParam(TweakParam2, ui32, Param2); - +#include "tune.h" + +struct TDefaultStructA { +}; + +struct TDefaultStructB { +}; + +struct TDefaults { + using TStructA = TDefaultStructA; + using TStructB = TDefaultStructB; + static constexpr ui32 Param1 = 42; + static constexpr ui32 Param2 = 42; +}; + +DeclareTuneTypeParam(TweakStructA, TStructA); +DeclareTuneTypeParam(TweakStructB, TStructB); +DeclareTuneValueParam(TweakParam1, ui32, Param1); +DeclareTuneValueParam(TweakParam2, ui32, Param2); + Y_UNIT_TEST_SUITE(TestTuning) { Y_UNIT_TEST(Defaults) { - using TTuned = TTune<TDefaults>; - using TunedA = TTuned::TStructA; - using TunedB = TTuned::TStructB; - auto sameA = std::is_same<TDefaultStructA, TunedA>::value; - auto sameB = std::is_same<TDefaultStructB, TunedB>::value; - auto param1 = TTuned::Param1; - auto param2 = TTuned::Param2; - - UNIT_ASSERT(sameA); - UNIT_ASSERT(sameB); - UNIT_ASSERT_EQUAL(param1, 42); - UNIT_ASSERT_EQUAL(param2, 42); - } - + using TTuned = TTune<TDefaults>; + using TunedA = TTuned::TStructA; + using TunedB = TTuned::TStructB; + auto sameA = std::is_same<TDefaultStructA, TunedA>::value; + auto sameB = std::is_same<TDefaultStructB, TunedB>::value; + auto param1 = 
TTuned::Param1; + auto param2 = TTuned::Param2; + + UNIT_ASSERT(sameA); + UNIT_ASSERT(sameB); + UNIT_ASSERT_EQUAL(param1, 42); + UNIT_ASSERT_EQUAL(param2, 42); + } + Y_UNIT_TEST(TuneStructA) { - struct TMyStruct { - }; - - using TTuned = TTune<TDefaults, TweakStructA<TMyStruct>>; - - using TunedA = TTuned::TStructA; - using TunedB = TTuned::TStructB; - //auto sameA = std::is_same<TDefaultStructA, TunedA>::value; - auto sameB = std::is_same<TDefaultStructB, TunedB>::value; - auto param1 = TTuned::Param1; - auto param2 = TTuned::Param2; - - auto sameA = std::is_same<TMyStruct, TunedA>::value; - - UNIT_ASSERT(sameA); - UNIT_ASSERT(sameB); - UNIT_ASSERT_EQUAL(param1, 42); - UNIT_ASSERT_EQUAL(param2, 42); - } - + struct TMyStruct { + }; + + using TTuned = TTune<TDefaults, TweakStructA<TMyStruct>>; + + using TunedA = TTuned::TStructA; + using TunedB = TTuned::TStructB; + //auto sameA = std::is_same<TDefaultStructA, TunedA>::value; + auto sameB = std::is_same<TDefaultStructB, TunedB>::value; + auto param1 = TTuned::Param1; + auto param2 = TTuned::Param2; + + auto sameA = std::is_same<TMyStruct, TunedA>::value; + + UNIT_ASSERT(sameA); + UNIT_ASSERT(sameB); + UNIT_ASSERT_EQUAL(param1, 42); + UNIT_ASSERT_EQUAL(param2, 42); + } + Y_UNIT_TEST(TuneParam1) { - using TTuned = TTune<TDefaults, TweakParam1<24>>; - - using TunedA = TTuned::TStructA; - using TunedB = TTuned::TStructB; - auto sameA = std::is_same<TDefaultStructA, TunedA>::value; - auto sameB = std::is_same<TDefaultStructB, TunedB>::value; - auto param1 = TTuned::Param1; - auto param2 = TTuned::Param2; - - UNIT_ASSERT(sameA); - UNIT_ASSERT(sameB); - UNIT_ASSERT_EQUAL(param1, 24); - UNIT_ASSERT_EQUAL(param2, 42); - } - + using TTuned = TTune<TDefaults, TweakParam1<24>>; + + using TunedA = TTuned::TStructA; + using TunedB = TTuned::TStructB; + auto sameA = std::is_same<TDefaultStructA, TunedA>::value; + auto sameB = std::is_same<TDefaultStructB, TunedB>::value; + auto param1 = TTuned::Param1; + auto param2 = 
TTuned::Param2; + + UNIT_ASSERT(sameA); + UNIT_ASSERT(sameB); + UNIT_ASSERT_EQUAL(param1, 24); + UNIT_ASSERT_EQUAL(param2, 42); + } + Y_UNIT_TEST(TuneStructAAndParam1) { - struct TMyStruct { - }; - - using TTuned = - TTune<TDefaults, TweakStructA<TMyStruct>, TweakParam1<24>>; - - using TunedA = TTuned::TStructA; - using TunedB = TTuned::TStructB; - //auto sameA = std::is_same<TDefaultStructA, TunedA>::value; - auto sameB = std::is_same<TDefaultStructB, TunedB>::value; - auto param1 = TTuned::Param1; - auto param2 = TTuned::Param2; - - auto sameA = std::is_same<TMyStruct, TunedA>::value; - - UNIT_ASSERT(sameA); - UNIT_ASSERT(sameB); - UNIT_ASSERT_EQUAL(param1, 24); - UNIT_ASSERT_EQUAL(param2, 42); - } - + struct TMyStruct { + }; + + using TTuned = + TTune<TDefaults, TweakStructA<TMyStruct>, TweakParam1<24>>; + + using TunedA = TTuned::TStructA; + using TunedB = TTuned::TStructB; + //auto sameA = std::is_same<TDefaultStructA, TunedA>::value; + auto sameB = std::is_same<TDefaultStructB, TunedB>::value; + auto param1 = TTuned::Param1; + auto param2 = TTuned::Param2; + + auto sameA = std::is_same<TMyStruct, TunedA>::value; + + UNIT_ASSERT(sameA); + UNIT_ASSERT(sameB); + UNIT_ASSERT_EQUAL(param1, 24); + UNIT_ASSERT_EQUAL(param2, 42); + } + Y_UNIT_TEST(TuneParam1AndStructA) { - struct TMyStruct { - }; - - using TTuned = - TTune<TDefaults, TweakParam1<24>, TweakStructA<TMyStruct>>; - - using TunedA = TTuned::TStructA; - using TunedB = TTuned::TStructB; - //auto sameA = std::is_same<TDefaultStructA, TunedA>::value; - auto sameB = std::is_same<TDefaultStructB, TunedB>::value; - auto param1 = TTuned::Param1; - auto param2 = TTuned::Param2; - - auto sameA = std::is_same<TMyStruct, TunedA>::value; - - UNIT_ASSERT(sameA); - UNIT_ASSERT(sameB); - UNIT_ASSERT_EQUAL(param1, 24); - UNIT_ASSERT_EQUAL(param2, 42); - } -} + struct TMyStruct { + }; + + using TTuned = + TTune<TDefaults, TweakParam1<24>, TweakStructA<TMyStruct>>; + + using TunedA = TTuned::TStructA; + using TunedB = 
TTuned::TStructB; + //auto sameA = std::is_same<TDefaultStructA, TunedA>::value; + auto sameB = std::is_same<TDefaultStructB, TunedB>::value; + auto param1 = TTuned::Param1; + auto param2 = TTuned::Param2; + + auto sameA = std::is_same<TMyStruct, TunedA>::value; + + UNIT_ASSERT(sameA); + UNIT_ASSERT(sameB); + UNIT_ASSERT_EQUAL(param1, 24); + UNIT_ASSERT_EQUAL(param2, 42); + } +} diff --git a/library/cpp/threading/queue/unordered_ut.cpp b/library/cpp/threading/queue/unordered_ut.cpp index a43b7f520e..2018538bf7 100644 --- a/library/cpp/threading/queue/unordered_ut.cpp +++ b/library/cpp/threading/queue/unordered_ut.cpp @@ -1,154 +1,154 @@ #include <library/cpp/testing/unittest/registar.h> -#include <util/system/thread.h> -#include <algorithm> -#include <util/generic/vector.h> -#include <util/random/fast.h> - -#include "ut_helpers.h" - +#include <util/system/thread.h> +#include <algorithm> +#include <util/generic/vector.h> +#include <util/random/fast.h> + +#include "ut_helpers.h" + template <typename TQueueType> -class TTestUnorderedQueue: public TTestBase { -private: - using TLink = TIntrusiveLink; - +class TTestUnorderedQueue: public TTestBase { +private: + using TLink = TIntrusiveLink; + UNIT_TEST_SUITE_DEMANGLE(TTestUnorderedQueue<TQueueType>); - UNIT_TEST(Push1M_Pop1M_Unordered) - UNIT_TEST_SUITE_END(); - -public: - void Push1M_Pop1M_Unordered() { - constexpr int REPEAT = 1000000; + UNIT_TEST(Push1M_Pop1M_Unordered) + UNIT_TEST_SUITE_END(); + +public: + void Push1M_Pop1M_Unordered() { + constexpr int REPEAT = 1000000; TQueueType queue; - TLink msg[REPEAT]; - - auto pmsg = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - - for (int i = 0; i < REPEAT; ++i) { - queue.Push(&msg[i]); - } - + TLink msg[REPEAT]; + + auto pmsg = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + + for (int i = 0; i < REPEAT; ++i) { + queue.Push(&msg[i]); + } + TVector<TLink*> popped; - popped.reserve(REPEAT); - for (int i = 0; i < REPEAT; ++i) { - 
popped.push_back((TLink*)queue.Pop()); - } - - pmsg = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - - std::sort(popped.begin(), popped.end()); - for (int i = 0; i < REPEAT; ++i) { - UNIT_ASSERT_VALUES_EQUAL(&msg[i], popped[i]); - } - } -}; - + popped.reserve(REPEAT); + for (int i = 0; i < REPEAT; ++i) { + popped.push_back((TLink*)queue.Pop()); + } + + pmsg = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + + std::sort(popped.begin(), popped.end()); + for (int i = 0; i < REPEAT; ++i) { + UNIT_ASSERT_VALUES_EQUAL(&msg[i], popped[i]); + } + } +}; + template <typename TQueueType> -class TTestWeakQueue: public TTestBase { -private: +class TTestWeakQueue: public TTestBase { +private: UNIT_TEST_SUITE_DEMANGLE(TTestWeakQueue<TQueueType>); - UNIT_TEST(Threads8_Rnd_Exchange) - UNIT_TEST_SUITE_END(); - -public: - template <ui16 COUNT = 48, ui32 MSG_COUNT = 10000> - void ManyThreadsRndExchange() { + UNIT_TEST(Threads8_Rnd_Exchange) + UNIT_TEST_SUITE_END(); + +public: + template <ui16 COUNT = 48, ui32 MSG_COUNT = 10000> + void ManyThreadsRndExchange() { TQueueType queues[COUNT]; - + class TWorker: public ISimpleThread { - public: - TWorker( + public: + TWorker( TQueueType* queues_, ui16 mine, TAtomic* pushDone) - : Queues(queues_) - , MineQueue(mine) - , PushDone(pushDone) - { - } - + : Queues(queues_) + , MineQueue(mine) + , PushDone(pushDone) + { + } + TQueueType* Queues; - ui16 MineQueue; + ui16 MineQueue; TVector<uintptr_t> Received; - TAtomic* PushDone; - - void* ThreadProc() override { - TReallyFastRng32 rng(GetCycleCount()); - Received.reserve(MSG_COUNT * 2); - - for (ui32 loop = 1; loop <= MSG_COUNT; ++loop) { - for (;;) { - auto msg = Queues[MineQueue].Pop(); - if (msg == nullptr) { - break; - } - - Received.push_back((uintptr_t)msg); - } - - ui16 rnd = rng.GenRand64() % COUNT; - ui64 msg = ((ui64)MineQueue << 32) + loop; - while (!Queues[rnd].Push((void*)msg)) { - } - } - - AtomicIncrement(*PushDone); - - for (;;) { - bool isItLast = 
AtomicGet(*PushDone) == COUNT; - auto msg = Queues[MineQueue].Pop(); - if (msg != nullptr) { - Received.push_back((uintptr_t)msg); - } else { - if (isItLast) { - break; - } - SpinLockPause(); - } - } - - for (ui64 last = 0;;) { - auto msg = Queues[MineQueue].UnsafeScanningPop(&last); - if (msg == nullptr) { - break; - } - Received.push_back((uintptr_t)msg); - } - - return nullptr; - } - }; - + TAtomic* PushDone; + + void* ThreadProc() override { + TReallyFastRng32 rng(GetCycleCount()); + Received.reserve(MSG_COUNT * 2); + + for (ui32 loop = 1; loop <= MSG_COUNT; ++loop) { + for (;;) { + auto msg = Queues[MineQueue].Pop(); + if (msg == nullptr) { + break; + } + + Received.push_back((uintptr_t)msg); + } + + ui16 rnd = rng.GenRand64() % COUNT; + ui64 msg = ((ui64)MineQueue << 32) + loop; + while (!Queues[rnd].Push((void*)msg)) { + } + } + + AtomicIncrement(*PushDone); + + for (;;) { + bool isItLast = AtomicGet(*PushDone) == COUNT; + auto msg = Queues[MineQueue].Pop(); + if (msg != nullptr) { + Received.push_back((uintptr_t)msg); + } else { + if (isItLast) { + break; + } + SpinLockPause(); + } + } + + for (ui64 last = 0;;) { + auto msg = Queues[MineQueue].UnsafeScanningPop(&last); + if (msg == nullptr) { + break; + } + Received.push_back((uintptr_t)msg); + } + + return nullptr; + } + }; + TVector<TAutoPtr<TWorker>> workers; - TAtomic pushDone = 0; - - for (ui32 i = 0; i < COUNT; ++i) { - workers.emplace_back(new TWorker(&queues[0], i, &pushDone)); - workers.back()->Start(); - } - + TAtomic pushDone = 0; + + for (ui32 i = 0; i < COUNT; ++i) { + workers.emplace_back(new TWorker(&queues[0], i, &pushDone)); + workers.back()->Start(); + } + TVector<uintptr_t> all; - for (ui32 i = 0; i < COUNT; ++i) { - workers[i]->Join(); - all.insert(all.begin(), + for (ui32 i = 0; i < COUNT; ++i) { + workers[i]->Join(); + all.insert(all.begin(), workers[i]->Received.begin(), workers[i]->Received.end()); - } - - std::sort(all.begin(), all.end()); - auto iter = all.begin(); - for (ui32 i = 
0; i < COUNT; ++i) { - for (ui32 k = 1; k <= MSG_COUNT; ++k) { - UNIT_ASSERT_VALUES_EQUAL(((ui64)i << 32) + k, *iter); - ++iter; - } - } - } - - void Threads8_Rnd_Exchange() { - ManyThreadsRndExchange<8>(); - } -}; - -REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TTestUnorderedQueue); -UNIT_TEST_SUITE_REGISTRATION(TTestWeakQueue<TMPMCUnorderedRing>); + } + + std::sort(all.begin(), all.end()); + auto iter = all.begin(); + for (ui32 i = 0; i < COUNT; ++i) { + for (ui32 k = 1; k <= MSG_COUNT; ++k) { + UNIT_ASSERT_VALUES_EQUAL(((ui64)i << 32) + k, *iter); + ++iter; + } + } + } + + void Threads8_Rnd_Exchange() { + ManyThreadsRndExchange<8>(); + } +}; + +REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TTestUnorderedQueue); +UNIT_TEST_SUITE_REGISTRATION(TTestWeakQueue<TMPMCUnorderedRing>); diff --git a/library/cpp/threading/queue/ut/ya.make b/library/cpp/threading/queue/ut/ya.make index 8883d9bf69..dda204155e 100644 --- a/library/cpp/threading/queue/ut/ya.make +++ b/library/cpp/threading/queue/ut/ya.make @@ -1,16 +1,16 @@ UNITTEST_FOR(library/cpp/threading/queue) - + OWNER(agri) - -ALLOCATOR(B) - -SRCS( - basic_ut.cpp - queue_ut.cpp - tune_ut.cpp - unordered_ut.cpp - ut_helpers.cpp - ut_helpers.h -) - -END() + +ALLOCATOR(B) + +SRCS( + basic_ut.cpp + queue_ut.cpp + tune_ut.cpp + unordered_ut.cpp + ut_helpers.cpp + ut_helpers.h +) + +END() diff --git a/library/cpp/threading/queue/ut_helpers.cpp b/library/cpp/threading/queue/ut_helpers.cpp index aa3a831441..342aa125a0 100644 --- a/library/cpp/threading/queue/ut_helpers.cpp +++ b/library/cpp/threading/queue/ut_helpers.cpp @@ -1 +1 @@ -#include "ut_helpers.h" +#include "ut_helpers.h" diff --git a/library/cpp/threading/queue/ut_helpers.h b/library/cpp/threading/queue/ut_helpers.h index 2756b52601..c720366593 100644 --- a/library/cpp/threading/queue/ut_helpers.h +++ b/library/cpp/threading/queue/ut_helpers.h @@ -1,40 +1,40 @@ -#pragma once - -#include "mpsc_read_as_filled.h" -#include "mpsc_htswap.h" -#include "mpsc_vinfarr_obstructive.h" 
-#include "mpsc_intrusive_unordered.h" -#include "mpmc_unordered_ring.h" - -struct TBasicHTSwap: public NThreading::THTSwapQueue<> { -}; - -struct TBasicReadAsFilled: public NThreading::TReadAsFilledQueue<> { -}; - -struct TBasicObstructiveConsumer +#pragma once + +#include "mpsc_read_as_filled.h" +#include "mpsc_htswap.h" +#include "mpsc_vinfarr_obstructive.h" +#include "mpsc_intrusive_unordered.h" +#include "mpmc_unordered_ring.h" + +struct TBasicHTSwap: public NThreading::THTSwapQueue<> { +}; + +struct TBasicReadAsFilled: public NThreading::TReadAsFilledQueue<> { +}; + +struct TBasicObstructiveConsumer : public NThreading::TObstructiveConsumerQueue<> { -}; - -struct TBasicMPSCIntrusiveUnordered +}; + +struct TBasicMPSCIntrusiveUnordered : public NThreading::TMPSCIntrusiveUnordered { -}; - -struct TIntrusiveLink: public NThreading::TIntrusiveNode { -}; - -struct TMPMCUnorderedRing: public NThreading::TMPMCUnorderedRing { - TMPMCUnorderedRing() - : NThreading::TMPMCUnorderedRing(10000000) - { - } -}; - +}; + +struct TIntrusiveLink: public NThreading::TIntrusiveNode { +}; + +struct TMPMCUnorderedRing: public NThreading::TMPMCUnorderedRing { + TMPMCUnorderedRing() + : NThreading::TMPMCUnorderedRing(10000000) + { + } +}; + #define REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TestTemplate) \ UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicHTSwap>); \ UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicReadAsFilled>); \ - UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicObstructiveConsumer>) - + UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicObstructiveConsumer>) + #define REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TestTemplate) \ - UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicMPSCIntrusiveUnordered>); \ - UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TMPMCUnorderedRing>); + UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicMPSCIntrusiveUnordered>); \ + UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TMPMCUnorderedRing>); diff --git a/library/cpp/threading/queue/ya.make 
b/library/cpp/threading/queue/ya.make index 6570b38ce5..3a11eb2d92 100644 --- a/library/cpp/threading/queue/ya.make +++ b/library/cpp/threading/queue/ya.make @@ -1,18 +1,18 @@ -LIBRARY() - -OWNER(agri) - -SRCS( - mpmc_unordered_ring.cpp - mpmc_unordered_ring.h - mpsc_htswap.cpp - mpsc_htswap.h - mpsc_intrusive_unordered.cpp - mpsc_intrusive_unordered.h - mpsc_read_as_filled.cpp - mpsc_read_as_filled.h - mpsc_vinfarr_obstructive.cpp - mpsc_vinfarr_obstructive.h -) - -END() +LIBRARY() + +OWNER(agri) + +SRCS( + mpmc_unordered_ring.cpp + mpmc_unordered_ring.h + mpsc_htswap.cpp + mpsc_htswap.h + mpsc_intrusive_unordered.cpp + mpsc_intrusive_unordered.h + mpsc_read_as_filled.cpp + mpsc_read_as_filled.h + mpsc_vinfarr_obstructive.cpp + mpsc_vinfarr_obstructive.h +) + +END() |