diff options
| author | agri <[email protected]> | 2022-02-10 16:48:12 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:48:12 +0300 | 
| commit | 2909866fbc652492b7d7cab3023cb19489dc4fd8 (patch) | |
| tree | b222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/threading/queue | |
| parent | d3530b2692e400bd4d29bd4f07cafaee139164e7 (diff) | |
Restoring authorship annotation for <[email protected]>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/queue')
19 files changed, 2215 insertions, 2215 deletions
| diff --git a/library/cpp/threading/queue/basic_ut.cpp b/library/cpp/threading/queue/basic_ut.cpp index 2db5d6e8e83..5f56f8583ec 100644 --- a/library/cpp/threading/queue/basic_ut.cpp +++ b/library/cpp/threading/queue/basic_ut.cpp @@ -1,92 +1,92 @@  #include <library/cpp/testing/unittest/registar.h> -#include <util/generic/vector.h>  -#include <util/system/thread.h>  -  -#include "ut_helpers.h"  -  +#include <util/generic/vector.h> +#include <util/system/thread.h> + +#include "ut_helpers.h" +  template <typename TQueueType> -class TQueueTestsInSingleThread: public TTestBase {  -private:  +class TQueueTestsInSingleThread: public TTestBase { +private:      using TSelf = TQueueTestsInSingleThread<TQueueType>; -    using TLink = TIntrusiveLink;  -  -    UNIT_TEST_SUITE_DEMANGLE(TSelf);  -    UNIT_TEST(OnePushOnePop)  -    UNIT_TEST(OnePushOnePop_Repeat1M)  -    UNIT_TEST(Threads8_Repeat1M_Push1Pop1)  -    UNIT_TEST_SUITE_END();  -  -public:  -    void OnePushOnePop() {  +    using TLink = TIntrusiveLink; + +    UNIT_TEST_SUITE_DEMANGLE(TSelf); +    UNIT_TEST(OnePushOnePop) +    UNIT_TEST(OnePushOnePop_Repeat1M) +    UNIT_TEST(Threads8_Repeat1M_Push1Pop1) +    UNIT_TEST_SUITE_END(); + +public: +    void OnePushOnePop() {          TQueueType queue; -  -        auto popped = queue.Pop();  -        UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);  -  -        TLink msg;  -        queue.Push(&msg);  -        popped = queue.Pop();  -        UNIT_ASSERT_VALUES_EQUAL(&msg, popped);  -  -        popped = queue.Pop();  -        UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);  -    };  -  -    void OnePushOnePop_Repeat1M() {  + +        auto popped = queue.Pop(); +        UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); + +        TLink msg; +        queue.Push(&msg); +        popped = queue.Pop(); +        UNIT_ASSERT_VALUES_EQUAL(&msg, popped); + +        popped = queue.Pop(); +        UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); +    }; + +    void OnePushOnePop_Repeat1M() {          TQueueType queue; -        TLink msg;  -  -        auto popped = queue.Pop();  -        UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);  -  -        for (int i = 0; i < 1000000; ++i) {  -            queue.Push(&msg);  -            popped = queue.Pop();  -            UNIT_ASSERT_VALUES_EQUAL(&msg, popped);  -  -            popped = queue.Pop();  -            UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);  -        }  -    }  -  -    template <size_t NUMBER_OF_THREADS>  -    void RepeatPush1Pop1_InManyThreads() {  +        TLink msg; + +        auto popped = queue.Pop(); +        UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); + +        for (int i = 0; i < 1000000; ++i) { +            queue.Push(&msg); +            popped = queue.Pop(); +            UNIT_ASSERT_VALUES_EQUAL(&msg, popped); + +            popped = queue.Pop(); +            UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); +        } +    } + +    template <size_t NUMBER_OF_THREADS> +    void RepeatPush1Pop1_InManyThreads() {          class TCycleThread: public ISimpleThread { -        public:  -            void* ThreadProc() override {  +        public: +            void* ThreadProc() override {                  TQueueType queue; -                TLink msg;  -                auto popped = queue.Pop();  -                UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);  -  -                for (size_t i = 0; i < 1000000; ++i) {  -                    queue.Push(&msg);  -                    popped = queue.Pop();  -                    UNIT_ASSERT_VALUES_EQUAL(popped, &msg);  -  -                    popped = queue.Pop();  -                    UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);  -                }  -                return nullptr;  -            }  -        };  -  +                TLink msg; +                auto popped = queue.Pop(); +                UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); + +                for (size_t i = 0; i < 1000000; ++i) { +                    queue.Push(&msg); +                    popped = queue.Pop(); +                    UNIT_ASSERT_VALUES_EQUAL(popped, &msg); + +                    popped = queue.Pop(); +                    UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); +                } +                return nullptr; +            } +        }; +          TVector<TAutoPtr<TCycleThread>> cyclers; -  -        for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) {  -            cyclers.emplace_back(new TCycleThread);  -            cyclers.back()->Start();  -        }  -  -        for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) {  -            cyclers[i]->Join();  -        }  -    }  -  -    void Threads8_Repeat1M_Push1Pop1() {  -        RepeatPush1Pop1_InManyThreads<8>();  -    }  -};  -  -REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestsInSingleThread);  -REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TQueueTestsInSingleThread)  + +        for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) { +            cyclers.emplace_back(new TCycleThread); +            cyclers.back()->Start(); +        } + +        for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) { +            cyclers[i]->Join(); +        } +    } + +    void Threads8_Repeat1M_Push1Pop1() { +        RepeatPush1Pop1_InManyThreads<8>(); +    } +}; + +REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestsInSingleThread); +REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TQueueTestsInSingleThread) diff --git a/library/cpp/threading/queue/mpmc_unordered_ring.cpp b/library/cpp/threading/queue/mpmc_unordered_ring.cpp index df48182210f..160547f5946 100644 --- a/library/cpp/threading/queue/mpmc_unordered_ring.cpp +++ b/library/cpp/threading/queue/mpmc_unordered_ring.cpp @@ -1,74 +1,74 @@ -#include "mpmc_unordered_ring.h"  -  -namespace NThreading {  -    TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) {  -        Y_VERIFY(size > 0);  -        RingSize = size;  -        RingBuffer.Reset(new void*[size]);  -        memset(&RingBuffer[0], 0, sizeof(void*) * size);  -    }  -  -    bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept {  -        if (retryCount == 0) {  -            StubbornPush(msg);  -            return true;  -        }  -        for (ui16 itry = retryCount; itry-- > 0;) {  -            if (WeakPush(msg)) {  -                return true;  -            }  -        }  -        return false;  -    }  -  -    bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept {  -        auto pawl = AtomicIncrement(WritePawl);  -        if (pawl - AtomicGet(ReadFront) >= RingSize) {  -            // Queue is full  -            AtomicDecrement(WritePawl);  -            return false;  -        }  -  -        auto writeSlot = AtomicGetAndIncrement(WriteFront);  -        if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) {  -            return true;  -        }  -        // slot is occupied for some reason, retry  -        return false;  -    }  -  -    void* TMPMCUnorderedRing::Pop() noexcept {  -        ui64 readSlot;  -  -        for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) {  -            auto pawl = AtomicIncrement(ReadPawl);  -            if (pawl > AtomicGet(WriteFront)) {  -                // Queue is empty  -                AtomicDecrement(ReadPawl);  -                return nullptr;  -            }  -  -            readSlot = AtomicGetAndIncrement(ReadFront);  -  -            auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr);  -            if (msg != nullptr) {  -                return msg;  -            }  -        }  -  -        /* got no message in the slot, let's try to rollback readfront */  -        AtomicCas(&ReadFront, readSlot - 1, readSlot);  -        return nullptr;  -    }  -  -    void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept {  -        for (; *last < RingSize;) {  -            auto msg = AtomicSwap(&RingBuffer[*last], nullptr);  -            ++*last;  -            if (msg != nullptr) {  -                return msg;  -            }  -        }  -        return nullptr;  -    }  -}  +#include "mpmc_unordered_ring.h" + +namespace NThreading { +    TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) { +        Y_VERIFY(size > 0); +        RingSize = size; +        RingBuffer.Reset(new void*[size]); +        memset(&RingBuffer[0], 0, sizeof(void*) * size); +    } + +    bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept { +        if (retryCount == 0) { +            StubbornPush(msg); +            return true; +        } +        for (ui16 itry = retryCount; itry-- > 0;) { +            if (WeakPush(msg)) { +                return true; +            } +        } +        return false; +    } + +    bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept { +        auto pawl = AtomicIncrement(WritePawl); +        if (pawl - AtomicGet(ReadFront) >= RingSize) { +            // Queue is full +            AtomicDecrement(WritePawl); +            return false; +        } + +        auto writeSlot = AtomicGetAndIncrement(WriteFront); +        if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) { +            return true; +        } +        // slot is occupied for some reason, retry +        return false; +    } + +    void* TMPMCUnorderedRing::Pop() noexcept { +        ui64 readSlot; + +        for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) { +            auto pawl = AtomicIncrement(ReadPawl); +            if (pawl > AtomicGet(WriteFront)) { +                // Queue is empty +                AtomicDecrement(ReadPawl); +                return nullptr; +            } + +            readSlot = AtomicGetAndIncrement(ReadFront); + +            auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr); +            if (msg != nullptr) { +                return msg; +            } +        } + +        /* got no message in the slot, let's try to rollback readfront */ +        AtomicCas(&ReadFront, readSlot - 1, readSlot); +        return nullptr; +    } + +    void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept { +        for (; *last < RingSize;) { +            auto msg = AtomicSwap(&RingBuffer[*last], nullptr); +            ++*last; +            if (msg != nullptr) { +                return msg; +            } +        } +        return nullptr; +    } +} diff --git a/library/cpp/threading/queue/mpmc_unordered_ring.h b/library/cpp/threading/queue/mpmc_unordered_ring.h index 59758d2c352..5042f7528e8 100644 --- a/library/cpp/threading/queue/mpmc_unordered_ring.h +++ b/library/cpp/threading/queue/mpmc_unordered_ring.h @@ -1,42 +1,42 @@ -#pragma once  -  -/*  -  It's not a general purpose queue.  -  No order guarantee, but it mostly ordered.  -  Items may stuck in almost empty queue.  -  Use UnsafeScanningPop to pop all stuck items.  -  Almost wait-free for producers and consumers.  - */  -  -#include <util/system/atomic.h>  -#include <util/generic/ptr.h>  -  -namespace NThreading {  -    struct TMPMCUnorderedRing {  -    public:  -        static constexpr ui16 MAX_PUSH_TRIES = 4;  -        static constexpr ui16 MAX_POP_TRIES = 4;  -  -        TMPMCUnorderedRing(size_t size);  -  -        bool Push(void* msg, ui16 retryCount = MAX_PUSH_TRIES) noexcept;  -        void StubbornPush(void* msg) {  -            while (!WeakPush(msg)) {  -            }  -        }  -  -        void* Pop() noexcept;  -  -        void* UnsafeScanningPop(ui64* last) noexcept;  -  -    private:  -        bool WeakPush(void* msg) noexcept;  -  -        size_t RingSize;  -        TArrayPtr<void*> RingBuffer;  -        ui64 WritePawl = 0;  -        ui64 WriteFront = 0;  -        ui64 ReadPawl = 0;  -        ui64 ReadFront = 0;  -    };  -}  +#pragma once + +/* +  It's not a general purpose queue. +  No order guarantee, but it mostly ordered. +  Items may stuck in almost empty queue. +  Use UnsafeScanningPop to pop all stuck items. +  Almost wait-free for producers and consumers. + */ + +#include <util/system/atomic.h> +#include <util/generic/ptr.h> + +namespace NThreading { +    struct TMPMCUnorderedRing { +    public: +        static constexpr ui16 MAX_PUSH_TRIES = 4; +        static constexpr ui16 MAX_POP_TRIES = 4; + +        TMPMCUnorderedRing(size_t size); + +        bool Push(void* msg, ui16 retryCount = MAX_PUSH_TRIES) noexcept; +        void StubbornPush(void* msg) { +            while (!WeakPush(msg)) { +            } +        } + +        void* Pop() noexcept; + +        void* UnsafeScanningPop(ui64* last) noexcept; + +    private: +        bool WeakPush(void* msg) noexcept; + +        size_t RingSize; +        TArrayPtr<void*> RingBuffer; +        ui64 WritePawl = 0; +        ui64 WriteFront = 0; +        ui64 ReadPawl = 0; +        ui64 ReadFront = 0; +    }; +} diff --git a/library/cpp/threading/queue/mpsc_htswap.cpp b/library/cpp/threading/queue/mpsc_htswap.cpp index d8ab0d4f488..610c8f67f13 100644 --- a/library/cpp/threading/queue/mpsc_htswap.cpp +++ b/library/cpp/threading/queue/mpsc_htswap.cpp @@ -1 +1 @@ -#include "mpsc_htswap.h"  +#include "mpsc_htswap.h" diff --git a/library/cpp/threading/queue/mpsc_htswap.h b/library/cpp/threading/queue/mpsc_htswap.h index 2d0bfd1146a..c42caa7ac02 100644 --- a/library/cpp/threading/queue/mpsc_htswap.h +++ b/library/cpp/threading/queue/mpsc_htswap.h @@ -1,132 +1,132 @@ -#pragma once  -  -/*  -  http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue  -  -  Simple semi-wait-free queue. Many producers - one consumer.  -  Tracking of allocated memory is not required.  -  No CAS. Only atomic swap (exchange) operations.  -  -  WARNING: a sleeping producer can stop progress for consumer.  -  -  WARNING: there is no wait¬ify mechanic for consumer,  -  consumer receives nullptr if queue was empty.  -  -  WARNING: the algorithm itself is lock-free  -  but producers and consumer could be blocked by memory allocator  -  -  Reference design: rtmapreduce/libs/threading/lfqueue.h  - */  -  -#include <util/generic/noncopyable.h>  -#include <util/system/types.h>  -#include <util/system/atomic.h>  -  -#include "tune.h"  -  -namespace NThreading {  -    namespace NHTSwapPrivate {  -        template <typename T, typename TTuneup>  -        struct TNode  +#pragma once + +/* +  http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue + +  Simple semi-wait-free queue. Many producers - one consumer. +  Tracking of allocated memory is not required. +  No CAS. Only atomic swap (exchange) operations. + +  WARNING: a sleeping producer can stop progress for consumer. + +  WARNING: there is no wait¬ify mechanic for consumer, +  consumer receives nullptr if queue was empty. + +  WARNING: the algorithm itself is lock-free +  but producers and consumer could be blocked by memory allocator + +  Reference design: rtmapreduce/libs/threading/lfqueue.h + */ + +#include <util/generic/noncopyable.h> +#include <util/system/types.h> +#include <util/system/atomic.h> + +#include "tune.h" + +namespace NThreading { +    namespace NHTSwapPrivate { +        template <typename T, typename TTuneup> +        struct TNode             : public TTuneup::TNodeBase,                public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> { -            TNode(const T& item) {  -                this->Next = nullptr;  -                this->Item = item;  -            }  -  -            TNode(T&& item) {  -                this->Next = nullptr;  -                this->Item = std::move(item);  -            }  -        };  -  -        struct TDefaultTuneup {  -            struct TNodeBase: private TNonCopyable {  -            };  -  -            template <typename TNode, typename T>  -            struct TNodeLayout {  -                TNode* Next;  -                T Item;  -            };  -  -            template <typename TNode>  -            struct TQueueLayout {  -                TNode* Head;  -                TNode* Tail;  -            };  -        };  -  -        template <typename T, typename TTuneup>  -        class THTSwapQueueImpl  +            TNode(const T& item) { +                this->Next = nullptr; +                this->Item = item; +            } + +            TNode(T&& item) { +                this->Next = nullptr; +                this->Item = std::move(item); +            } +        }; + +        struct TDefaultTuneup { +            struct TNodeBase: private TNonCopyable { +            }; + +            template <typename TNode, typename T> +            struct TNodeLayout { +                TNode* Next; +                T Item; +            }; + +            template <typename TNode> +            struct TQueueLayout { +                TNode* Head; +                TNode* Tail; +            }; +        }; + +        template <typename T, typename TTuneup> +        class THTSwapQueueImpl             : protected  TTuneup::template TQueueLayout<TNode<T, TTuneup>> { -        protected:  -            using TTunedNode = TNode<T, TTuneup>;  -  -        public:  -            using TItem = T;  -  -            THTSwapQueueImpl() {  -                this->Head = new TTunedNode(T());  -                this->Tail = this->Head;  -            }  -  -            ~THTSwapQueueImpl() {  -                TTunedNode* node = this->Head;  -                while (node != nullptr) {  -                    TTunedNode* next = node->Next;  -                    delete node;  -                    node = next;  -                }  -            }  -  -            template <typename TT>  -            void Push(TT&& item) {  -                Enqueue(new TTunedNode(std::forward<TT>(item)));  -            }  -  -            T Peek() {  -                TTunedNode* next = AtomicGet(this->Head->Next);  -                if (next == nullptr) {  -                    return T();  -                }  -                return next->Item;  -            }  -  -            void Enqueue(TTunedNode* node) {  -                // our goal is to avoid expensive CAS here,  -                // but now consumer will be blocked until new tail linked.  -                // fortunately 'window of inconsistency' is extremely small.  -                TTunedNode* prev = AtomicSwap(&this->Tail, node);  -                AtomicSet(prev->Next, node);  -            }  -  -            T Pop() {  -                TTunedNode* next = AtomicGet(this->Head->Next);  -                if (next == nullptr) {  -                    return nullptr;  -                }  -                auto item = std::move(next->Item);  -                std::swap(this->Head, next); // no need atomic here  -                delete next;  -                return item;  -            }  -  -            bool IsEmpty() const {  -                TTunedNode* next = AtomicGet(this->Head->Next);  -                return (next == nullptr);  -            }  -        };  -    }  -  -    DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase);  -    DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout);  -    DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout);  -  +        protected: +            using TTunedNode = TNode<T, TTuneup>; + +        public: +            using TItem = T; + +            THTSwapQueueImpl() { +                this->Head = new TTunedNode(T()); +                this->Tail = this->Head; +            } + +            ~THTSwapQueueImpl() { +                TTunedNode* node = this->Head; +                while (node != nullptr) { +                    TTunedNode* next = node->Next; +                    delete node; +                    node = next; +                } +            } + +            template <typename TT> +            void Push(TT&& item) { +                Enqueue(new TTunedNode(std::forward<TT>(item))); +            } + +            T Peek() { +                TTunedNode* next = AtomicGet(this->Head->Next); +                if (next == nullptr) { +                    return T(); +                } +                return next->Item; +            } + +            void Enqueue(TTunedNode* node) { +                // our goal is to avoid expensive CAS here, +                // but now consumer will be blocked until new tail linked. +                // fortunately 'window of inconsistency' is extremely small. +                TTunedNode* prev = AtomicSwap(&this->Tail, node); +                AtomicSet(prev->Next, node); +            } + +            T Pop() { +                TTunedNode* next = AtomicGet(this->Head->Next); +                if (next == nullptr) { +                    return nullptr; +                } +                auto item = std::move(next->Item); +                std::swap(this->Head, next); // no need atomic here +                delete next; +                return item; +            } + +            bool IsEmpty() const { +                TTunedNode* next = AtomicGet(this->Head->Next); +                return (next == nullptr); +            } +        }; +    } + +    DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase); +    DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout); +    DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout); +      template <typename T = void*, typename... TParams> -    class THTSwapQueue  +    class THTSwapQueue         : public NHTSwapPrivate::THTSwapQueueImpl<T,                                                    TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> { -    };  -}  +    }; +} diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp index a6a2fcef398..3bb1a04f7e9 100644 --- a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp +++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp @@ -1,79 +1,79 @@ -#include "mpsc_intrusive_unordered.h"  -#include <util/system/atomic.h>  -  -namespace NThreading {  -    void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept {  -        auto head = AtomicGet(HeadForCaS);  -        for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) {  -            // no ABA here, because Next is exactly head  -            // it does not matter how many travels head was made/  -            node->Next = head;  -            auto prev = AtomicGetAndCas(&HeadForCaS, node, head);  -            if (head == prev) {  -                return;  -            }  -            head = prev;  -        }  -        // boring of trying to do cas, let's just swap  -  -        // no need for atomic here, because the next is atomic swap  -        node->Next = 0;  -  -        head = AtomicSwap(&HeadForSwap, node);  -        if (head != nullptr) {  -            AtomicSet(node->Next, head);  -        } else {  -            // consumer must know if no other thread may access the memory,  -            // setting Next to node is a way to notify consumer  -            AtomicSet(node->Next, node);  -        }  -    }  -  -    TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept {  -        if (NotReadyChain == nullptr) {  -            auto head = AtomicSwap(&HeadForSwap, nullptr);  -            NotReadyChain = head;  -        }  -  -        if (NotReadyChain != nullptr) {  -            auto next = AtomicGet(NotReadyChain->Next);  -            if (next != nullptr) {  -                auto ready = NotReadyChain;  -                TIntrusiveNode* cut;  -                do {  -                    cut = NotReadyChain;  -                    NotReadyChain = next;  -                    next = AtomicGet(NotReadyChain->Next);  -                    if (next == NotReadyChain) {  -                        cut = NotReadyChain;  -                        NotReadyChain = nullptr;  -                        break;  -                    }  -                } while (next != nullptr);  -                cut->Next = nullptr;  -                return ready;  -            }  -        }  -  -        if (AtomicGet(HeadForCaS) != nullptr) {  -            return AtomicSwap(&HeadForCaS, nullptr);  -        }  -        return nullptr;  -    }  -  -    TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept {  -        if (PopOneQueue != nullptr) {  -            auto head = PopOneQueue;  -            PopOneQueue = PopOneQueue->Next;  -            return head;  -        }  -  -        PopOneQueue = PopMany();  -        if (PopOneQueue != nullptr) {  -            auto head = PopOneQueue;  -            PopOneQueue = PopOneQueue->Next;  -            return head;  -        }  -        return nullptr;  -    }  -}  +#include "mpsc_intrusive_unordered.h" +#include <util/system/atomic.h> + +namespace NThreading { +    void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept { +        auto head = AtomicGet(HeadForCaS); +        for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) { +            // no ABA here, because Next is exactly head +            // it does not matter how many travels head was made/ +            node->Next = head; +            auto prev = AtomicGetAndCas(&HeadForCaS, node, head); +            if (head == prev) { +                return; +            } +            head = prev; +        } +        // boring of trying to do cas, let's just swap + +        // no need for atomic here, because the next is atomic swap +        node->Next = 0; + +        head = AtomicSwap(&HeadForSwap, node); +        if (head != nullptr) { +            AtomicSet(node->Next, head); +        } else { +            // consumer must know if no other thread may access the memory, +            // setting Next to node is a way to notify consumer +            AtomicSet(node->Next, node); +        } +    } + +    TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept { +        if (NotReadyChain == nullptr) { +            auto head = AtomicSwap(&HeadForSwap, nullptr); +            NotReadyChain = head; +        } + +        if (NotReadyChain != nullptr) { +            auto next = AtomicGet(NotReadyChain->Next); +            if (next != nullptr) { +                auto ready = NotReadyChain; +                TIntrusiveNode* cut; +                do { +                    cut = NotReadyChain; +                    NotReadyChain = next; +                    next = AtomicGet(NotReadyChain->Next); +                    if (next == NotReadyChain) { +                        cut = NotReadyChain; +                        NotReadyChain = nullptr; +                        break; +                    } +                } while (next != nullptr); +                cut->Next = nullptr; +                return ready; +            } +        } + +        if (AtomicGet(HeadForCaS) != nullptr) { +            return AtomicSwap(&HeadForCaS, nullptr); +        } +        return nullptr; +    } + +    TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept { +        if (PopOneQueue != nullptr) { +            auto head = PopOneQueue; +            PopOneQueue = PopOneQueue->Next; +            return head; +        } + +        PopOneQueue = PopMany(); +        if (PopOneQueue != nullptr) { +            auto head = PopOneQueue; +            PopOneQueue = PopOneQueue->Next; +            return head; +        } +        return nullptr; +    } +} diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.h b/library/cpp/threading/queue/mpsc_intrusive_unordered.h index c07cf761f67..6ac7537ae9a 100644 --- a/library/cpp/threading/queue/mpsc_intrusive_unordered.h +++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.h @@ -1,35 +1,35 @@ -#pragma once  -  -/*  -  Simple almost-wait-free unordered queue for low contention operations.  -  -  It's wait-free for producers.  -  Hanging producer can hide some items from consumer.  - */  -  -#include <util/system/types.h>  -  -namespace NThreading {  -    struct TIntrusiveNode {  -        TIntrusiveNode* Next;  -    };  -  -    class TMPSCIntrusiveUnordered {  -    public:  -        static constexpr ui32 NUMBER_OF_TRIES_FOR_CAS = 3;  -  -        void Push(TIntrusiveNode* node) noexcept;  -        TIntrusiveNode* PopMany() noexcept;  -        TIntrusiveNode* Pop() noexcept;  -  -        void Push(void* node) noexcept {  -            Push(reinterpret_cast<TIntrusiveNode*>(node));  -        }  - -    private:  -        TIntrusiveNode* HeadForCaS = nullptr;  -        TIntrusiveNode* HeadForSwap = nullptr;  -        TIntrusiveNode* NotReadyChain = nullptr;  -        TIntrusiveNode* PopOneQueue = nullptr;  -    };  -}  +#pragma once + +/* +  Simple almost-wait-free unordered queue for low contention operations. + +  It's wait-free for producers. +  Hanging producer can hide some items from consumer. + */ + +#include <util/system/types.h> + +namespace NThreading { +    struct TIntrusiveNode { +        TIntrusiveNode* Next; +    }; + +    class TMPSCIntrusiveUnordered { +    public: +        static constexpr ui32 NUMBER_OF_TRIES_FOR_CAS = 3; + +        void Push(TIntrusiveNode* node) noexcept; +        TIntrusiveNode* PopMany() noexcept; +        TIntrusiveNode* Pop() noexcept; + +        void Push(void* node) noexcept { +            Push(reinterpret_cast<TIntrusiveNode*>(node)); +        } + +    private: +        TIntrusiveNode* HeadForCaS = nullptr; +        TIntrusiveNode* HeadForSwap = nullptr; +        TIntrusiveNode* NotReadyChain = nullptr; +        TIntrusiveNode* PopOneQueue = nullptr; +    }; +} diff --git a/library/cpp/threading/queue/mpsc_read_as_filled.cpp b/library/cpp/threading/queue/mpsc_read_as_filled.cpp index 3b89fb1df62..8b4664a6f32 100644 --- a/library/cpp/threading/queue/mpsc_read_as_filled.cpp +++ b/library/cpp/threading/queue/mpsc_read_as_filled.cpp @@ -1 +1 @@ -#include "mpsc_read_as_filled.h"  +#include "mpsc_read_as_filled.h" diff --git a/library/cpp/threading/queue/mpsc_read_as_filled.h b/library/cpp/threading/queue/mpsc_read_as_filled.h index 4dfdb1fbbfa..be33ba5a584 100644 --- a/library/cpp/threading/queue/mpsc_read_as_filled.h +++ b/library/cpp/threading/queue/mpsc_read_as_filled.h @@ -1,611 +1,611 @@ -#pragma once  -  -/*  -  Completely wait-free queue, multiple producers - one consumer. Strict order.  -  The queue algorithm is using concept of virtual infinite array.  -  +#pragma once + +/* +  Completely wait-free queue, multiple producers - one consumer. Strict order. +  The queue algorithm is using concept of virtual infinite array. +    A producer takes a number from a counter and atomically increments the counter. -  The number taken is a number of a slot for the producer to put a new message  -  into infinite array.  -  -  Then producer constructs a virtual infinite array by bidirectional linked list  -  of blocks. Each block contains several slots.  -  +  The number taken is a number of a slot for the producer to put a new message +  into infinite array. + +  Then producer constructs a virtual infinite array by bidirectional linked list +  of blocks. Each block contains several slots. +    There is a hint pointer which optimistically points to the last block -  of the list and never goes backward.  -  -  Consumer exploits the property of the hint pointer always going forward  -  to free old blocks eventually. Consumer periodically read the hint pointer  -  and the counter and thus deduce producers which potentially holds the pointer  -  to a block. Consumer can free the block if all that producers filled their  -  slots and left the queue.  -  -  No producer can stop the progress for other producers.  -  -  Consumer can't stop the progress for producers.  -  Consumer can skip not-yet-filled slots and read them later.  -  Thus no producer can stop the progress for consumer.  +  of the list and never goes backward. + +  Consumer exploits the property of the hint pointer always going forward +  to free old blocks eventually. Consumer periodically read the hint pointer +  and the counter and thus deduce producers which potentially holds the pointer +  to a block. Consumer can free the block if all that producers filled their +  slots and left the queue. + +  No producer can stop the progress for other producers. + +  Consumer can't stop the progress for producers. +  Consumer can skip not-yet-filled slots and read them later. +  Thus no producer can stop the progress for consumer.    The algorithm is virtually strictly ordered because it skips slots only -  if it is really does not matter in which order the slots were produced and  -  consumed.  -  -  WARNING: there is no wait¬ify mechanic for consumer,  -  consumer receives nullptr if queue was empty.  -  -  WARNING: though the algorithm itself is completely wait-free  -  but producers and consumer could be blocked by memory allocator  -  +  if it is really does not matter in which order the slots were produced and +  consumed. + +  WARNING: there is no wait¬ify mechanic for consumer, +  consumer receives nullptr if queue was empty. + +  WARNING: though the algorithm itself is completely wait-free +  but producers and consumer could be blocked by memory allocator +    WARNING: copy constructors of the queue are not thread-safe - */  -  -#include <util/generic/deque.h>  -#include <util/generic/ptr.h>  -#include <util/system/atomic.h>  -#include <util/system/spinlock.h>  -  -#include "tune.h"  -  -namespace NThreading {  -    namespace NReadAsFilledPrivate {  -        typedef void* TMsgLink;  -  -        static constexpr ui32 DEFAULT_BUNCH_SIZE = 251;  -  -        struct TEmpty {  -        };  -  -        struct TEmptyAux {  -            TEmptyAux Retrieve() const {  -                return TEmptyAux();  -            }  -  -            void Store(TEmptyAux&) {  -            }  -  -            static constexpr TEmptyAux Zero() {  -                return TEmptyAux();  -            }  -        };  -  -        template <typename TAux>  -        struct TSlot {  -            TMsgLink volatile Msg;  -            TAux AuxiliaryData;  -  -            inline void Store(TAux& aux) {  -                AuxiliaryData.Store(aux);  -            }  -  -            inline TAux Retrieve() const {  -                return AuxiliaryData.Retrieve();  -            }  -  -            static TSlot<TAux> NullElem() {  -                return {nullptr, TAux::Zero()};  -            }  -  -            static TSlot<TAux> Pair(TMsgLink msg, TAux aux) {  -                return {msg, std::move(aux)};  -            }  -        };  -  -        template <>  -        struct TSlot<TEmptyAux> {  -            TMsgLink volatile Msg;  -  -            inline void Store(TEmptyAux&) {  -            }  -  -            inline TEmptyAux Retrieve() const {  -                return TEmptyAux();  -            }  -  -            static TSlot<TEmptyAux> NullElem() {  -                return {nullptr};  -            }  -  -            static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) {  -                return {msg};  -            }  -        };  -  -        enum TPushResult {  -            PUSH_RESULT_OK,  -            PUSH_RESULT_BACKWARD,  -            PUSH_RESULT_FORWARD,  -        };  -  -        template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE,  -                  typename TBase = TEmpty,  -                  typename TAux = TEmptyAux>  -        struct TMsgBunch: public TBase {  -            static constexpr size_t RELEASE_SIZE = BUNCH_SIZE * 2;  -  -            ui64 FirstSlot;  -  -            TSlot<TAux> LinkArray[BUNCH_SIZE];  -  -            TMsgBunch* volatile NextBunch;  -            TMsgBunch* volatile BackLink;  -  -            ui64 volatile Token;  -            TMsgBunch* volatile NextToken;  -  -            /* this push can return PUSH_RESULT_BLOCKED */  + */ + +#include <util/generic/deque.h> +#include <util/generic/ptr.h> +#include <util/system/atomic.h> +#include <util/system/spinlock.h> + +#include "tune.h" + +namespace NThreading { +    namespace NReadAsFilledPrivate { +        typedef void* TMsgLink; + +        static constexpr ui32 DEFAULT_BUNCH_SIZE = 251; + +        struct TEmpty { +        }; + +        struct TEmptyAux { +            TEmptyAux Retrieve() const { +                return TEmptyAux(); +            } + +            void Store(TEmptyAux&) { +            } + +            static constexpr TEmptyAux Zero() { +                return TEmptyAux(); +            } +        }; + +        template <typename TAux> +        struct TSlot { +            TMsgLink volatile Msg; +            TAux AuxiliaryData; + +            inline void Store(TAux& aux) { +                AuxiliaryData.Store(aux); +            } + +            inline TAux Retrieve() const { +                return AuxiliaryData.Retrieve(); +            } + +            static TSlot<TAux> NullElem() { +                return {nullptr, TAux::Zero()}; +            } + +            static TSlot<TAux> Pair(TMsgLink msg, TAux aux) { +                return {msg, std::move(aux)}; +            } +        }; + +        template <> +        struct TSlot<TEmptyAux> { +            TMsgLink volatile Msg; + +            inline void Store(TEmptyAux&) { +            } + +            inline TEmptyAux Retrieve() const { +                return TEmptyAux(); +            } + +            static TSlot<TEmptyAux> NullElem() { +                return {nullptr}; +            } + +            static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) { +                return {msg}; +            } +        }; + +        enum TPushResult { +            PUSH_RESULT_OK, +            PUSH_RESULT_BACKWARD, +            PUSH_RESULT_FORWARD, +        }; + +        template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE, +                  typename TBase = TEmpty, +                  typename TAux = TEmptyAux> +        struct TMsgBunch: public TBase { +            static constexpr size_t RELEASE_SIZE = BUNCH_SIZE * 2; + +            ui64 FirstSlot; + +            TSlot<TAux> LinkArray[BUNCH_SIZE]; + +            TMsgBunch* volatile NextBunch; +            TMsgBunch* volatile BackLink; + +            ui64 volatile Token; +            TMsgBunch* volatile NextToken; + +            /* this push can return PUSH_RESULT_BLOCKED */              inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) { -                if (Y_UNLIKELY(slot < FirstSlot)) {  -                    return PUSH_RESULT_BACKWARD;  -                }  -  -                if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) {  -                    return PUSH_RESULT_FORWARD;  -                }  -  -                LinkArray[slot - FirstSlot].Store(auxiliary);  -  -                AtomicSet(LinkArray[slot - FirstSlot].Msg, msg);  -                return PUSH_RESULT_OK;  -            }  -  -            inline bool IsSlotHere(ui64 slot) {  -                return slot < FirstSlot + BUNCH_SIZE;  -            }  -  -            inline TMsgLink GetSlot(ui64 slot) const {  -                return AtomicGet(LinkArray[slot - FirstSlot].Msg);  -            }  -  -            inline TSlot<TAux> GetSlotAux(ui64 slot) const {  -                auto msg = GetSlot(slot);  -                auto aux = LinkArray[slot - FirstSlot].Retrieve();  -                return TSlot<TAux>::Pair(msg, aux);  -            }  -  -            inline TMsgBunch* GetNextBunch() const {  -                return AtomicGet(NextBunch);  -            }  -  -            inline bool SetNextBunch(TMsgBunch* ptr) {  -                return AtomicCas(&NextBunch, ptr, nullptr);  -            }  -  -            inline TMsgBunch* GetBackLink() const {  -                return AtomicGet(BackLink);  -            }  -  -            inline TMsgBunch* GetToken(ui64 slot) {  -                return reinterpret_cast<TMsgBunch*>(  -                    LinkArray[slot - FirstSlot].Msg);  -            }  -  -            inline void IncrementToken() {  -                AtomicIncrement(Token);  -            }  -  -            // the object could be destroyed after this method  -            inline void DecrementToken() {  -                if (Y_UNLIKELY(AtomicDecrement(Token) == RELEASE_SIZE)) {  -                    Release(this);  -                    AtomicGet(NextToken)->DecrementToken();  -                    // this could be invalid here  -                }  -            }  -  -            // the object could be destroyed after this method  -            inline void SetNextToken(TMsgBunch* next) {  -                AtomicSet(NextToken, next);  +                if (Y_UNLIKELY(slot < FirstSlot)) { +                    return PUSH_RESULT_BACKWARD; +                } + +                if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) { +                    return PUSH_RESULT_FORWARD; +                } + +                LinkArray[slot - FirstSlot].Store(auxiliary); + +                AtomicSet(LinkArray[slot - FirstSlot].Msg, msg); +                return PUSH_RESULT_OK; +            } + +            inline bool IsSlotHere(ui64 slot) { +                return slot < FirstSlot + BUNCH_SIZE; +            } + +            inline TMsgLink GetSlot(ui64 slot) const { +                return AtomicGet(LinkArray[slot - FirstSlot].Msg); +            } + +            inline TSlot<TAux> GetSlotAux(ui64 slot) const { +                auto msg = GetSlot(slot); +                auto aux = LinkArray[slot - FirstSlot].Retrieve(); +                return TSlot<TAux>::Pair(msg, aux); +            } + +            inline TMsgBunch* GetNextBunch() const { +                return AtomicGet(NextBunch); +            } + +            inline bool SetNextBunch(TMsgBunch* ptr) { +                return AtomicCas(&NextBunch, ptr, nullptr); +            } + +            inline TMsgBunch* GetBackLink() const { +                return AtomicGet(BackLink); +            } + +            inline TMsgBunch* GetToken(ui64 slot) { +                return reinterpret_cast<TMsgBunch*>( +                    LinkArray[slot - FirstSlot].Msg); +            } + +            inline void IncrementToken() { +                AtomicIncrement(Token); +            } + +            // the object could be destroyed after this method +            inline void DecrementToken() { +                if (Y_UNLIKELY(AtomicDecrement(Token) == RELEASE_SIZE)) { +                    Release(this); +                    AtomicGet(NextToken)->DecrementToken(); +                    // this could be invalid here +                } +            } + +            // the object could be destroyed after this method +            inline void SetNextToken(TMsgBunch* next) { +                AtomicSet(NextToken, next);                  if (Y_UNLIKELY(AtomicAdd(Token, RELEASE_SIZE) == RELEASE_SIZE)) { -                    Release(this);  -                    next->DecrementToken();  -                }  -                // this could be invalid here  -            }  -  -            TMsgBunch(ui64 start, TMsgBunch* backLink) {  -                AtomicSet(FirstSlot, start);  -                memset(&LinkArray, 0, sizeof(LinkArray));  -                AtomicSet(NextBunch, nullptr);  -                AtomicSet(BackLink, backLink);  -  -                AtomicSet(Token, 1);  -                AtomicSet(NextToken, nullptr);  -            }  -  -            static void Release(TMsgBunch* block) {  -                auto backLink = AtomicGet(block->BackLink);  -                if (backLink == nullptr) {  -                    return;  -                }  -                AtomicSet(block->BackLink, nullptr);  -  -                do {  -                    auto bbackLink = backLink->BackLink;  -                    delete backLink;  -                    backLink = bbackLink;  -                } while (backLink != nullptr);  -            }  -  -            void Destroy() {  -                for (auto tail = BackLink; tail != nullptr;) {  -                    auto next = tail->BackLink;  -                    delete tail;  -                    tail = next;  -                }  -  -                for (auto next = this; next != nullptr;) {  -                    auto nnext = next->NextBunch;  -                    delete next;  -                    next = nnext;  -                }  -            }  -        };  -  -        template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE,  -                  typename TBunchBase = NReadAsFilledPrivate::TEmpty,  -                  typename TAux = TEmptyAux>  -        class TWriteBucket {  -        public:  -            using TUsingAux = TAux; // for TReadBucket binding  -            using TBunch = TMsgBunch<BUNCH_SIZE, TBunchBase, TAux>;  -  -            TWriteBucket(TBunch* bunch = new TBunch(0, nullptr)) {  -                AtomicSet(LastBunch, bunch);  -                AtomicSet(SlotCounter, 0);  -            }  -  -            TWriteBucket(TWriteBucket&& move)  -                : LastBunch(move.LastBunch)  -                , SlotCounter(move.SlotCounter)  -            {  -                move.LastBunch = nullptr;  -            }  -  -            ~TWriteBucket() {  -                if (LastBunch != nullptr) {  -                    LastBunch->Destroy();  -                }  -            }  -  -            inline void Push(TMsgLink msg, TAux aux) {  -                ui64 pushSlot = AtomicGetAndIncrement(SlotCounter);  -                TBunch* hintBunch = GetLastBunch();  -  -                for (;;) {  -                    auto hint = hintBunch->Push(msg, pushSlot, aux);  -                    if (Y_LIKELY(hint == PUSH_RESULT_OK)) {  -                        return;  -                    }  -                    HandleHint(hintBunch, hint);  -                }  -            }  -  -        protected:  -            template <typename, template <typename, typename...> class>  -            friend class TReadBucket;  -  -            TBunch* volatile LastBunch; // Hint  -            volatile ui64 SlotCounter;  -  -            inline TBunch* GetLastBunch() const {  -                return AtomicGet(LastBunch);  -            }  -  -            void HandleHint(TBunch*& hintBunch, TPushResult hint) {  -                if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) {  -                    hintBunch = hintBunch->GetBackLink();  -                    return;  -                }  -  -                // PUSH_RESULT_FORWARD  -                auto nextBunch = hintBunch->GetNextBunch();  -  -                if (nextBunch == nullptr) {  -                    auto first = hintBunch->FirstSlot + BUNCH_SIZE;  -                    nextBunch = new TBunch(first, hintBunch);  -                    if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) {  -                        delete nextBunch;  -                        nextBunch = hintBunch->GetNextBunch();  -                    }  -                }  -  -                // hintBunch could not be freed here so it cannot be reused  -                // it's alright if this CAS was not succeeded,  -                // it means that other thread did that recently  -                AtomicCas(&LastBunch, nextBunch, hintBunch);  -  -                hintBunch = nextBunch;  -            }  -        };  -  +                    Release(this); +                    next->DecrementToken(); +                } +                // this could be invalid here +            } + +            TMsgBunch(ui64 start, TMsgBunch* backLink) { +                AtomicSet(FirstSlot, start); +                memset(&LinkArray, 0, sizeof(LinkArray)); +                AtomicSet(NextBunch, nullptr); +                AtomicSet(BackLink, backLink); + +                AtomicSet(Token, 1); +                AtomicSet(NextToken, nullptr); +            } + +            static void Release(TMsgBunch* block) { +                auto backLink = AtomicGet(block->BackLink); +                if (backLink == nullptr) { +                    return; +                } +                AtomicSet(block->BackLink, nullptr); + +                do { +                    auto bbackLink = backLink->BackLink; +                    delete backLink; +                    backLink = bbackLink; +                } while (backLink != nullptr); +            } + +            void Destroy() { +                for (auto tail = BackLink; tail != nullptr;) { +                    auto next = tail->BackLink; +                    delete tail; +                    tail = next; +                } + +                for (auto next = this; next != nullptr;) { +                    auto nnext = next->NextBunch; +                    delete next; +                    next = nnext; +                } +            } +        }; + +        template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE, +                  typename TBunchBase = NReadAsFilledPrivate::TEmpty, +                  typename TAux = TEmptyAux> +        class TWriteBucket { +        public: +            using TUsingAux = TAux; // for TReadBucket binding +            using TBunch = TMsgBunch<BUNCH_SIZE, TBunchBase, TAux>; + +            TWriteBucket(TBunch* bunch = new TBunch(0, nullptr)) { +                AtomicSet(LastBunch, bunch); +                AtomicSet(SlotCounter, 0); +            } + +            TWriteBucket(TWriteBucket&& move) +                : LastBunch(move.LastBunch) +                , SlotCounter(move.SlotCounter) +            { +                move.LastBunch = nullptr; +            } + +            ~TWriteBucket() { +                if (LastBunch != nullptr) { +                    LastBunch->Destroy(); +                } +            } + +            inline void Push(TMsgLink msg, TAux aux) { +                ui64 pushSlot = AtomicGetAndIncrement(SlotCounter); +                TBunch* hintBunch = GetLastBunch(); + +                for (;;) { +                    auto hint = hintBunch->Push(msg, pushSlot, aux); +                    if (Y_LIKELY(hint == PUSH_RESULT_OK)) { +                        return; +                    } +                    HandleHint(hintBunch, hint); +                } +            } + +        protected: +            template <typename, template <typename, typename...> class> +            friend class TReadBucket; + +            TBunch* volatile LastBunch; // Hint +            volatile ui64 SlotCounter; + +            inline TBunch* GetLastBunch() const { +                return AtomicGet(LastBunch); +            } + +            void HandleHint(TBunch*& hintBunch, TPushResult hint) { +                if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) { +                    hintBunch = hintBunch->GetBackLink(); +                    return; +                } + +                // PUSH_RESULT_FORWARD +                auto nextBunch = hintBunch->GetNextBunch(); + +                if (nextBunch == nullptr) { +                    auto first = hintBunch->FirstSlot + BUNCH_SIZE; +                    nextBunch = new TBunch(first, hintBunch); +                    if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) { +                        delete nextBunch; +                        nextBunch = hintBunch->GetNextBunch(); +                    } +                } + +                // hintBunch could not be freed here so it cannot be reused +                // it's alright if this CAS was not succeeded, +                // it means that other thread did that recently +                AtomicCas(&LastBunch, nextBunch, hintBunch); + +                hintBunch = nextBunch; +            } +        }; +          template <typename TWBucket = TWriteBucket<>,                    template <typename, typename...> class TContainer = TDeque> -        class TReadBucket {  -        public:  -            using TAux = typename TWBucket::TUsingAux;  -            using TBunch = typename TWBucket::TBunch;  -  -            static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 5;  -  -            TReadBucket(TWBucket* writer)  -                : Writer(writer)  -                , ReadBunch(writer->GetLastBunch())  -                , LastKnownPushBunch(writer->GetLastBunch())  -            {  -                ReadBunch->DecrementToken(); // no previous token  -            }  -  -            TReadBucket(TReadBucket toCopy, TWBucket* writer)  -                : TReadBucket(std::move(toCopy))  -            {  -                Writer = writer;  -            }  -  -            ui64 ReadyCount() const {  -                return AtomicGet(Writer->SlotCounter) - ReadSlot;  -            }  -  -            TMsgLink Pop() {  -                return PopAux().Msg;  -            }  -  -            TMsgLink Peek() {  -                return PeekAux().Msg;  -            }  -  -            TSlot<TAux> PopAux() {  -                for (;;) {  -                    if (Y_UNLIKELY(ReadNow.size() != 0)) {  -                        auto result = PopSkipped();  -                        if (Y_LIKELY(result.Msg != nullptr)) {  -                            return result;  -                        }  -                    }  -  -                    if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {  -                        if (Y_LIKELY(!RereadPushSlot())) {  -                            return TSlot<TAux>::NullElem();  -                        }  -                        continue;  -                    }  -  -                    if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {  -                        if (Y_UNLIKELY(!SwitchToNextBunch())) {  -                            return TSlot<TAux>::NullElem();  -                        }  -                    }  -  -                    auto result = ReadBunch->GetSlotAux(ReadSlot);  -                    if (Y_LIKELY(result.Msg != nullptr)) {  -                        ++ReadSlot;  -                        return result;  -                    }  -  -                    result = StubbornPop();  -                    if (Y_LIKELY(result.Msg != nullptr)) {  -                        return result;  -                    }  -                }  -            }  -  -            TSlot<TAux> PeekAux() {  -                for (;;) {  -                    if (Y_UNLIKELY(ReadNow.size() != 0)) {  -                        auto result = PeekSkipped();  -                        if (Y_LIKELY(result.Msg != nullptr)) {  -                            return result;  -                        }  -                    }  -  -                    if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {  -                        if (Y_LIKELY(!RereadPushSlot())) {  -                            return TSlot<TAux>::NullElem();  -                        }  -                        continue;  -                    }  -  -                    if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {  -                        if (Y_UNLIKELY(!SwitchToNextBunch())) {  -                            return TSlot<TAux>::NullElem();  -                        }  -                    }  -  -                    auto result = ReadBunch->GetSlotAux(ReadSlot);  -                    if (Y_LIKELY(result.Msg != nullptr)) {  -                        return result;  -                    }  -  -                    result = StubbornPeek();  -                    if (Y_LIKELY(result.Msg != nullptr)) {  -                        return result;  -                    }  -                }  -            }  -  -        private:  -            TWBucket* Writer;  -            TBunch* ReadBunch;  -            ui64 ReadSlot = 0;  -            TBunch* LastKnownPushBunch;  -            ui64 LastKnownPushSlot = 0;  -  -            struct TSkipItem {  -                TBunch* Bunch;  -                ui64 Slot;  -                TBunch* Token;  -            };  -  -            TContainer<TSkipItem> ReadNow;  -            TContainer<TSkipItem> ReadLater;  -  -            void AddToReadLater() {  -                ReadLater.push_back({ReadBunch, ReadSlot, LastKnownPushBunch});  -                LastKnownPushBunch->IncrementToken();  -                ++ReadSlot;  -            }  -  -            // MUST BE: ReadSlot == LastKnownPushSlot  -            bool RereadPushSlot() {  -                ReadNow = std::move(ReadLater);  -                ReadLater.clear();  -  -                auto oldSlot = LastKnownPushSlot;  -  -                auto currentPushBunch = Writer->GetLastBunch();  -                auto currentPushSlot = AtomicGet(Writer->SlotCounter);  -  -                if (currentPushBunch != LastKnownPushBunch) {  -                    // LastKnownPushBunch could be invalid after this line  -                    LastKnownPushBunch->SetNextToken(currentPushBunch);  -                }  -  -                LastKnownPushBunch = currentPushBunch;  -                LastKnownPushSlot = currentPushSlot;  -  -                return oldSlot != LastKnownPushSlot;  -            }  -  -            bool SwitchToNextBunch() {  -                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {  -                    auto next = ReadBunch->GetNextBunch();  -                    if (next != nullptr) {  -                        ReadBunch = next;  -                        return true;  -                    }  -                    SpinLockPause();  -                }  -                return false;  -            }  -  -            TSlot<TAux> StubbornPop() {  -                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {  -                    auto result = ReadBunch->GetSlotAux(ReadSlot);  -                    if (Y_LIKELY(result.Msg != nullptr)) {  -                        ++ReadSlot;  -                        return result;  -                    }  -                    SpinLockPause();  -                }  -  -                AddToReadLater();  -                return TSlot<TAux>::NullElem();  -            }  -  -            TSlot<TAux> StubbornPeek() {  -                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {  -                    auto result = ReadBunch->GetSlotAux(ReadSlot);  -                    if (Y_LIKELY(result.Msg != nullptr)) {  -                        return result;  -                    }  -                    SpinLockPause();  -                }  -  -                AddToReadLater();  -                return TSlot<TAux>::NullElem();  -            }  -  -            TSlot<TAux> PopSkipped() {  -                do {  -                    auto elem = ReadNow.front();  -                    ReadNow.pop_front();  -  -                    auto result = elem.Bunch->GetSlotAux(elem.Slot);  -                    if (Y_LIKELY(result.Msg != nullptr)) {  -                        elem.Token->DecrementToken();  -                        return result;  -                    }  -  -                    ReadLater.emplace_back(elem);  -  -                } while (ReadNow.size() > 0);  -  -                return TSlot<TAux>::NullElem();  -            }  -  -            TSlot<TAux> PeekSkipped() {  -                do {  -                    auto elem = ReadNow.front();  -  -                    auto result = elem.Bunch->GetSlotAux(elem.Slot);  -                    if (Y_LIKELY(result.Msg != nullptr)) {  -                        return result;  -                    }  -  -                    ReadNow.pop_front();  -                    ReadLater.emplace_back(elem);  -  -                } while (ReadNow.size() > 0);  -  -                return TSlot<TAux>::NullElem();  -            }  -        };  -  -        struct TDefaultParams {  -            static constexpr ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE;  -            using TBunchBase = TEmpty;  -  +        class TReadBucket { +        public: +            using TAux = typename TWBucket::TUsingAux; +            using TBunch = typename TWBucket::TBunch; + +            static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 5; + +            TReadBucket(TWBucket* writer) +                : Writer(writer) +                , ReadBunch(writer->GetLastBunch()) +                , LastKnownPushBunch(writer->GetLastBunch()) +            { +                ReadBunch->DecrementToken(); // no previous token +            } + +            TReadBucket(TReadBucket toCopy, TWBucket* writer) +                : TReadBucket(std::move(toCopy)) +            { +                Writer = writer; +            } + +            ui64 ReadyCount() const { +                return AtomicGet(Writer->SlotCounter) - ReadSlot; +            } + +            TMsgLink Pop() { +                return PopAux().Msg; +            } + +            TMsgLink Peek() { +                return PeekAux().Msg; +            } + +            TSlot<TAux> PopAux() { +                for (;;) { +                    if (Y_UNLIKELY(ReadNow.size() != 0)) { +                        auto result = PopSkipped(); +                        if (Y_LIKELY(result.Msg != nullptr)) { +                            return result; +                        } +                    } + +                    if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) { +                        if (Y_LIKELY(!RereadPushSlot())) { +                            return TSlot<TAux>::NullElem(); +                        } +                        continue; +                    } + +                    if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) { +                        if (Y_UNLIKELY(!SwitchToNextBunch())) { +                            return TSlot<TAux>::NullElem(); +                        } +                    } + +                    auto result = ReadBunch->GetSlotAux(ReadSlot); +                    if (Y_LIKELY(result.Msg != nullptr)) { +                        ++ReadSlot; +                        return result; +                    } + +                    result = StubbornPop(); +                    if (Y_LIKELY(result.Msg != nullptr)) { +                        return result; +                    } +                } +            } + +            TSlot<TAux> PeekAux() { +                for (;;) { +                    if (Y_UNLIKELY(ReadNow.size() != 0)) { +                        auto result = PeekSkipped(); +                        if (Y_LIKELY(result.Msg != nullptr)) { +                            return result; +                        } +                    } + +                    if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) { +                        if (Y_LIKELY(!RereadPushSlot())) { +                            return TSlot<TAux>::NullElem(); +                        } +                        continue; +                    } + +                    if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) { +                        if (Y_UNLIKELY(!SwitchToNextBunch())) { +                            return TSlot<TAux>::NullElem(); +                        } +                    } + +                    auto result = ReadBunch->GetSlotAux(ReadSlot); +                    if (Y_LIKELY(result.Msg != nullptr)) { +                        return result; +                    } + +                    result = StubbornPeek(); +                    if (Y_LIKELY(result.Msg != nullptr)) { +                        return result; +                    } +                } +            } + +        private: +            TWBucket* Writer; +            TBunch* ReadBunch; +            ui64 ReadSlot = 0; +            TBunch* LastKnownPushBunch; +            ui64 LastKnownPushSlot = 0; + +            struct TSkipItem { +                TBunch* Bunch; +                ui64 Slot; +                TBunch* Token; +            }; + +            TContainer<TSkipItem> ReadNow; +            TContainer<TSkipItem> ReadLater; + +            void AddToReadLater() { +                ReadLater.push_back({ReadBunch, ReadSlot, LastKnownPushBunch}); +                LastKnownPushBunch->IncrementToken(); +                ++ReadSlot; +            } + +            // MUST BE: ReadSlot == LastKnownPushSlot +            bool RereadPushSlot() { +                ReadNow = std::move(ReadLater); +                ReadLater.clear(); + +                auto oldSlot = LastKnownPushSlot; + +                auto currentPushBunch = Writer->GetLastBunch(); +                auto currentPushSlot = AtomicGet(Writer->SlotCounter); + +                if (currentPushBunch != LastKnownPushBunch) { +                    // LastKnownPushBunch could be invalid after this line +                    LastKnownPushBunch->SetNextToken(currentPushBunch); +                } + +                LastKnownPushBunch = currentPushBunch; +                LastKnownPushSlot = currentPushSlot; + +                return oldSlot != LastKnownPushSlot; +            } + +            bool SwitchToNextBunch() { +                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { +                    auto next = ReadBunch->GetNextBunch(); +                    if (next != nullptr) { +                        ReadBunch = next; +                        return true; +                    } +                    SpinLockPause(); +                } +                return false; +            } + +            TSlot<TAux> StubbornPop() { +                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { +                    auto result = ReadBunch->GetSlotAux(ReadSlot); +                    if (Y_LIKELY(result.Msg != nullptr)) { +                        ++ReadSlot; +                        return result; +                    } +                    SpinLockPause(); +                } + +                AddToReadLater(); +                return TSlot<TAux>::NullElem(); +            } + +            TSlot<TAux> StubbornPeek() { +                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { +                    auto result = ReadBunch->GetSlotAux(ReadSlot); +                    if (Y_LIKELY(result.Msg != nullptr)) { +                        return result; +                    } +                    SpinLockPause(); +                } + +                AddToReadLater(); +                return TSlot<TAux>::NullElem(); +            } + +            TSlot<TAux> PopSkipped() { +                do { +                    auto elem = ReadNow.front(); +                    ReadNow.pop_front(); + +                    auto result = elem.Bunch->GetSlotAux(elem.Slot); +                    if (Y_LIKELY(result.Msg != nullptr)) { +                        elem.Token->DecrementToken(); +                        return result; +                    } + +                    ReadLater.emplace_back(elem); + +                } while (ReadNow.size() > 0); + +                return TSlot<TAux>::NullElem(); +            } + +            TSlot<TAux> PeekSkipped() { +                do { +                    auto elem = ReadNow.front(); + +                    auto result = elem.Bunch->GetSlotAux(elem.Slot); +                    if (Y_LIKELY(result.Msg != nullptr)) { +                        return result; +                    } + +                    ReadNow.pop_front(); +                    ReadLater.emplace_back(elem); + +                } while (ReadNow.size() > 0); + +                return TSlot<TAux>::NullElem(); +            } +        }; + +        struct TDefaultParams { +            static constexpr ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE; +            using TBunchBase = TEmpty; +              template <typename TElem, typename... TRest>              using TContainer = TDeque<TElem, TRest...>; -  -            static constexpr bool DeleteItems = true;  -        };  -  -    } //namespace NReadAsFilledPrivate  -  -    DeclareTuneValueParam(TRaFQueueBunchSize, ui32, BUNCH_SIZE);  -    DeclareTuneTypeParam(TRaFQueueBunchBase, TBunchBase);  -    DeclareTuneContainer(TRaFQueueSkipContainer, TContainer);  -    DeclareTuneValueParam(TRaFQueueDeleteItems, bool, DeleteItems);  -  + +            static constexpr bool DeleteItems = true; +        }; + +    } //namespace NReadAsFilledPrivate + +    DeclareTuneValueParam(TRaFQueueBunchSize, ui32, BUNCH_SIZE); +    DeclareTuneTypeParam(TRaFQueueBunchBase, TBunchBase); +    DeclareTuneContainer(TRaFQueueSkipContainer, TContainer); +    DeclareTuneValueParam(TRaFQueueDeleteItems, bool, DeleteItems); +      template <typename TItem = void, typename... TParams> -    class TReadAsFilledQueue {  -    private:  -        using TTuned = TTune<NReadAsFilledPrivate::TDefaultParams, TParams...>;  -  -        static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE;  -  -        using TBunchBase = typename TTuned::TBunchBase;  -  +    class TReadAsFilledQueue { +    private: +        using TTuned = TTune<NReadAsFilledPrivate::TDefaultParams, TParams...>; + +        static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE; + +        using TBunchBase = typename TTuned::TBunchBase; +          template <typename TElem, typename... TRest> -        using TContainer =  -            typename TTuned::template TContainer<TElem, TRest...>;  -  -        using TWriteBucket =  -            NReadAsFilledPrivate::TWriteBucket<BUNCH_SIZE, TBunchBase>;  -        using TReadBucket =  -            NReadAsFilledPrivate::TReadBucket<TWriteBucket, TContainer>;  -  -    public:  -        TReadAsFilledQueue()  -            : RBucket(&WBucket)  -        {  -        }  -  -        ~TReadAsFilledQueue() {  -            if (TTuned::DeleteItems) {  -                for (;;) {  -                    auto msg = Pop();  -                    if (msg == nullptr) {  -                        break;  -                    }  -                    TDelete::Destroy(msg);  -                }  -            }  -        }  -  -        void Push(TItem* msg) {  -            WBucket.Push((void*)msg, NReadAsFilledPrivate::TEmptyAux());  -        }  -  -        TItem* Pop() {  -            return (TItem*)RBucket.Pop();  -        }  -  -        TItem* Peek() {  -            return (TItem*)RBucket.Peek();  -        }  -  -    protected:  -        TWriteBucket WBucket;  -        TReadBucket RBucket;  -    };  -}  +        using TContainer = +            typename TTuned::template TContainer<TElem, TRest...>; + +        using TWriteBucket = +            NReadAsFilledPrivate::TWriteBucket<BUNCH_SIZE, TBunchBase>; +        using TReadBucket = +            NReadAsFilledPrivate::TReadBucket<TWriteBucket, TContainer>; + +    public: +        TReadAsFilledQueue() +            : RBucket(&WBucket) +        { +        } + +        ~TReadAsFilledQueue() { +            if (TTuned::DeleteItems) { +                for (;;) { +                    auto msg = Pop(); +                    if (msg == nullptr) { +                        break; +                    } +                    TDelete::Destroy(msg); +                } +            } +        } + +        void Push(TItem* msg) { +            WBucket.Push((void*)msg, NReadAsFilledPrivate::TEmptyAux()); +        } + +        TItem* Pop() { +            return (TItem*)RBucket.Pop(); +        } + +        TItem* Peek() { +            return (TItem*)RBucket.Peek(); +        } + +    protected: +        TWriteBucket WBucket; +        TReadBucket RBucket; +    }; +} diff --git a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp index 00dbfeaa64e..2bd0c298216 100644 --- a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp +++ b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp @@ -1 +1 @@ -#include "mpsc_vinfarr_obstructive.h"  +#include "mpsc_vinfarr_obstructive.h" diff --git a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h index 3e1ae923420..5f91f1b5a84 100644 --- a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h +++ b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h @@ -1,528 +1,528 @@ -#pragma once  -  -/*  -  Semi-wait-free queue, multiple producers - one consumer. Strict order.  -  The queue algorithm is using concept of virtual infinite array.  -  -  A producer takes a number from a counter and atomicaly increments the counter.  -  The number taken is a number of a slot for the producer to put a new message  -  into infinite array.  -  -  Then producer constructs a virtual infinite array by bidirectional linked list  -  of blocks. Each block contains several slots.  -  -  There is a hint pointer which optimisticly points to the last block  -  of the list and never goes backward.  -  -  Consumer exploits the property of the hint pointer always going forward  -  to free old blocks eventually. Consumer periodically read the hint pointer  -  and the counter and thus deduce producers which potentially holds the pointer  -  to a block. Consumer can free the block if all that producers filled their  -  slots and left the queue.  -  -  No producer can stop the progress for other producers.  -  -  Consumer can obstruct a slot of a delayed producer by putting special mark.  -  Thus no producer can stop the progress for consumer.  -  But a slow producer may be forced to retry unlimited number of times.  -  Though it's very unlikely for a non-preempted producer to be obstructed.  -  That's why the algorithm is semi-wait-free.  -  -  WARNING: there is no wait¬ify mechanic for consumer,  -  consumer receives nullptr if queue was empty.  -  -  WARNING: though the algorithm itself is lock-free  -  but producers and consumer could be blocked by memory allocator  -  -  WARNING: copy constructers of the queue are not thread-safe  - */  -  -#include <util/generic/noncopyable.h>  -#include <util/generic/ptr.h>  -#include <util/system/atomic.h>  -#include <util/system/spinlock.h>  -  -#include "tune.h"  -  -namespace NThreading {  -    namespace NObstructiveQueuePrivate {  -        typedef void* TMsgLink;  -  -        struct TEmpty {  -        };  -  -        struct TEmptyAux {  -            TEmptyAux Retrieve() const {  -                return TEmptyAux();  -            }  -            void Store(TEmptyAux&) {  -            }  -            static constexpr TEmptyAux Zero() {  -                return TEmptyAux();  -            }  -        };  -  -        template <typename TAux>  -        struct TSlot {  -            TMsgLink volatile Msg;  -            TAux AuxiliaryData;  -  -            inline void Store(TAux& aux) {  -                AuxiliaryData.Store(aux);  -            }  -  -            inline TAux Retrieve() const {  -                return AuxiliaryData.Retrieve();  -            }  -  -            static TSlot<TAux> NullElem() {  -                return {nullptr, TAux::Zero()};  -            }  -  -            static TSlot<TAux> Pair(TMsgLink msg, TAux aux) {  -                return {msg, std::move(aux)};  -            }  -        };  -  -        template <>  -        struct TSlot<TEmptyAux> {  -            TMsgLink volatile Msg;  -            inline void Store(TEmptyAux&) {  -            }  -            inline TEmptyAux Retrieve() const {  -                return TEmptyAux();  -            }  -  -            static TSlot<TEmptyAux> NullElem() {  -                return {nullptr};  -            }  -  -            static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) {  -                return {msg};  -            }  -        };  -  -        enum TPushResult {  -            PUSH_RESULT_OK,  -            PUSH_RESULT_BACKWARD,  -            PUSH_RESULT_FORWARD,  -            PUSH_RESULT_BLOCKED,  -        };  -  -        template <typename TAux, ui32 BUNCH_SIZE, typename TBase = TEmpty>  -        struct TMsgBunch: public TBase {  -            ui64 FirstSlot;  -  -            TSlot<TAux> LinkArray[BUNCH_SIZE];  -  -            TMsgBunch* volatile NextBunch;  -            TMsgBunch* volatile BackLink;  -  -            ui64 volatile Token;  -            TMsgBunch* volatile NextToken;  -  -            /* this push can return PUSH_RESULT_BLOCKED */  -            inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) {  -                if (Y_UNLIKELY(slot < FirstSlot)) {  -                    return PUSH_RESULT_BACKWARD;  -                }  -  -                if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) {  -                    return PUSH_RESULT_FORWARD;  -                }  -  -                LinkArray[slot - FirstSlot].Store(auxiliary);  -  -                auto oldValue = AtomicSwap(&LinkArray[slot - FirstSlot].Msg, msg);  -  -                if (Y_LIKELY(oldValue == nullptr)) {  -                    return PUSH_RESULT_OK;  -                } else {  -                    LeaveBlocked(oldValue);  -                    return PUSH_RESULT_BLOCKED;  -                }  -            }  -  -            inline bool IsSlotHere(ui64 slot) {  -                return slot < FirstSlot + BUNCH_SIZE;  -            }  -  -            inline TMsgLink GetSlot(ui64 slot) const {  -                return AtomicGet(LinkArray[slot - FirstSlot].Msg);  -            }  -  -            inline TSlot<TAux> GetSlotAux(ui64 slot) const {  -                auto msg = GetSlot(slot);  -                auto aux = LinkArray[slot - FirstSlot].Retrieve();  -                return TSlot<TAux>::Pair(msg, aux);  -            }  -  -            void LeaveBlocked(ui64 slot) {  -                auto token = GetToken(slot);  -                token->DecrementToken();  -            }  -  -            void LeaveBlocked(TMsgLink msg) {  -                auto token = reinterpret_cast<TMsgBunch*>(msg);  -                token->DecrementToken();  -            }  -  -            TSlot<TAux> BlockSlotAux(ui64 slot, TMsgBunch* token) {  -                auto old =  -                    AtomicSwap(&LinkArray[slot - FirstSlot].Msg, (TMsgLink)token);  -                if (old == nullptr) {  -                    // It's valid to increment after AtomicCas  -                    // because token will release data only after SetNextToken  -                    token->IncrementToken();  -                    return TSlot<TAux>::NullElem();  -                }  -                return TSlot<TAux>::Pair(old, LinkArray[slot - FirstSlot].Retrieve());  -            }  -  -            inline TMsgBunch* GetNextBunch() const {  -                return AtomicGet(NextBunch);  -            }  -  -            inline bool SetNextBunch(TMsgBunch* ptr) {  -                return AtomicCas(&NextBunch, ptr, nullptr);  -            }  -  -            inline TMsgBunch* GetBackLink() const {  -                return AtomicGet(BackLink);  -            }  -  -            inline TMsgBunch* GetToken(ui64 slot) {  -                return reinterpret_cast<TMsgBunch*>(LinkArray[slot - FirstSlot].Msg);  -            }  -  -            inline void IncrementToken() {  -                AtomicIncrement(Token);  -            }  -  -            // the object could be destroyed after this method  -            inline void DecrementToken() {  -                if (Y_UNLIKELY(AtomicDecrement(Token) == BUNCH_SIZE)) {  -                    Release(this);  -                    AtomicGet(NextToken)->DecrementToken();  -                    // this could be invalid here  -                }  -            }  -  -            // the object could be destroyed after this method  -            inline void SetNextToken(TMsgBunch* next) {  -                AtomicSet(NextToken, next);  -                if (Y_UNLIKELY(AtomicAdd(Token, BUNCH_SIZE) == BUNCH_SIZE)) {  -                    Release(this);  -                    next->DecrementToken();  -                }  -                // this could be invalid here  -            }  -  -            TMsgBunch(ui64 start, TMsgBunch* backLink) {  -                AtomicSet(FirstSlot, start);  -                memset(&LinkArray, 0, sizeof(LinkArray));  -                AtomicSet(NextBunch, nullptr);  -                AtomicSet(BackLink, backLink);  -  -                AtomicSet(Token, 1);  -                AtomicSet(NextToken, nullptr);  -            }  -  -            static void Release(TMsgBunch* bunch) {  -                auto backLink = AtomicGet(bunch->BackLink);  -                if (backLink == nullptr) {  -                    return;  -                }  -                AtomicSet(bunch->BackLink, nullptr);  -  -                do {  -                    auto bbackLink = backLink->BackLink;  -                    delete backLink;  -                    backLink = bbackLink;  -                } while (backLink != nullptr);  -            }  -  -            void Destroy() {  -                for (auto tail = BackLink; tail != nullptr;) {  -                    auto next = tail->BackLink;  -                    delete tail;  -                    tail = next;  -                }  -  -                for (auto next = this; next != nullptr;) {  -                    auto nnext = next->NextBunch;  -                    delete next;  -                    next = nnext;  -                }  -            }  -        };  -  -        template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase = TEmpty>  -        class TWriteBucket {  -        public:  -            static const ui64 GROSS_SIZE;  -  -            using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>;  -  -            TWriteBucket(TBunch* bunch = new TBunch(0, nullptr))  -                : LastBunch(bunch)  -                , SlotCounter(0)  -            {  -            }  -  -            TWriteBucket(TWriteBucket&& move)  -                : LastBunch(move.LastBunch)  -                , SlotCounter(move.SlotCounter)  -            {  -                move.LastBunch = nullptr;  -            }  -  -            ~TWriteBucket() {  -                if (LastBunch != nullptr) {  -                    LastBunch->Destroy();  -                }  -            }  -  -            inline bool Push(TMsgLink msg, TAux aux) {  -                ui64 pushSlot = AtomicGetAndIncrement(SlotCounter);  -                TBunch* hintBunch = GetLastBunch();  -  -                for (;;) {  -                    auto hint = hintBunch->Push(msg, pushSlot, aux);  -                    if (Y_LIKELY(hint == PUSH_RESULT_OK)) {  -                        return true;  -                    }  -                    bool hhResult = HandleHint(hintBunch, hint);  -                    if (Y_UNLIKELY(!hhResult)) {  -                        return false;  -                    }  -                }  -            }  -  -        protected:  -            template <typename, ui32, typename>  -            friend class TReadBucket;  -  -            TBunch* volatile LastBunch; // Hint  -            volatile ui64 SlotCounter;  -  -            inline TBunch* GetLastBunch() const {  -                return AtomicGet(LastBunch);  -            }  -  -            bool HandleHint(TBunch*& hintBunch, TPushResult hint) {  -                if (Y_UNLIKELY(hint == PUSH_RESULT_BLOCKED)) {  -                    return false;  -                }  -  -                if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) {  -                    hintBunch = hintBunch->GetBackLink();  -                    return true;  -                }  -  -                // PUSH_RESULT_FORWARD  -                auto nextBunch = hintBunch->GetNextBunch();  -  -                if (nextBunch == nullptr) {  -                    auto first = hintBunch->FirstSlot + BUNCH_SIZE;  -                    nextBunch = new TBunch(first, hintBunch);  -                    if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) {  -                        delete nextBunch;  -                        nextBunch = hintBunch->GetNextBunch();  -                    }  -                }  -  -                // hintBunch could not be freed here so it cannot be reused  -                // it's alright if this CAS was not succeeded,  -                // it means that other thread did that recently  -                AtomicCas(&LastBunch, nextBunch, hintBunch);  -  -                hintBunch = nextBunch;  -                return true;  -            }  -        };  -  -        template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase>  -        class TReadBucket {  -        public:  -            static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 20;  -  -            using TWBucket = TWriteBucket<TAux, BUNCH_SIZE, TBunchBase>;  -            using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>;  -  -            TReadBucket(TWBucket* writer)  -                : Writer(writer)  -                , ReadBunch(writer->GetLastBunch())  -                , LastKnownPushBunch(writer->GetLastBunch())  -            {  -                ReadBunch->DecrementToken(); // no previous token  -            }  -  -            TReadBucket(TReadBucket toCopy, TWBucket* writer)  -                : TReadBucket(std::move(toCopy))  -            {  -                Writer = writer;  -            }  -  -            ui64 ReadyCount() const {  -                return AtomicGet(Writer->SlotCounter) - ReadSlot;  -            }  -  -            inline TMsgLink Pop() {  -                return PopAux().Msg;  -            }  -  -            inline TSlot<TAux> PopAux() {  -                for (;;) {  -                    if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {  -                        if (Y_LIKELY(!RereadPushSlot())) {  -                            return TSlot<TAux>::NullElem();  -                        }  -                    }  -  -                    if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {  -                        if (Y_UNLIKELY(!SwitchToNextBunch())) {  -                            return TSlot<TAux>::NullElem();  -                        }  -                    }  -  -                    auto result = ReadBunch->GetSlotAux(ReadSlot);  -                    if (Y_LIKELY(result.Msg != nullptr)) {  -                        ++ReadSlot;  -                        return result;  -                    }  -  -                    if (ReadSlot + 1 == AtomicGet(Writer->SlotCounter)) {  -                        return TSlot<TAux>::NullElem();  -                    }  -  -                    result = StubbornPopAux();  -  -                    if (result.Msg != nullptr) {  -                        return result;  -                    }  -                }  -            }  -  -        private:  -            TWBucket* Writer;  -            TBunch* ReadBunch;  -            ui64 ReadSlot = 0;  -            TBunch* LastKnownPushBunch;  -            ui64 LastKnownPushSlot = 0;  -  -            // MUST BE: ReadSlot == LastKnownPushSlot  -            bool RereadPushSlot() {  -                auto oldSlot = LastKnownPushSlot;  -  -                auto currentPushBunch = Writer->GetLastBunch();  -                auto currentPushSlot = AtomicGet(Writer->SlotCounter);  -  -                if (currentPushBunch != LastKnownPushBunch) {  -                    // LastKnownPushBunch could be invalid after this line  -                    LastKnownPushBunch->SetNextToken(currentPushBunch);  -                }  -  -                LastKnownPushBunch = currentPushBunch;  -                LastKnownPushSlot = currentPushSlot;  -  -                return oldSlot != LastKnownPushSlot;  -            }  -  -            bool SwitchToNextBunch() {  -                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {  -                    auto next = ReadBunch->GetNextBunch();  -                    if (next != nullptr) {  -                        ReadBunch = next;  -                        return true;  -                    }  -                    SpinLockPause();  -                }  -                return false;  -            }  -  -            TSlot<TAux> StubbornPopAux() {  -                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {  -                    auto result = ReadBunch->GetSlotAux(ReadSlot);  -                    if (Y_LIKELY(result.Msg != nullptr)) {  -                        ++ReadSlot;  -                        return result;  -                    }  -                    SpinLockPause();  -                }  -  -                return ReadBunch->BlockSlotAux(ReadSlot++, LastKnownPushBunch);  -            }  -        };  -  -        struct TDefaultParams {  -            static constexpr bool DeleteItems = true;  -            using TAux = NObstructiveQueuePrivate::TEmptyAux;  -            using TBunchBase = NObstructiveQueuePrivate::TEmpty;  -            static constexpr ui32 BUNCH_SIZE = 251;  -        };  -  -    } //namespace NObstructiveQueuePrivate  -  -    DeclareTuneValueParam(TObstructiveQueueBunchSize, ui32, BUNCH_SIZE);  -    DeclareTuneValueParam(TObstructiveQueueDeleteItems, bool, DeleteItems);  -    DeclareTuneTypeParam(TObstructiveQueueBunchBase, TBunchBase);  -    DeclareTuneTypeParam(TObstructiveQueueAux, TAux);  -  +#pragma once + +/* +  Semi-wait-free queue, multiple producers - one consumer. Strict order. +  The queue algorithm is using concept of virtual infinite array. + +  A producer takes a number from a counter and atomicaly increments the counter. +  The number taken is a number of a slot for the producer to put a new message +  into infinite array. + +  Then producer constructs a virtual infinite array by bidirectional linked list +  of blocks. Each block contains several slots. + +  There is a hint pointer which optimisticly points to the last block +  of the list and never goes backward. + +  Consumer exploits the property of the hint pointer always going forward +  to free old blocks eventually. Consumer periodically read the hint pointer +  and the counter and thus deduce producers which potentially holds the pointer +  to a block. Consumer can free the block if all that producers filled their +  slots and left the queue. + +  No producer can stop the progress for other producers. + +  Consumer can obstruct a slot of a delayed producer by putting special mark. +  Thus no producer can stop the progress for consumer. +  But a slow producer may be forced to retry unlimited number of times. +  Though it's very unlikely for a non-preempted producer to be obstructed. +  That's why the algorithm is semi-wait-free. + +  WARNING: there is no wait¬ify mechanic for consumer, +  consumer receives nullptr if queue was empty. + +  WARNING: though the algorithm itself is lock-free +  but producers and consumer could be blocked by memory allocator + +  WARNING: copy constructers of the queue are not thread-safe + */ + +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/system/atomic.h> +#include <util/system/spinlock.h> + +#include "tune.h" + +namespace NThreading { +    namespace NObstructiveQueuePrivate { +        typedef void* TMsgLink; + +        struct TEmpty { +        }; + +        struct TEmptyAux { +            TEmptyAux Retrieve() const { +                return TEmptyAux(); +            } +            void Store(TEmptyAux&) { +            } +            static constexpr TEmptyAux Zero() { +                return TEmptyAux(); +            } +        }; + +        template <typename TAux> +        struct TSlot { +            TMsgLink volatile Msg; +            TAux AuxiliaryData; + +            inline void Store(TAux& aux) { +                AuxiliaryData.Store(aux); +            } + +            inline TAux Retrieve() const { +                return AuxiliaryData.Retrieve(); +            } + +            static TSlot<TAux> NullElem() { +                return {nullptr, TAux::Zero()}; +            } + +            static TSlot<TAux> Pair(TMsgLink msg, TAux aux) { +                return {msg, std::move(aux)}; +            } +        }; + +        template <> +        struct TSlot<TEmptyAux> { +            TMsgLink volatile Msg; +            inline void Store(TEmptyAux&) { +            } +            inline TEmptyAux Retrieve() const { +                return TEmptyAux(); +            } + +            static TSlot<TEmptyAux> NullElem() { +                return {nullptr}; +            } + +            static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) { +                return {msg}; +            } +        }; + +        enum TPushResult { +            PUSH_RESULT_OK, +            PUSH_RESULT_BACKWARD, +            PUSH_RESULT_FORWARD, +            PUSH_RESULT_BLOCKED, +        }; + +        template <typename TAux, ui32 BUNCH_SIZE, typename TBase = TEmpty> +        struct TMsgBunch: public TBase { +            ui64 FirstSlot; + +            TSlot<TAux> LinkArray[BUNCH_SIZE]; + +            TMsgBunch* volatile NextBunch; +            TMsgBunch* volatile BackLink; + +            ui64 volatile Token; +            TMsgBunch* volatile NextToken; + +            /* this push can return PUSH_RESULT_BLOCKED */ +            inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) { +                if (Y_UNLIKELY(slot < FirstSlot)) { +                    return PUSH_RESULT_BACKWARD; +                } + +                if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) { +                    return PUSH_RESULT_FORWARD; +                } + +                LinkArray[slot - FirstSlot].Store(auxiliary); + +                auto oldValue = AtomicSwap(&LinkArray[slot - FirstSlot].Msg, msg); + +                if (Y_LIKELY(oldValue == nullptr)) { +                    return PUSH_RESULT_OK; +                } else { +                    LeaveBlocked(oldValue); +                    return PUSH_RESULT_BLOCKED; +                } +            } + +            inline bool IsSlotHere(ui64 slot) { +                return slot < FirstSlot + BUNCH_SIZE; +            } + +            inline TMsgLink GetSlot(ui64 slot) const { +                return AtomicGet(LinkArray[slot - FirstSlot].Msg); +            } + +            inline TSlot<TAux> GetSlotAux(ui64 slot) const { +                auto msg = GetSlot(slot); +                auto aux = LinkArray[slot - FirstSlot].Retrieve(); +                return TSlot<TAux>::Pair(msg, aux); +            } + +            void LeaveBlocked(ui64 slot) { +                auto token = GetToken(slot); +                token->DecrementToken(); +            } + +            void LeaveBlocked(TMsgLink msg) { +                auto token = reinterpret_cast<TMsgBunch*>(msg); +                token->DecrementToken(); +            } + +            TSlot<TAux> BlockSlotAux(ui64 slot, TMsgBunch* token) { +                auto old = +                    AtomicSwap(&LinkArray[slot - FirstSlot].Msg, (TMsgLink)token); +                if (old == nullptr) { +                    // It's valid to increment after AtomicCas +                    // because token will release data only after SetNextToken +                    token->IncrementToken(); +                    return TSlot<TAux>::NullElem(); +                } +                return TSlot<TAux>::Pair(old, LinkArray[slot - FirstSlot].Retrieve()); +            } + +            inline TMsgBunch* GetNextBunch() const { +                return AtomicGet(NextBunch); +            } + +            inline bool SetNextBunch(TMsgBunch* ptr) { +                return AtomicCas(&NextBunch, ptr, nullptr); +            } + +            inline TMsgBunch* GetBackLink() const { +                return AtomicGet(BackLink); +            } + +            inline TMsgBunch* GetToken(ui64 slot) { +                return reinterpret_cast<TMsgBunch*>(LinkArray[slot - FirstSlot].Msg); +            } + +            inline void IncrementToken() { +                AtomicIncrement(Token); +            } + +            // the object could be destroyed after this method +            inline void DecrementToken() { +                if (Y_UNLIKELY(AtomicDecrement(Token) == BUNCH_SIZE)) { +                    Release(this); +                    AtomicGet(NextToken)->DecrementToken(); +                    // this could be invalid here +                } +            } + +            // the object could be destroyed after this method +            inline void SetNextToken(TMsgBunch* next) { +                AtomicSet(NextToken, next); +                if (Y_UNLIKELY(AtomicAdd(Token, BUNCH_SIZE) == BUNCH_SIZE)) { +                    Release(this); +                    next->DecrementToken(); +                } +                // this could be invalid here +            } + +            TMsgBunch(ui64 start, TMsgBunch* backLink) { +                AtomicSet(FirstSlot, start); +                memset(&LinkArray, 0, sizeof(LinkArray)); +                AtomicSet(NextBunch, nullptr); +                AtomicSet(BackLink, backLink); + +                AtomicSet(Token, 1); +                AtomicSet(NextToken, nullptr); +            } + +            static void Release(TMsgBunch* bunch) { +                auto backLink = AtomicGet(bunch->BackLink); +                if (backLink == nullptr) { +                    return; +                } +                AtomicSet(bunch->BackLink, nullptr); + +                do { +                    auto bbackLink = backLink->BackLink; +                    delete backLink; +                    backLink = bbackLink; +                } while (backLink != nullptr); +            } + +            void Destroy() { +                for (auto tail = BackLink; tail != nullptr;) { +                    auto next = tail->BackLink; +                    delete tail; +                    tail = next; +                } + +                for (auto next = this; next != nullptr;) { +                    auto nnext = next->NextBunch; +                    delete next; +                    next = nnext; +                } +            } +        }; + +        template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase = TEmpty> +        class TWriteBucket { +        public: +            static const ui64 GROSS_SIZE; + +            using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>; + +            TWriteBucket(TBunch* bunch = new TBunch(0, nullptr)) +                : LastBunch(bunch) +                , SlotCounter(0) +            { +            } + +            TWriteBucket(TWriteBucket&& move) +                : LastBunch(move.LastBunch) +                , SlotCounter(move.SlotCounter) +            { +                move.LastBunch = nullptr; +            } + +            ~TWriteBucket() { +                if (LastBunch != nullptr) { +                    LastBunch->Destroy(); +                } +            } + +            inline bool Push(TMsgLink msg, TAux aux) { +                ui64 pushSlot = AtomicGetAndIncrement(SlotCounter); +                TBunch* hintBunch = GetLastBunch(); + +                for (;;) { +                    auto hint = hintBunch->Push(msg, pushSlot, aux); +                    if (Y_LIKELY(hint == PUSH_RESULT_OK)) { +                        return true; +                    } +                    bool hhResult = HandleHint(hintBunch, hint); +                    if (Y_UNLIKELY(!hhResult)) { +                        return false; +                    } +                } +            } + +        protected: +            template <typename, ui32, typename> +            friend class TReadBucket; + +            TBunch* volatile LastBunch; // Hint +            volatile ui64 SlotCounter; + +            inline TBunch* GetLastBunch() const { +                return AtomicGet(LastBunch); +            } + +            bool HandleHint(TBunch*& hintBunch, TPushResult hint) { +                if (Y_UNLIKELY(hint == PUSH_RESULT_BLOCKED)) { +                    return false; +                } + +                if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) { +                    hintBunch = hintBunch->GetBackLink(); +                    return true; +                } + +                // PUSH_RESULT_FORWARD +                auto nextBunch = hintBunch->GetNextBunch(); + +                if (nextBunch == nullptr) { +                    auto first = hintBunch->FirstSlot + BUNCH_SIZE; +                    nextBunch = new TBunch(first, hintBunch); +                    if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) { +                        delete nextBunch; +                        nextBunch = hintBunch->GetNextBunch(); +                    } +                } + +                // hintBunch could not be freed here so it cannot be reused +                // it's alright if this CAS was not succeeded, +                // it means that other thread did that recently +                AtomicCas(&LastBunch, nextBunch, hintBunch); + +                hintBunch = nextBunch; +                return true; +            } +        }; + +        template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase> +        class TReadBucket { +        public: +            static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 20; + +            using TWBucket = TWriteBucket<TAux, BUNCH_SIZE, TBunchBase>; +            using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>; + +            TReadBucket(TWBucket* writer) +                : Writer(writer) +                , ReadBunch(writer->GetLastBunch()) +                , LastKnownPushBunch(writer->GetLastBunch()) +            { +                ReadBunch->DecrementToken(); // no previous token +            } + +            TReadBucket(TReadBucket toCopy, TWBucket* writer) +                : TReadBucket(std::move(toCopy)) +            { +                Writer = writer; +            } + +            ui64 ReadyCount() const { +                return AtomicGet(Writer->SlotCounter) - ReadSlot; +            } + +            inline TMsgLink Pop() { +                return PopAux().Msg; +            } + +            inline TSlot<TAux> PopAux() { +                for (;;) { +                    if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) { +                        if (Y_LIKELY(!RereadPushSlot())) { +                            return TSlot<TAux>::NullElem(); +                        } +                    } + +                    if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) { +                        if (Y_UNLIKELY(!SwitchToNextBunch())) { +                            return TSlot<TAux>::NullElem(); +                        } +                    } + +                    auto result = ReadBunch->GetSlotAux(ReadSlot); +                    if (Y_LIKELY(result.Msg != nullptr)) { +                        ++ReadSlot; +                        return result; +                    } + +                    if (ReadSlot + 1 == AtomicGet(Writer->SlotCounter)) { +                        return TSlot<TAux>::NullElem(); +                    } + +                    result = StubbornPopAux(); + +                    if (result.Msg != nullptr) { +                        return result; +                    } +                } +            } + +        private: +            TWBucket* Writer; +            TBunch* ReadBunch; +            ui64 ReadSlot = 0; +            TBunch* LastKnownPushBunch; +            ui64 LastKnownPushSlot = 0; + +            // MUST BE: ReadSlot == LastKnownPushSlot +            bool RereadPushSlot() { +                auto oldSlot = LastKnownPushSlot; + +                auto currentPushBunch = Writer->GetLastBunch(); +                auto currentPushSlot = AtomicGet(Writer->SlotCounter); + +                if (currentPushBunch != LastKnownPushBunch) { +                    // LastKnownPushBunch could be invalid after this line +                    LastKnownPushBunch->SetNextToken(currentPushBunch); +                } + +                LastKnownPushBunch = currentPushBunch; +                LastKnownPushSlot = currentPushSlot; + +                return oldSlot != LastKnownPushSlot; +            } + +            bool SwitchToNextBunch() { +                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { +                    auto next = ReadBunch->GetNextBunch(); +                    if (next != nullptr) { +                        ReadBunch = next; +                        return true; +                    } +                    SpinLockPause(); +                } +                return false; +            } + +            TSlot<TAux> StubbornPopAux() { +                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { +                    auto result = ReadBunch->GetSlotAux(ReadSlot); +                    if (Y_LIKELY(result.Msg != nullptr)) { +                        ++ReadSlot; +                        return result; +                    } +                    SpinLockPause(); +                } + +                return ReadBunch->BlockSlotAux(ReadSlot++, LastKnownPushBunch); +            } +        }; + +        struct TDefaultParams { +            static constexpr bool DeleteItems = true; +            using TAux = NObstructiveQueuePrivate::TEmptyAux; +            using TBunchBase = NObstructiveQueuePrivate::TEmpty; +            static constexpr ui32 BUNCH_SIZE = 251; +        }; + +    } //namespace NObstructiveQueuePrivate + +    DeclareTuneValueParam(TObstructiveQueueBunchSize, ui32, BUNCH_SIZE); +    DeclareTuneValueParam(TObstructiveQueueDeleteItems, bool, DeleteItems); +    DeclareTuneTypeParam(TObstructiveQueueBunchBase, TBunchBase); +    DeclareTuneTypeParam(TObstructiveQueueAux, TAux); +      template <typename TItem = void, typename... TParams> -    class TObstructiveConsumerAuxQueue {  -    private:  -        using TTuned =  -            TTune<NObstructiveQueuePrivate::TDefaultParams, TParams...>;  -  -        using TAux = typename TTuned::TAux;  -        using TSlot = NObstructiveQueuePrivate::TSlot<TAux>;  -        using TMsgLink = NObstructiveQueuePrivate::TMsgLink;  -        using TBunchBase = typename TTuned::TBunchBase;  -        static constexpr bool DeleteItems = TTuned::DeleteItems;  -        static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE;  -  -    public:  -        TObstructiveConsumerAuxQueue()  -            : RBuckets(&WBucket)  -        {  -        }  -  -        ~TObstructiveConsumerAuxQueue() {  -            if (DeleteItems) {  -                for (;;) {  -                    auto msg = Pop();  -                    if (msg == nullptr) {  -                        break;  -                    }  -                    TDelete::Destroy(msg);  -                }  -            }  -        }  -  -        void Push(TItem* msg) {  -            while (!WBucket.Push(reinterpret_cast<TMsgLink>(msg), TAux())) {  -            }  -        }  -  -        TItem* Pop() {  -            return reinterpret_cast<TItem*>(RBuckets.Pop());  -        }  -  -        TSlot PopAux() {  -            return RBuckets.PopAux();  -        }  -  -    private:  -        NObstructiveQueuePrivate::TWriteBucket<TAux, BUNCH_SIZE, TBunchBase>  -            WBucket;  -        NObstructiveQueuePrivate::TReadBucket<TAux, BUNCH_SIZE, TBunchBase>  -            RBuckets;  -    };  -  -    template <typename TItem = void, bool DeleteItems = true>  -    class TObstructiveConsumerQueue  +    class TObstructiveConsumerAuxQueue { +    private: +        using TTuned = +            TTune<NObstructiveQueuePrivate::TDefaultParams, TParams...>; + +        using TAux = typename TTuned::TAux; +        using TSlot = NObstructiveQueuePrivate::TSlot<TAux>; +        using TMsgLink = NObstructiveQueuePrivate::TMsgLink; +        using TBunchBase = typename TTuned::TBunchBase; +        static constexpr bool DeleteItems = TTuned::DeleteItems; +        static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE; + +    public: +        TObstructiveConsumerAuxQueue() +            : RBuckets(&WBucket) +        { +        } + +        ~TObstructiveConsumerAuxQueue() { +            if (DeleteItems) { +                for (;;) { +                    auto msg = Pop(); +                    if (msg == nullptr) { +                        break; +                    } +                    TDelete::Destroy(msg); +                } +            } +        } + +        void Push(TItem* msg) { +            while (!WBucket.Push(reinterpret_cast<TMsgLink>(msg), TAux())) { +            } +        } + +        TItem* Pop() { +            return reinterpret_cast<TItem*>(RBuckets.Pop()); +        } + +        TSlot PopAux() { +            return RBuckets.PopAux(); +        } + +    private: +        NObstructiveQueuePrivate::TWriteBucket<TAux, BUNCH_SIZE, TBunchBase> +            WBucket; +        NObstructiveQueuePrivate::TReadBucket<TAux, BUNCH_SIZE, TBunchBase> +            RBuckets; +    }; + +    template <typename TItem = void, bool DeleteItems = true> +    class TObstructiveConsumerQueue         : public TObstructiveConsumerAuxQueue<TItem,                                                TObstructiveQueueDeleteItems<DeleteItems>> { -    };  +    };  } diff --git a/library/cpp/threading/queue/queue_ut.cpp b/library/cpp/threading/queue/queue_ut.cpp index 8b36437034c..80eca147da9 100644 --- a/library/cpp/threading/queue/queue_ut.cpp +++ b/library/cpp/threading/queue/queue_ut.cpp @@ -1,242 +1,242 @@  #include <library/cpp/testing/unittest/registar.h> -#include <util/system/thread.h>  -  -#include "ut_helpers.h"  -  -typedef void* TMsgLink;  -  +#include <util/system/thread.h> + +#include "ut_helpers.h" + +typedef void* TMsgLink; +  template <typename TQueueType> -class TQueueTestProcs: public TTestBase {  -private:  +class TQueueTestProcs: public TTestBase { +private:      UNIT_TEST_SUITE_DEMANGLE(TQueueTestProcs<TQueueType>); -    UNIT_TEST(Threads2_Push1M_Threads1_Pop2M)  -    UNIT_TEST(Threads4_Push1M_Threads1_Pop4M)  -    UNIT_TEST(Threads8_RndPush100K_Threads8_Queues)  +    UNIT_TEST(Threads2_Push1M_Threads1_Pop2M) +    UNIT_TEST(Threads4_Push1M_Threads1_Pop4M) +    UNIT_TEST(Threads8_RndPush100K_Threads8_Queues)      /* -    UNIT_TEST(Threads24_RndPush100K_Threads24_Queues)  -    UNIT_TEST(Threads24_RndPush100K_Threads8_Queues)  -    UNIT_TEST(Threads24_RndPush100K_Threads4_Queues)  -*/  -    UNIT_TEST_SUITE_END();  -  -public:  -    void Push1M_Pop1M() {  +    UNIT_TEST(Threads24_RndPush100K_Threads24_Queues) +    UNIT_TEST(Threads24_RndPush100K_Threads8_Queues) +    UNIT_TEST(Threads24_RndPush100K_Threads4_Queues) +*/ +    UNIT_TEST_SUITE_END(); + +public: +    void Push1M_Pop1M() {          TQueueType queue; -        TMsgLink msg = &queue;  -  -        auto pmsg = queue.Pop();  -        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);  -  -        for (int i = 0; i < 1000000; ++i) {  -            queue.Push((char*)msg + i);  -        }  -  -        for (int i = 0; i < 1000000; ++i) {  -            auto popped = queue.Pop();  -            UNIT_ASSERT_EQUAL((char*)msg + i, popped);  -        }  -  -        pmsg = queue.Pop();  -        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);  -    }  -  -    void Threads2_Push1M_Threads1_Pop2M() {  +        TMsgLink msg = &queue; + +        auto pmsg = queue.Pop(); +        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + +        for (int i = 0; i < 1000000; ++i) { +            queue.Push((char*)msg + i); +        } + +        for (int i = 0; i < 1000000; ++i) { +            auto popped = queue.Pop(); +            UNIT_ASSERT_EQUAL((char*)msg + i, popped); +        } + +        pmsg = queue.Pop(); +        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); +    } + +    void Threads2_Push1M_Threads1_Pop2M() {          TQueueType queue; -  +          class TPusherThread: public ISimpleThread { -        public:  +        public:              TPusherThread(TQueueType& theQueue, char* start)                  : Queue(theQueue) -                , Arg(start)  -            {  -            }  -  +                , Arg(start) +            { +            } +              TQueueType& Queue; -            char* Arg;  -  -            void* ThreadProc() override {  -                for (int i = 0; i < 1000000; ++i) {  -                    Queue.Push(Arg + i);  -                }  -                return nullptr;  -            }  -        };  -  -        TPusherThread pusher1(queue, (char*)&queue);  -        TPusherThread pusher2(queue, (char*)&queue + 2000000);  -  -        pusher1.Start();  -        pusher2.Start();  -  -        for (int i = 0; i < 2000000; ++i) {  -            while (queue.Pop() == nullptr) {  -                SpinLockPause();  -            }  -        }  -  -        auto pmsg = queue.Pop();  -        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);  -    }  -  -    void Threads4_Push1M_Threads1_Pop4M() {  +            char* Arg; + +            void* ThreadProc() override { +                for (int i = 0; i < 1000000; ++i) { +                    Queue.Push(Arg + i); +                } +                return nullptr; +            } +        }; + +        TPusherThread pusher1(queue, (char*)&queue); +        TPusherThread pusher2(queue, (char*)&queue + 2000000); + +        pusher1.Start(); +        pusher2.Start(); + +        for (int i = 0; i < 2000000; ++i) { +            while (queue.Pop() == nullptr) { +                SpinLockPause(); +            } +        } + +        auto pmsg = queue.Pop(); +        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); +    } + +    void Threads4_Push1M_Threads1_Pop4M() {          TQueueType queue; -  +          class TPusherThread: public ISimpleThread { -        public:  +        public:              TPusherThread(TQueueType& theQueue, char* start)                  : Queue(theQueue) -                , Arg(start)  -            {  -            }  -  +                , Arg(start) +            { +            } +              TQueueType& Queue; -            char* Arg;  -  -            void* ThreadProc() override {  -                for (int i = 0; i < 1000000; ++i) {  -                    Queue.Push(Arg + i);  -                }  -                return nullptr;  -            }  -        };  -  -        TPusherThread pusher1(queue, (char*)&queue);  -        TPusherThread pusher2(queue, (char*)&queue + 2000000);  -        TPusherThread pusher3(queue, (char*)&queue + 4000000);  -        TPusherThread pusher4(queue, (char*)&queue + 6000000);  -  -        pusher1.Start();  -        pusher2.Start();  -        pusher3.Start();  -        pusher4.Start();  -  -        for (int i = 0; i < 4000000; ++i) {  -            while (queue.Pop() == nullptr) {  -                SpinLockPause();  -            }  -        }  -  -        auto pmsg = queue.Pop();  -        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);  -    }  -  -    template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES>  -    void ManyRndPush100K_ManyQueues() {  +            char* Arg; + +            void* ThreadProc() override { +                for (int i = 0; i < 1000000; ++i) { +                    Queue.Push(Arg + i); +                } +                return nullptr; +            } +        }; + +        TPusherThread pusher1(queue, (char*)&queue); +        TPusherThread pusher2(queue, (char*)&queue + 2000000); +        TPusherThread pusher3(queue, (char*)&queue + 4000000); +        TPusherThread pusher4(queue, (char*)&queue + 6000000); + +        pusher1.Start(); +        pusher2.Start(); +        pusher3.Start(); +        pusher4.Start(); + +        for (int i = 0; i < 4000000; ++i) { +            while (queue.Pop() == nullptr) { +                SpinLockPause(); +            } +        } + +        auto pmsg = queue.Pop(); +        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); +    } + +    template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES> +    void ManyRndPush100K_ManyQueues() {          TQueueType queue[NUMBER_OF_QUEUES]; -  +          class TPusherThread: public ISimpleThread { -        public:  +        public:              TPusherThread(TQueueType* queues, char* start) -                : Queues(queues)  -                , Arg(start)  -            {  -            }  -  +                : Queues(queues) +                , Arg(start) +            { +            } +              TQueueType* Queues; -            char* Arg;  -  -            void* ThreadProc() override {  -                ui64 counters[NUMBER_OF_QUEUES];  -                for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {  -                    counters[i] = 0;  -                }  -  -                for (int i = 0; i < 100000; ++i) {  -                    size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES;  -                    int cookie = counters[rnd]++;  -                    Queues[rnd].Push(Arg + cookie);  -                }  -  -                for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {  -                    Queues[i].Push((void*)2ULL);  -                }  -  -                return nullptr;  -            }  -        };  -  +            char* Arg; + +            void* ThreadProc() override { +                ui64 counters[NUMBER_OF_QUEUES]; +                for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { +                    counters[i] = 0; +                } + +                for (int i = 0; i < 100000; ++i) { +                    size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES; +                    int cookie = counters[rnd]++; +                    Queues[rnd].Push(Arg + cookie); +                } + +                for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { +                    Queues[i].Push((void*)2ULL); +                } + +                return nullptr; +            } +        }; +          class TPopperThread: public ISimpleThread { -        public:  +        public:              TPopperThread(TQueueType* theQueue, char* base)                  : Queue(theQueue) -                , Base(base)  -            {  -            }  -  +                , Base(base) +            { +            } +              TQueueType* Queue; -            char* Base;  -  -            void* ThreadProc() override {  -                ui64 counters[NUMBER_OF_PUSHERS];  -                for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {  -                    counters[i] = 0;  -                }  -  -                for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) {  -                    auto msg = Queue->Pop();  -                    if (msg == nullptr) {  -                        SpinLockPause();  -                        continue;  -                    }  -                    if (msg == (void*)2ULL) {  -                        ++fin;  -                        continue;  -                    }  -                    ui64 shift = (char*)msg - Base;  -                    auto pusherNum = shift / 200000000ULL;  -                    auto msgNum = shift % 200000000ULL;  -  -                    UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum);  -                    ++counters[pusherNum];  -                }  -  -                auto pmsg = Queue->Pop();  -                UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);  -  -                return nullptr;  -            }  -        };  -  +            char* Base; + +            void* ThreadProc() override { +                ui64 counters[NUMBER_OF_PUSHERS]; +                for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { +                    counters[i] = 0; +                } + +                for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) { +                    auto msg = Queue->Pop(); +                    if (msg == nullptr) { +                        SpinLockPause(); +                        continue; +                    } +                    if (msg == (void*)2ULL) { +                        ++fin; +                        continue; +                    } +                    ui64 shift = (char*)msg - Base; +                    auto pusherNum = shift / 200000000ULL; +                    auto msgNum = shift % 200000000ULL; + +                    UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum); +                    ++counters[pusherNum]; +                } + +                auto pmsg = Queue->Pop(); +                UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + +                return nullptr; +            } +        }; +          TVector<TAutoPtr<TPopperThread>> poppers;          TVector<TAutoPtr<TPusherThread>> pushers; -  -        for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {  -            poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue));  -            poppers.back()->Start();  -        }  -  -        for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {  -            pushers.emplace_back(  -                new TPusherThread(queue, (char*)&queue + 200000000ULL * i));  -            pushers.back()->Start();  -        }  -  -        for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {  -            poppers[i]->Join();  -        }  -  -        for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {  -            pushers[i]->Join();  -        }  -    }  -  -    void Threads8_RndPush100K_Threads8_Queues() {  -        ManyRndPush100K_ManyQueues<8, 8>();  -    }  -  -    /*  -    void Threads24_RndPush100K_Threads24_Queues() {  -        ManyRndPush100K_ManyQueues<24, 24>();  -    }  -  -    void Threads24_RndPush100K_Threads8_Queues() {  -        ManyRndPush100K_ManyQueues<24, 8>();  -    }  -  -    void Threads24_RndPush100K_Threads4_Queues() {  -        ManyRndPush100K_ManyQueues<24, 4>();  -    }  -    */  -};  -  -REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestProcs);  + +        for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { +            poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue)); +            poppers.back()->Start(); +        } + +        for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { +            pushers.emplace_back( +                new TPusherThread(queue, (char*)&queue + 200000000ULL * i)); +            pushers.back()->Start(); +        } + +        for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { +            poppers[i]->Join(); +        } + +        for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { +            pushers[i]->Join(); +        } +    } + +    void Threads8_RndPush100K_Threads8_Queues() { +        ManyRndPush100K_ManyQueues<8, 8>(); +    } + +    /* +    void Threads24_RndPush100K_Threads24_Queues() { +        ManyRndPush100K_ManyQueues<24, 24>(); +    } + +    void Threads24_RndPush100K_Threads8_Queues() { +        ManyRndPush100K_ManyQueues<24, 8>(); +    } + +    void Threads24_RndPush100K_Threads4_Queues() { +        ManyRndPush100K_ManyQueues<24, 4>(); +    } +    */ +}; + +REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestProcs); diff --git a/library/cpp/threading/queue/tune.h b/library/cpp/threading/queue/tune.h index 43ad5efe3ef..50fc3dc17cd 100644 --- a/library/cpp/threading/queue/tune.h +++ b/library/cpp/threading/queue/tune.h @@ -1,101 +1,101 @@ -#pragma once  -  -/*  -  Motivation: consider you have a template class with many parameters  -  with default associations  -  -  template <typename A = TDefA,  -            typename B = TDefB,  -            typename C = TDefC,  -            typename D = TDefD>  -  class TExample {  -  };  -  -  consider you would like to provide easy to use interface to tune all  -  these parameters in position independed manner,  -  In that case TTune would be helpful for you.  -  -  How to use:  -  First step: declare a struct with all default associations  -  -  struct TDefaultTune {  -      using TStructA = TDefA;  -      using TStructB = TDefB;  -      using TStructC = TDefC;  -      using TStructD = TDefD;  -  };  -  -  Second step: declare helper names visible to a user  -  -  DeclareTuneTypeParam(TTuneParamA, TStructA);  -  DeclareTuneTypeParam(TTuneParamB, TStructB);  -  DeclareTuneTypeParam(TTuneParamC, TStructC);  -  DeclareTuneTypeParam(TTuneParamD, TStructD);  -  -  Third step: declare TExample this way:  -  -  template <typename...TParams>  -  class TExample {  -      using TMyParams = TTune<TDefaultTune, TParams...>;  -  -      using TActualA = TMyParams::TStructA;  -      using TActualB = TMyParams::TStructB;  -      ...  -  };  -  -  TTune<TDefaultTune, TParams...> is a struct with the default parameteres  -  taken from TDefaultTune and overridden from "TParams...".  -  -  for example:  "TTune<TDefaultTune, TTuneParamC<TUserClass>>"  -  will be virtually the same as:  -  -  struct TTunedClass {  -      using TStructA = TDefA;  -      using TStructB = TDefB;  -      using TStructC = TUserClass;  -      using TStructD = TDefD;  -  };  -  -  From now on you can tune your TExample in the following manner:  -  -  using TCustomClass =  -      TExample <TTuneParamA<TUserStruct1>, TTuneParamD<TUserStruct2>>;  -  -  You can also tweak constant expressions in your TDefaultTune.  -  Consider you have:  -  -  struct TDefaultTune {  -      static constexpr ui32 MySize = 42;  -  };  -  -  declare an interface to modify the parameter this way:  -  -  DeclareTuneValueParam(TStructSize, ui32, MySize);  -  -  and tweak your class:  -  -  using TTwiceBigger = TExample<TStructSize<84>>;  -  - */  -  -#define DeclareTuneTypeParam(TParamName, InternalName) \  -    template <typename TNewType>                       \  -    struct TParamName {                                \  -        template <typename TBase>                      \  -        struct TApply: public TBase {                  \  -            using InternalName = TNewType;             \  -        };                                             \  -    }  -  -#define DeclareTuneValueParam(TParamName, TValueType, InternalName) \  -    template <TValueType NewValue>                                  \  -    struct TParamName {                                             \  -        template <typename TBase>                                   \  -        struct TApply: public TBase {                               \  -            static constexpr TValueType InternalName = NewValue;    \  -        };                                                          \  -    }  -  +#pragma once + +/* +  Motivation: consider you have a template class with many parameters +  with default associations + +  template <typename A = TDefA, +            typename B = TDefB, +            typename C = TDefC, +            typename D = TDefD> +  class TExample { +  }; + +  consider you would like to provide easy to use interface to tune all +  these parameters in position independed manner, +  In that case TTune would be helpful for you. + +  How to use: +  First step: declare a struct with all default associations + +  struct TDefaultTune { +      using TStructA = TDefA; +      using TStructB = TDefB; +      using TStructC = TDefC; +      using TStructD = TDefD; +  }; + +  Second step: declare helper names visible to a user + +  DeclareTuneTypeParam(TTuneParamA, TStructA); +  DeclareTuneTypeParam(TTuneParamB, TStructB); +  DeclareTuneTypeParam(TTuneParamC, TStructC); +  DeclareTuneTypeParam(TTuneParamD, TStructD); + +  Third step: declare TExample this way: + +  template <typename...TParams> +  class TExample { +      using TMyParams = TTune<TDefaultTune, TParams...>; + +      using TActualA = TMyParams::TStructA; +      using TActualB = TMyParams::TStructB; +      ... +  }; + +  TTune<TDefaultTune, TParams...> is a struct with the default parameteres +  taken from TDefaultTune and overridden from "TParams...". + +  for example:  "TTune<TDefaultTune, TTuneParamC<TUserClass>>" +  will be virtually the same as: + +  struct TTunedClass { +      using TStructA = TDefA; +      using TStructB = TDefB; +      using TStructC = TUserClass; +      using TStructD = TDefD; +  }; + +  From now on you can tune your TExample in the following manner: + +  using TCustomClass = +      TExample <TTuneParamA<TUserStruct1>, TTuneParamD<TUserStruct2>>; + +  You can also tweak constant expressions in your TDefaultTune. +  Consider you have: + +  struct TDefaultTune { +      static constexpr ui32 MySize = 42; +  }; + +  declare an interface to modify the parameter this way: + +  DeclareTuneValueParam(TStructSize, ui32, MySize); + +  and tweak your class: + +  using TTwiceBigger = TExample<TStructSize<84>>; + + */ + +#define DeclareTuneTypeParam(TParamName, InternalName) \ +    template <typename TNewType>                       \ +    struct TParamName {                                \ +        template <typename TBase>                      \ +        struct TApply: public TBase {                  \ +            using InternalName = TNewType;             \ +        };                                             \ +    } + +#define DeclareTuneValueParam(TParamName, TValueType, InternalName) \ +    template <TValueType NewValue>                                  \ +    struct TParamName {                                             \ +        template <typename TBase>                                   \ +        struct TApply: public TBase {                               \ +            static constexpr TValueType InternalName = NewValue;    \ +        };                                                          \ +    } +  #define DeclareTuneContainer(TParamName, InternalName)              \      template <template <typename, typename...> class TNewContainer> \      struct TParamName {                                             \ @@ -104,22 +104,22 @@              template <typename TElem, typename... TRest>            \              using InternalName = TNewContainer<TElem, TRest...>;    \          };                                                          \ -    }  -  -namespace NTunePrivate {  -    template <typename TBase, typename... TParams>  -    struct TFold;  -  -    template <typename TBase>  -    struct TFold<TBase>: public TBase {  -    };  -  -    template <typename TBase, typename TFirstArg, typename... TRest>  -    struct TFold<TBase, TFirstArg, TRest...>  -       : public TFold<typename TFirstArg::template TApply<TBase>, TRest...> {  -    };  -}  -  -template <typename TDefault, typename... TParams>  -struct TTune: public NTunePrivate::TFold<TDefault, TParams...> {  -};  +    } + +namespace NTunePrivate { +    template <typename TBase, typename... TParams> +    struct TFold; + +    template <typename TBase> +    struct TFold<TBase>: public TBase { +    }; + +    template <typename TBase, typename TFirstArg, typename... TRest> +    struct TFold<TBase, TFirstArg, TRest...> +       : public TFold<typename TFirstArg::template TApply<TBase>, TRest...> { +    }; +} + +template <typename TDefault, typename... TParams> +struct TTune: public NTunePrivate::TFold<TDefault, TParams...> { +}; diff --git a/library/cpp/threading/queue/tune_ut.cpp b/library/cpp/threading/queue/tune_ut.cpp index 64bc8fd4279..7e980d3e27e 100644 --- a/library/cpp/threading/queue/tune_ut.cpp +++ b/library/cpp/threading/queue/tune_ut.cpp @@ -1,118 +1,118 @@  #include <library/cpp/testing/unittest/registar.h> -#include "tune.h"  -  -struct TDefaultStructA {  -};  -  -struct TDefaultStructB {  -};  -  -struct TDefaults {  -    using TStructA = TDefaultStructA;  -    using TStructB = TDefaultStructB;  -    static constexpr ui32 Param1 = 42;  -    static constexpr ui32 Param2 = 42;  -};  -  -DeclareTuneTypeParam(TweakStructA, TStructA);  -DeclareTuneTypeParam(TweakStructB, TStructB);  -DeclareTuneValueParam(TweakParam1, ui32, Param1);  -DeclareTuneValueParam(TweakParam2, ui32, Param2);  -  +#include "tune.h" + +struct TDefaultStructA { +}; + +struct TDefaultStructB { +}; + +struct TDefaults { +    using TStructA = TDefaultStructA; +    using TStructB = TDefaultStructB; +    static constexpr ui32 Param1 = 42; +    static constexpr ui32 Param2 = 42; +}; + +DeclareTuneTypeParam(TweakStructA, TStructA); +DeclareTuneTypeParam(TweakStructB, TStructB); +DeclareTuneValueParam(TweakParam1, ui32, Param1); +DeclareTuneValueParam(TweakParam2, ui32, Param2); +  Y_UNIT_TEST_SUITE(TestTuning) {      Y_UNIT_TEST(Defaults) { -        using TTuned = TTune<TDefaults>;  -        using TunedA = TTuned::TStructA;  -        using TunedB = TTuned::TStructB;  -        auto sameA = std::is_same<TDefaultStructA, TunedA>::value;  -        auto sameB = std::is_same<TDefaultStructB, TunedB>::value;  -        auto param1 = TTuned::Param1;  -        auto param2 = TTuned::Param2;  -  -        UNIT_ASSERT(sameA);  -        UNIT_ASSERT(sameB);  -        UNIT_ASSERT_EQUAL(param1, 42);  -        UNIT_ASSERT_EQUAL(param2, 42);  -    }  -  +        using TTuned = TTune<TDefaults>; +        using TunedA = TTuned::TStructA; +        using TunedB = TTuned::TStructB; +        auto sameA = std::is_same<TDefaultStructA, TunedA>::value; +        auto sameB = std::is_same<TDefaultStructB, TunedB>::value; +        auto param1 = TTuned::Param1; +        auto param2 = TTuned::Param2; + +        UNIT_ASSERT(sameA); +        UNIT_ASSERT(sameB); +        UNIT_ASSERT_EQUAL(param1, 42); +        UNIT_ASSERT_EQUAL(param2, 42); +    } +      Y_UNIT_TEST(TuneStructA) { -        struct TMyStruct {  -        };  -  -        using TTuned = TTune<TDefaults, TweakStructA<TMyStruct>>;  -  -        using TunedA = TTuned::TStructA;  -        using TunedB = TTuned::TStructB;  -        //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;  -        auto sameB = std::is_same<TDefaultStructB, TunedB>::value;  -        auto param1 = TTuned::Param1;  -        auto param2 = TTuned::Param2;  -  -        auto sameA = std::is_same<TMyStruct, TunedA>::value;  -  -        UNIT_ASSERT(sameA);  -        UNIT_ASSERT(sameB);  -        UNIT_ASSERT_EQUAL(param1, 42);  -        UNIT_ASSERT_EQUAL(param2, 42);  -    }  -  +        struct TMyStruct { +        }; + +        using TTuned = TTune<TDefaults, TweakStructA<TMyStruct>>; + +        using TunedA = TTuned::TStructA; +        using TunedB = TTuned::TStructB; +        //auto sameA = std::is_same<TDefaultStructA, TunedA>::value; +        auto sameB = std::is_same<TDefaultStructB, TunedB>::value; +        auto param1 = TTuned::Param1; +        auto param2 = TTuned::Param2; + +        auto sameA = std::is_same<TMyStruct, TunedA>::value; + +        UNIT_ASSERT(sameA); +        UNIT_ASSERT(sameB); +        UNIT_ASSERT_EQUAL(param1, 42); +        UNIT_ASSERT_EQUAL(param2, 42); +    } +      Y_UNIT_TEST(TuneParam1) { -        using TTuned = TTune<TDefaults, TweakParam1<24>>;  -  -        using TunedA = TTuned::TStructA;  -        using TunedB = TTuned::TStructB;  -        auto sameA = std::is_same<TDefaultStructA, TunedA>::value;  -        auto sameB = std::is_same<TDefaultStructB, TunedB>::value;  -        auto param1 = TTuned::Param1;  -        auto param2 = TTuned::Param2;  -  -        UNIT_ASSERT(sameA);  -        UNIT_ASSERT(sameB);  -        UNIT_ASSERT_EQUAL(param1, 24);  -        UNIT_ASSERT_EQUAL(param2, 42);  -    }  -  +        using TTuned = TTune<TDefaults, TweakParam1<24>>; + +        using TunedA = TTuned::TStructA; +        using TunedB = TTuned::TStructB; +        auto sameA = std::is_same<TDefaultStructA, TunedA>::value; +        auto sameB = std::is_same<TDefaultStructB, TunedB>::value; +        auto param1 = TTuned::Param1; +        auto param2 = TTuned::Param2; + +        UNIT_ASSERT(sameA); +        UNIT_ASSERT(sameB); +        UNIT_ASSERT_EQUAL(param1, 24); +        UNIT_ASSERT_EQUAL(param2, 42); +    } +      Y_UNIT_TEST(TuneStructAAndParam1) { -        struct TMyStruct {  -        };  -  -        using TTuned =  -            TTune<TDefaults, TweakStructA<TMyStruct>, TweakParam1<24>>;  -  -        using TunedA = TTuned::TStructA;  -        using TunedB = TTuned::TStructB;  -        //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;  -        auto sameB = std::is_same<TDefaultStructB, TunedB>::value;  -        auto param1 = TTuned::Param1;  -        auto param2 = TTuned::Param2;  -  -        auto sameA = std::is_same<TMyStruct, TunedA>::value;  -  -        UNIT_ASSERT(sameA);  -        UNIT_ASSERT(sameB);  -        UNIT_ASSERT_EQUAL(param1, 24);  -        UNIT_ASSERT_EQUAL(param2, 42);  -    }  -  +        struct TMyStruct { +        }; + +        using TTuned = +            TTune<TDefaults, TweakStructA<TMyStruct>, TweakParam1<24>>; + +        using TunedA = TTuned::TStructA; +        using TunedB = TTuned::TStructB; +        //auto sameA = std::is_same<TDefaultStructA, TunedA>::value; +        auto sameB = std::is_same<TDefaultStructB, TunedB>::value; +        auto param1 = TTuned::Param1; +        auto param2 = TTuned::Param2; + +        auto sameA = std::is_same<TMyStruct, TunedA>::value; + +        UNIT_ASSERT(sameA); +        UNIT_ASSERT(sameB); +        UNIT_ASSERT_EQUAL(param1, 24); +        UNIT_ASSERT_EQUAL(param2, 42); +    } +      Y_UNIT_TEST(TuneParam1AndStructA) { -        struct TMyStruct {  -        };  -  -        using TTuned =  -            TTune<TDefaults, TweakParam1<24>, TweakStructA<TMyStruct>>;  -  -        using TunedA = TTuned::TStructA;  -        using TunedB = TTuned::TStructB;  -        //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;  -        auto sameB = std::is_same<TDefaultStructB, TunedB>::value;  -        auto param1 = TTuned::Param1;  -        auto param2 = TTuned::Param2;  -  -        auto sameA = std::is_same<TMyStruct, TunedA>::value;  -  -        UNIT_ASSERT(sameA);  -        UNIT_ASSERT(sameB);  -        UNIT_ASSERT_EQUAL(param1, 24);  -        UNIT_ASSERT_EQUAL(param2, 42);  -    }  -}  +        struct TMyStruct { +        }; + +        using TTuned = +            TTune<TDefaults, TweakParam1<24>, TweakStructA<TMyStruct>>; + +        using TunedA = TTuned::TStructA; +        using TunedB = TTuned::TStructB; +        //auto sameA = std::is_same<TDefaultStructA, TunedA>::value; +        auto sameB = std::is_same<TDefaultStructB, TunedB>::value; +        auto param1 = TTuned::Param1; +        auto param2 = TTuned::Param2; + +        auto sameA = std::is_same<TMyStruct, TunedA>::value; + +        UNIT_ASSERT(sameA); +        UNIT_ASSERT(sameB); +        UNIT_ASSERT_EQUAL(param1, 24); +        UNIT_ASSERT_EQUAL(param2, 42); +    } +} diff --git a/library/cpp/threading/queue/unordered_ut.cpp b/library/cpp/threading/queue/unordered_ut.cpp index 2018538bf77..a43b7f520e5 100644 --- a/library/cpp/threading/queue/unordered_ut.cpp +++ b/library/cpp/threading/queue/unordered_ut.cpp @@ -1,154 +1,154 @@  #include <library/cpp/testing/unittest/registar.h> -#include <util/system/thread.h>  -#include <algorithm>  -#include <util/generic/vector.h>  -#include <util/random/fast.h>  -  -#include "ut_helpers.h"  -  +#include <util/system/thread.h> +#include <algorithm> +#include <util/generic/vector.h> +#include <util/random/fast.h> + +#include "ut_helpers.h" +  template <typename TQueueType> -class TTestUnorderedQueue: public TTestBase {  -private:  -    using TLink = TIntrusiveLink;  -  +class TTestUnorderedQueue: public TTestBase { +private: +    using TLink = TIntrusiveLink; +      UNIT_TEST_SUITE_DEMANGLE(TTestUnorderedQueue<TQueueType>); -    UNIT_TEST(Push1M_Pop1M_Unordered)  -    UNIT_TEST_SUITE_END();  -  -public:  -    void Push1M_Pop1M_Unordered() {  -        constexpr int REPEAT = 1000000;  +    UNIT_TEST(Push1M_Pop1M_Unordered) +    UNIT_TEST_SUITE_END(); + +public: +    void Push1M_Pop1M_Unordered() { +        constexpr int REPEAT = 1000000;          TQueueType queue; -        TLink msg[REPEAT];  -  -        auto pmsg = queue.Pop();  -        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);  -  -        for (int i = 0; i < REPEAT; ++i) {  -            queue.Push(&msg[i]);  -        }  -  +        TLink msg[REPEAT]; + +        auto pmsg = queue.Pop(); +        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + +        for (int i = 0; i < REPEAT; ++i) { +            queue.Push(&msg[i]); +        } +          TVector<TLink*> popped; -        popped.reserve(REPEAT);  -        for (int i = 0; i < REPEAT; ++i) {  -            popped.push_back((TLink*)queue.Pop());  -        }  -  -        pmsg = queue.Pop();  -        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);  -  -        std::sort(popped.begin(), popped.end());  -        for (int i = 0; i < REPEAT; ++i) {  -            UNIT_ASSERT_VALUES_EQUAL(&msg[i], popped[i]);  -        }  -    }  -};  -  +        popped.reserve(REPEAT); +        for (int i = 0; i < REPEAT; ++i) { +            popped.push_back((TLink*)queue.Pop()); +        } + +        pmsg = queue.Pop(); +        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + +        std::sort(popped.begin(), popped.end()); +        for (int i = 0; i < REPEAT; ++i) { +            UNIT_ASSERT_VALUES_EQUAL(&msg[i], popped[i]); +        } +    } +}; +  template <typename TQueueType> -class TTestWeakQueue: public TTestBase {  -private:  +class TTestWeakQueue: public TTestBase { +private:      UNIT_TEST_SUITE_DEMANGLE(TTestWeakQueue<TQueueType>); -    UNIT_TEST(Threads8_Rnd_Exchange)  -    UNIT_TEST_SUITE_END();  -  -public:  -    template <ui16 COUNT = 48, ui32 MSG_COUNT = 10000>  -    void ManyThreadsRndExchange() {  +    UNIT_TEST(Threads8_Rnd_Exchange) +    UNIT_TEST_SUITE_END(); + +public: +    template <ui16 COUNT = 48, ui32 MSG_COUNT = 10000> +    void ManyThreadsRndExchange() {          TQueueType queues[COUNT]; -  +          class TWorker: public ISimpleThread { -        public:  -            TWorker(  +        public: +            TWorker(                  TQueueType* queues_,                  ui16 mine,                  TAtomic* pushDone) -                : Queues(queues_)  -                , MineQueue(mine)  -                , PushDone(pushDone)  -            {  -            }  -  +                : Queues(queues_) +                , MineQueue(mine) +                , PushDone(pushDone) +            { +            } +              TQueueType* Queues; -            ui16 MineQueue;  +            ui16 MineQueue;              TVector<uintptr_t> Received; -            TAtomic* PushDone;  -  -            void* ThreadProc() override {  -                TReallyFastRng32 rng(GetCycleCount());  -                Received.reserve(MSG_COUNT * 2);  -  -                for (ui32 loop = 1; loop <= MSG_COUNT; ++loop) {  -                    for (;;) {  -                        auto msg = Queues[MineQueue].Pop();  -                        if (msg == nullptr) {  -                            break;  -                        }  -  -                        Received.push_back((uintptr_t)msg);  -                    }  -  -                    ui16 rnd = rng.GenRand64() % COUNT;  -                    ui64 msg = ((ui64)MineQueue << 32) + loop;  -                    while (!Queues[rnd].Push((void*)msg)) {  -                    }  -                }  -  -                AtomicIncrement(*PushDone);  -  -                for (;;) {  -                    bool isItLast = AtomicGet(*PushDone) == COUNT;  -                    auto msg = Queues[MineQueue].Pop();  -                    if (msg != nullptr) {  -                        Received.push_back((uintptr_t)msg);  -                    } else {  -                        if (isItLast) {  -                            break;  -                        }  -                        SpinLockPause();  -                    }  -                }  -  -                for (ui64 last = 0;;) {  -                    auto msg = Queues[MineQueue].UnsafeScanningPop(&last);  -                    if (msg == nullptr) {  -                        break;  -                    }  -                    Received.push_back((uintptr_t)msg);  -                }  -  -                return nullptr;  -            }  -        };  -  +            TAtomic* PushDone; + +            void* ThreadProc() override { +                TReallyFastRng32 rng(GetCycleCount()); +                Received.reserve(MSG_COUNT * 2); + +                for (ui32 loop = 1; loop <= MSG_COUNT; ++loop) { +                    for (;;) { +                        auto msg = Queues[MineQueue].Pop(); +                        if (msg == nullptr) { +                            break; +                        } + +                        Received.push_back((uintptr_t)msg); +                    } + +                    ui16 rnd = rng.GenRand64() % COUNT; +                    ui64 msg = ((ui64)MineQueue << 32) + loop; +                    while (!Queues[rnd].Push((void*)msg)) { +                    } +                } + +                AtomicIncrement(*PushDone); + +                for (;;) { +                    bool isItLast = AtomicGet(*PushDone) == COUNT; +                    auto msg = Queues[MineQueue].Pop(); +                    if (msg != nullptr) { +                        Received.push_back((uintptr_t)msg); +                    } else { +                        if (isItLast) { +                            break; +                        } +                        SpinLockPause(); +                    } +                } + +                for (ui64 last = 0;;) { +                    auto msg = Queues[MineQueue].UnsafeScanningPop(&last); +                    if (msg == nullptr) { +                        break; +                    } +                    Received.push_back((uintptr_t)msg); +                } + +                return nullptr; +            } +        }; +          TVector<TAutoPtr<TWorker>> workers; -        TAtomic pushDone = 0;  -  -        for (ui32 i = 0; i < COUNT; ++i) {  -            workers.emplace_back(new TWorker(&queues[0], i, &pushDone));  -            workers.back()->Start();  -        }  -  +        TAtomic pushDone = 0; + +        for (ui32 i = 0; i < COUNT; ++i) { +            workers.emplace_back(new TWorker(&queues[0], i, &pushDone)); +            workers.back()->Start(); +        } +          TVector<uintptr_t> all; -        for (ui32 i = 0; i < COUNT; ++i) {  -            workers[i]->Join();  -            all.insert(all.begin(),  +        for (ui32 i = 0; i < COUNT; ++i) { +            workers[i]->Join(); +            all.insert(all.begin(),                         workers[i]->Received.begin(), workers[i]->Received.end()); -        }  -  -        std::sort(all.begin(), all.end());  -        auto iter = all.begin();  -        for (ui32 i = 0; i < COUNT; ++i) {  -            for (ui32 k = 1; k <= MSG_COUNT; ++k) {  -                UNIT_ASSERT_VALUES_EQUAL(((ui64)i << 32) + k, *iter);  -                ++iter;  -            }  -        }  -    }  -  -    void Threads8_Rnd_Exchange() {  -        ManyThreadsRndExchange<8>();  -    }  -};  -  -REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TTestUnorderedQueue);  -UNIT_TEST_SUITE_REGISTRATION(TTestWeakQueue<TMPMCUnorderedRing>);  +        } + +        std::sort(all.begin(), all.end()); +        auto iter = all.begin(); +        for (ui32 i = 0; i < COUNT; ++i) { +            for (ui32 k = 1; k <= MSG_COUNT; ++k) { +                UNIT_ASSERT_VALUES_EQUAL(((ui64)i << 32) + k, *iter); +                ++iter; +            } +        } +    } + +    void Threads8_Rnd_Exchange() { +        ManyThreadsRndExchange<8>(); +    } +}; + +REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TTestUnorderedQueue); +UNIT_TEST_SUITE_REGISTRATION(TTestWeakQueue<TMPMCUnorderedRing>); diff --git a/library/cpp/threading/queue/ut/ya.make b/library/cpp/threading/queue/ut/ya.make index dda204155eb..8883d9bf693 100644 --- a/library/cpp/threading/queue/ut/ya.make +++ b/library/cpp/threading/queue/ut/ya.make @@ -1,16 +1,16 @@  UNITTEST_FOR(library/cpp/threading/queue) -  +  OWNER(agri) -  -ALLOCATOR(B)  -  -SRCS(  -    basic_ut.cpp  -    queue_ut.cpp  -    tune_ut.cpp  -    unordered_ut.cpp  -    ut_helpers.cpp  -    ut_helpers.h  -)  -  -END()  + +ALLOCATOR(B) + +SRCS( +    basic_ut.cpp +    queue_ut.cpp +    tune_ut.cpp +    unordered_ut.cpp +    ut_helpers.cpp +    ut_helpers.h +) + +END() diff --git a/library/cpp/threading/queue/ut_helpers.cpp b/library/cpp/threading/queue/ut_helpers.cpp index 342aa125a0b..aa3a8314411 100644 --- a/library/cpp/threading/queue/ut_helpers.cpp +++ b/library/cpp/threading/queue/ut_helpers.cpp @@ -1 +1 @@ -#include "ut_helpers.h"  +#include "ut_helpers.h" diff --git a/library/cpp/threading/queue/ut_helpers.h b/library/cpp/threading/queue/ut_helpers.h index c7203665930..2756b52601e 100644 --- a/library/cpp/threading/queue/ut_helpers.h +++ b/library/cpp/threading/queue/ut_helpers.h @@ -1,40 +1,40 @@ -#pragma once  -  -#include "mpsc_read_as_filled.h"  -#include "mpsc_htswap.h"  -#include "mpsc_vinfarr_obstructive.h"  -#include "mpsc_intrusive_unordered.h"  -#include "mpmc_unordered_ring.h"  -  -struct TBasicHTSwap: public NThreading::THTSwapQueue<> {  -};  -  -struct TBasicReadAsFilled: public NThreading::TReadAsFilledQueue<> {  -};  -  -struct TBasicObstructiveConsumer  +#pragma once + +#include "mpsc_read_as_filled.h" +#include "mpsc_htswap.h" +#include "mpsc_vinfarr_obstructive.h" +#include "mpsc_intrusive_unordered.h" +#include "mpmc_unordered_ring.h" + +struct TBasicHTSwap: public NThreading::THTSwapQueue<> { +}; + +struct TBasicReadAsFilled: public NThreading::TReadAsFilledQueue<> { +}; + +struct TBasicObstructiveConsumer     : public NThreading::TObstructiveConsumerQueue<> { -};  -  -struct TBasicMPSCIntrusiveUnordered  +}; + +struct TBasicMPSCIntrusiveUnordered     : public NThreading::TMPSCIntrusiveUnordered { -};  -  -struct TIntrusiveLink: public NThreading::TIntrusiveNode {  -};  -  -struct TMPMCUnorderedRing: public NThreading::TMPMCUnorderedRing {  -    TMPMCUnorderedRing()  -        : NThreading::TMPMCUnorderedRing(10000000)  -    {  -    }  -};  -  +}; + +struct TIntrusiveLink: public NThreading::TIntrusiveNode { +}; + +struct TMPMCUnorderedRing: public NThreading::TMPMCUnorderedRing { +    TMPMCUnorderedRing() +        : NThreading::TMPMCUnorderedRing(10000000) +    { +    } +}; +  #define REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TestTemplate)         \      UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicHTSwap>);       \      UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicReadAsFilled>); \ -    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicObstructiveConsumer>)  -  +    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicObstructiveConsumer>) +  #define REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TestTemplate)                 \ -    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicMPSCIntrusiveUnordered>); \  -    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TMPMCUnorderedRing>);  +    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicMPSCIntrusiveUnordered>); \ +    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TMPMCUnorderedRing>); diff --git a/library/cpp/threading/queue/ya.make b/library/cpp/threading/queue/ya.make index 3a11eb2d925..6570b38ce59 100644 --- a/library/cpp/threading/queue/ya.make +++ b/library/cpp/threading/queue/ya.make @@ -1,18 +1,18 @@ -LIBRARY()  -  -OWNER(agri)  -  -SRCS(  -    mpmc_unordered_ring.cpp  -    mpmc_unordered_ring.h  -    mpsc_htswap.cpp  -    mpsc_htswap.h  -    mpsc_intrusive_unordered.cpp  -    mpsc_intrusive_unordered.h  -    mpsc_read_as_filled.cpp  -    mpsc_read_as_filled.h  -    mpsc_vinfarr_obstructive.cpp  -    mpsc_vinfarr_obstructive.h  -)  -  -END()  +LIBRARY() + +OWNER(agri) + +SRCS( +    mpmc_unordered_ring.cpp +    mpmc_unordered_ring.h +    mpsc_htswap.cpp +    mpsc_htswap.h +    mpsc_intrusive_unordered.cpp +    mpsc_intrusive_unordered.h +    mpsc_read_as_filled.cpp +    mpsc_read_as_filled.h +    mpsc_vinfarr_obstructive.cpp +    mpsc_vinfarr_obstructive.h +) + +END() | 
