diff options
| author | agri <[email protected]> | 2022-02-10 16:48:12 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:48:12 +0300 | 
| commit | d3530b2692e400bd4d29bd4f07cafaee139164e7 (patch) | |
| tree | b7ae636a74490e649a2ed0fdd5361f1bec83b9f9 /library/cpp/threading/queue | |
| parent | 0f4c5d1e8c0672bf0a1f2f2d8acac5ba24772435 (diff) | |
Restoring authorship annotation for <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/queue')
19 files changed, 2215 insertions, 2215 deletions
| diff --git a/library/cpp/threading/queue/basic_ut.cpp b/library/cpp/threading/queue/basic_ut.cpp index 5f56f8583ec..2db5d6e8e83 100644 --- a/library/cpp/threading/queue/basic_ut.cpp +++ b/library/cpp/threading/queue/basic_ut.cpp @@ -1,92 +1,92 @@  #include <library/cpp/testing/unittest/registar.h> -#include <util/generic/vector.h> -#include <util/system/thread.h> - -#include "ut_helpers.h" - +#include <util/generic/vector.h>  +#include <util/system/thread.h>  +  +#include "ut_helpers.h"  +   template <typename TQueueType> -class TQueueTestsInSingleThread: public TTestBase { -private: +class TQueueTestsInSingleThread: public TTestBase {  +private:       using TSelf = TQueueTestsInSingleThread<TQueueType>; -    using TLink = TIntrusiveLink; - -    UNIT_TEST_SUITE_DEMANGLE(TSelf); -    UNIT_TEST(OnePushOnePop) -    UNIT_TEST(OnePushOnePop_Repeat1M) -    UNIT_TEST(Threads8_Repeat1M_Push1Pop1) -    UNIT_TEST_SUITE_END(); - -public: -    void OnePushOnePop() { +    using TLink = TIntrusiveLink;  +  +    UNIT_TEST_SUITE_DEMANGLE(TSelf);  +    UNIT_TEST(OnePushOnePop)  +    UNIT_TEST(OnePushOnePop_Repeat1M)  +    UNIT_TEST(Threads8_Repeat1M_Push1Pop1)  +    UNIT_TEST_SUITE_END();  +  +public:  +    void OnePushOnePop() {           TQueueType queue; - -        auto popped = queue.Pop(); -        UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); - -        TLink msg; -        queue.Push(&msg); -        popped = queue.Pop(); -        UNIT_ASSERT_VALUES_EQUAL(&msg, popped); - -        popped = queue.Pop(); -        UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); -    }; - -    void OnePushOnePop_Repeat1M() { +  +        auto popped = queue.Pop();  +        UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);  +  +        TLink msg;  +        queue.Push(&msg);  +        popped = queue.Pop();  +        UNIT_ASSERT_VALUES_EQUAL(&msg, popped);  +  +        popped = queue.Pop();  +        UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);  +    };  +  +    void OnePushOnePop_Repeat1M() {           TQueueType queue; -        TLink msg; - -        auto popped = queue.Pop(); -        UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); - -        for (int i = 0; i < 1000000; ++i) { -            queue.Push(&msg); -            popped = queue.Pop(); -            UNIT_ASSERT_VALUES_EQUAL(&msg, popped); - -            popped = queue.Pop(); -            UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); -        } -    } - -    template <size_t NUMBER_OF_THREADS> -    void RepeatPush1Pop1_InManyThreads() { +        TLink msg;  +  +        auto popped = queue.Pop();  +        UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);  +  +        for (int i = 0; i < 1000000; ++i) {  +            queue.Push(&msg);  +            popped = queue.Pop();  +            UNIT_ASSERT_VALUES_EQUAL(&msg, popped);  +  +            popped = queue.Pop();  +            UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);  +        }  +    }  +  +    template <size_t NUMBER_OF_THREADS>  +    void RepeatPush1Pop1_InManyThreads() {           class TCycleThread: public ISimpleThread { -        public: -            void* ThreadProc() override { +        public:  +            void* ThreadProc() override {                   TQueueType queue; -                TLink msg; -                auto popped = queue.Pop(); -                UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); - -                for (size_t i = 0; i < 1000000; ++i) { -                    queue.Push(&msg); -                    popped = queue.Pop(); -                    UNIT_ASSERT_VALUES_EQUAL(popped, &msg); - -                    popped = queue.Pop(); -                    UNIT_ASSERT_VALUES_EQUAL(popped, nullptr); -                } -                return nullptr; -            } -        }; - +                TLink msg;  +                auto popped = queue.Pop();  +                UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);  +  +                for (size_t i = 0; i < 1000000; ++i) {  +                    queue.Push(&msg);  +                    popped = queue.Pop();  +                    UNIT_ASSERT_VALUES_EQUAL(popped, &msg);  +  +                    popped = queue.Pop();  +                    UNIT_ASSERT_VALUES_EQUAL(popped, nullptr);  +                }  +                return nullptr;  +            }  +        };  +           TVector<TAutoPtr<TCycleThread>> cyclers; - -        for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) { -            cyclers.emplace_back(new TCycleThread); -            cyclers.back()->Start(); -        } - -        for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) { -            cyclers[i]->Join(); -        } -    } - -    void Threads8_Repeat1M_Push1Pop1() { -        RepeatPush1Pop1_InManyThreads<8>(); -    } -}; - -REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestsInSingleThread); -REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TQueueTestsInSingleThread) +  +        for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) {  +            cyclers.emplace_back(new TCycleThread);  +            cyclers.back()->Start();  +        }  +  +        for (size_t i = 0; i < NUMBER_OF_THREADS; ++i) {  +            cyclers[i]->Join();  +        }  +    }  +  +    void Threads8_Repeat1M_Push1Pop1() {  +        RepeatPush1Pop1_InManyThreads<8>();  +    }  +};  +  +REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestsInSingleThread);  +REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TQueueTestsInSingleThread)  diff --git a/library/cpp/threading/queue/mpmc_unordered_ring.cpp b/library/cpp/threading/queue/mpmc_unordered_ring.cpp index 160547f5946..df48182210f 100644 --- a/library/cpp/threading/queue/mpmc_unordered_ring.cpp +++ b/library/cpp/threading/queue/mpmc_unordered_ring.cpp @@ -1,74 +1,74 @@ -#include "mpmc_unordered_ring.h" - -namespace NThreading { -    TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) { -        Y_VERIFY(size > 0); -        RingSize = size; -        RingBuffer.Reset(new void*[size]); -        memset(&RingBuffer[0], 0, sizeof(void*) * size); -    } - -    bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept { -        if (retryCount == 0) { -            StubbornPush(msg); -            return true; -        } -        for (ui16 itry = retryCount; itry-- > 0;) { -            if (WeakPush(msg)) { -                return true; -            } -        } -        return false; -    } - -    bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept { -        auto pawl = AtomicIncrement(WritePawl); -        if (pawl - AtomicGet(ReadFront) >= RingSize) { -            // Queue is full -            AtomicDecrement(WritePawl); -            return false; -        } - -        auto writeSlot = AtomicGetAndIncrement(WriteFront); -        if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) { -            return true; -        } -        // slot is occupied for some reason, retry -        return false; -    } - -    void* TMPMCUnorderedRing::Pop() noexcept { -        ui64 readSlot; - -        for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) { -            auto pawl = AtomicIncrement(ReadPawl); -            if (pawl > AtomicGet(WriteFront)) { -                // Queue is empty -                AtomicDecrement(ReadPawl); -                return nullptr; -            } - -            readSlot = AtomicGetAndIncrement(ReadFront); - -            auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr); -            if (msg != nullptr) { -                return msg; -            } -        } - -        /* got no message in the slot, let's try to rollback readfront */ -        AtomicCas(&ReadFront, readSlot - 1, readSlot); -        return nullptr; -    } - -    void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept { -        for (; *last < RingSize;) { -            auto msg = AtomicSwap(&RingBuffer[*last], nullptr); -            ++*last; -            if (msg != nullptr) { -                return msg; -            } -        } -        return nullptr; -    } -} +#include "mpmc_unordered_ring.h"  +  +namespace NThreading {  +    TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) {  +        Y_VERIFY(size > 0);  +        RingSize = size;  +        RingBuffer.Reset(new void*[size]);  +        memset(&RingBuffer[0], 0, sizeof(void*) * size);  +    }  +  +    bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept {  +        if (retryCount == 0) {  +            StubbornPush(msg);  +            return true;  +        }  +        for (ui16 itry = retryCount; itry-- > 0;) {  +            if (WeakPush(msg)) {  +                return true;  +            }  +        }  +        return false;  +    }  +  +    bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept {  +        auto pawl = AtomicIncrement(WritePawl);  +        if (pawl - AtomicGet(ReadFront) >= RingSize) {  +            // Queue is full  +            AtomicDecrement(WritePawl);  +            return false;  +        }  +  +        auto writeSlot = AtomicGetAndIncrement(WriteFront);  +        if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) {  +            return true;  +        }  +        // slot is occupied for some reason, retry  +        return false;  +    }  +  +    void* TMPMCUnorderedRing::Pop() noexcept {  +        ui64 readSlot;  +  +        for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) {  +            auto pawl = AtomicIncrement(ReadPawl);  +            if (pawl > AtomicGet(WriteFront)) {  +                // Queue is empty  +                AtomicDecrement(ReadPawl);  +                return nullptr;  +            }  +  +            readSlot = AtomicGetAndIncrement(ReadFront);  +  +            auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr);  +            if (msg != nullptr) {  +                return msg;  +            }  +        }  +  +        /* got no message in the slot, let's try to rollback readfront */  +        AtomicCas(&ReadFront, readSlot - 1, readSlot);  +        return nullptr;  +    }  +  +    void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept {  +        for (; *last < RingSize;) {  +            auto msg = AtomicSwap(&RingBuffer[*last], nullptr);  +            ++*last;  +            if (msg != nullptr) {  +                return msg;  +            }  +        }  +        return nullptr;  +    }  +}  diff --git a/library/cpp/threading/queue/mpmc_unordered_ring.h b/library/cpp/threading/queue/mpmc_unordered_ring.h index 5042f7528e8..59758d2c352 100644 --- a/library/cpp/threading/queue/mpmc_unordered_ring.h +++ b/library/cpp/threading/queue/mpmc_unordered_ring.h @@ -1,42 +1,42 @@ -#pragma once - -/* -  It's not a general purpose queue. -  No order guarantee, but it mostly ordered. -  Items may stuck in almost empty queue. -  Use UnsafeScanningPop to pop all stuck items. -  Almost wait-free for producers and consumers. - */ - -#include <util/system/atomic.h> -#include <util/generic/ptr.h> - -namespace NThreading { -    struct TMPMCUnorderedRing { -    public: -        static constexpr ui16 MAX_PUSH_TRIES = 4; -        static constexpr ui16 MAX_POP_TRIES = 4; - -        TMPMCUnorderedRing(size_t size); - -        bool Push(void* msg, ui16 retryCount = MAX_PUSH_TRIES) noexcept; -        void StubbornPush(void* msg) { -            while (!WeakPush(msg)) { -            } -        } - -        void* Pop() noexcept; - -        void* UnsafeScanningPop(ui64* last) noexcept; - -    private: -        bool WeakPush(void* msg) noexcept; - -        size_t RingSize; -        TArrayPtr<void*> RingBuffer; -        ui64 WritePawl = 0; -        ui64 WriteFront = 0; -        ui64 ReadPawl = 0; -        ui64 ReadFront = 0; -    }; -} +#pragma once  +  +/*  +  It's not a general purpose queue.  +  No order guarantee, but it mostly ordered.  +  Items may stuck in almost empty queue.  +  Use UnsafeScanningPop to pop all stuck items.  +  Almost wait-free for producers and consumers.  + */  +  +#include <util/system/atomic.h>  +#include <util/generic/ptr.h>  +  +namespace NThreading {  +    struct TMPMCUnorderedRing {  +    public:  +        static constexpr ui16 MAX_PUSH_TRIES = 4;  +        static constexpr ui16 MAX_POP_TRIES = 4;  +  +        TMPMCUnorderedRing(size_t size);  +  +        bool Push(void* msg, ui16 retryCount = MAX_PUSH_TRIES) noexcept;  +        void StubbornPush(void* msg) {  +            while (!WeakPush(msg)) {  +            }  +        }  +  +        void* Pop() noexcept;  +  +        void* UnsafeScanningPop(ui64* last) noexcept;  +  +    private:  +        bool WeakPush(void* msg) noexcept;  +  +        size_t RingSize;  +        TArrayPtr<void*> RingBuffer;  +        ui64 WritePawl = 0;  +        ui64 WriteFront = 0;  +        ui64 ReadPawl = 0;  +        ui64 ReadFront = 0;  +    };  +}  diff --git a/library/cpp/threading/queue/mpsc_htswap.cpp b/library/cpp/threading/queue/mpsc_htswap.cpp index 610c8f67f13..d8ab0d4f488 100644 --- a/library/cpp/threading/queue/mpsc_htswap.cpp +++ b/library/cpp/threading/queue/mpsc_htswap.cpp @@ -1 +1 @@ -#include "mpsc_htswap.h" +#include "mpsc_htswap.h"  diff --git a/library/cpp/threading/queue/mpsc_htswap.h b/library/cpp/threading/queue/mpsc_htswap.h index c42caa7ac02..2d0bfd1146a 100644 --- a/library/cpp/threading/queue/mpsc_htswap.h +++ b/library/cpp/threading/queue/mpsc_htswap.h @@ -1,132 +1,132 @@ -#pragma once - -/* -  http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue - -  Simple semi-wait-free queue. Many producers - one consumer. -  Tracking of allocated memory is not required. -  No CAS. Only atomic swap (exchange) operations. - -  WARNING: a sleeping producer can stop progress for consumer. - -  WARNING: there is no wait¬ify mechanic for consumer, -  consumer receives nullptr if queue was empty. - -  WARNING: the algorithm itself is lock-free -  but producers and consumer could be blocked by memory allocator - -  Reference design: rtmapreduce/libs/threading/lfqueue.h - */ - -#include <util/generic/noncopyable.h> -#include <util/system/types.h> -#include <util/system/atomic.h> - -#include "tune.h" - -namespace NThreading { -    namespace NHTSwapPrivate { -        template <typename T, typename TTuneup> -        struct TNode +#pragma once  +  +/*  +  http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue  +  +  Simple semi-wait-free queue. Many producers - one consumer.  +  Tracking of allocated memory is not required.  +  No CAS. Only atomic swap (exchange) operations.  +  +  WARNING: a sleeping producer can stop progress for consumer.  +  +  WARNING: there is no wait¬ify mechanic for consumer,  +  consumer receives nullptr if queue was empty.  +  +  WARNING: the algorithm itself is lock-free  +  but producers and consumer could be blocked by memory allocator  +  +  Reference design: rtmapreduce/libs/threading/lfqueue.h  + */  +  +#include <util/generic/noncopyable.h>  +#include <util/system/types.h>  +#include <util/system/atomic.h>  +  +#include "tune.h"  +  +namespace NThreading {  +    namespace NHTSwapPrivate {  +        template <typename T, typename TTuneup>  +        struct TNode              : public TTuneup::TNodeBase,                public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> { -            TNode(const T& item) { -                this->Next = nullptr; -                this->Item = item; -            } - -            TNode(T&& item) { -                this->Next = nullptr; -                this->Item = std::move(item); -            } -        }; - -        struct TDefaultTuneup { -            struct TNodeBase: private TNonCopyable { -            }; - -            template <typename TNode, typename T> -            struct TNodeLayout { -                TNode* Next; -                T Item; -            }; - -            template <typename TNode> -            struct TQueueLayout { -                TNode* Head; -                TNode* Tail; -            }; -        }; - -        template <typename T, typename TTuneup> -        class THTSwapQueueImpl +            TNode(const T& item) {  +                this->Next = nullptr;  +                this->Item = item;  +            }  +  +            TNode(T&& item) {  +                this->Next = nullptr;  +                this->Item = std::move(item);  +            }  +        };  +  +        struct TDefaultTuneup {  +            struct TNodeBase: private TNonCopyable {  +            };  +  +            template <typename TNode, typename T>  +            struct TNodeLayout {  +                TNode* Next;  +                T Item;  +            };  +  +            template <typename TNode>  +            struct TQueueLayout {  +                TNode* Head;  +                TNode* Tail;  +            };  +        };  +  +        template <typename T, typename TTuneup>  +        class THTSwapQueueImpl              : protected  TTuneup::template TQueueLayout<TNode<T, TTuneup>> { -        protected: -            using TTunedNode = TNode<T, TTuneup>; - -        public: -            using TItem = T; - -            THTSwapQueueImpl() { -                this->Head = new TTunedNode(T()); -                this->Tail = this->Head; -            } - -            ~THTSwapQueueImpl() { -                TTunedNode* node = this->Head; -                while (node != nullptr) { -                    TTunedNode* next = node->Next; -                    delete node; -                    node = next; -                } -            } - -            template <typename TT> -            void Push(TT&& item) { -                Enqueue(new TTunedNode(std::forward<TT>(item))); -            } - -            T Peek() { -                TTunedNode* next = AtomicGet(this->Head->Next); -                if (next == nullptr) { -                    return T(); -                } -                return next->Item; -            } - -            void Enqueue(TTunedNode* node) { -                // our goal is to avoid expensive CAS here, -                // but now consumer will be blocked until new tail linked. -                // fortunately 'window of inconsistency' is extremely small. -                TTunedNode* prev = AtomicSwap(&this->Tail, node); -                AtomicSet(prev->Next, node); -            } - -            T Pop() { -                TTunedNode* next = AtomicGet(this->Head->Next); -                if (next == nullptr) { -                    return nullptr; -                } -                auto item = std::move(next->Item); -                std::swap(this->Head, next); // no need atomic here -                delete next; -                return item; -            } - -            bool IsEmpty() const { -                TTunedNode* next = AtomicGet(this->Head->Next); -                return (next == nullptr); -            } -        }; -    } - -    DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase); -    DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout); -    DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout); - +        protected:  +            using TTunedNode = TNode<T, TTuneup>;  +  +        public:  +            using TItem = T;  +  +            THTSwapQueueImpl() {  +                this->Head = new TTunedNode(T());  +                this->Tail = this->Head;  +            }  +  +            ~THTSwapQueueImpl() {  +                TTunedNode* node = this->Head;  +                while (node != nullptr) {  +                    TTunedNode* next = node->Next;  +                    delete node;  +                    node = next;  +                }  +            }  +  +            template <typename TT>  +            void Push(TT&& item) {  +                Enqueue(new TTunedNode(std::forward<TT>(item)));  +            }  +  +            T Peek() {  +                TTunedNode* next = AtomicGet(this->Head->Next);  +                if (next == nullptr) {  +                    return T();  +                }  +                return next->Item;  +            }  +  +            void Enqueue(TTunedNode* node) {  +                // our goal is to avoid expensive CAS here,  +                // but now consumer will be blocked until new tail linked.  +                // fortunately 'window of inconsistency' is extremely small.  +                TTunedNode* prev = AtomicSwap(&this->Tail, node);  +                AtomicSet(prev->Next, node);  +            }  +  +            T Pop() {  +                TTunedNode* next = AtomicGet(this->Head->Next);  +                if (next == nullptr) {  +                    return nullptr;  +                }  +                auto item = std::move(next->Item);  +                std::swap(this->Head, next); // no need atomic here  +                delete next;  +                return item;  +            }  +  +            bool IsEmpty() const {  +                TTunedNode* next = AtomicGet(this->Head->Next);  +                return (next == nullptr);  +            }  +        };  +    }  +  +    DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase);  +    DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout);  +    DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout);  +       template <typename T = void*, typename... TParams> -    class THTSwapQueue +    class THTSwapQueue          : public NHTSwapPrivate::THTSwapQueueImpl<T,                                                    TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> { -    }; -} +    };  +}  diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp index 3bb1a04f7e9..a6a2fcef398 100644 --- a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp +++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp @@ -1,79 +1,79 @@ -#include "mpsc_intrusive_unordered.h" -#include <util/system/atomic.h> - -namespace NThreading { -    void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept { -        auto head = AtomicGet(HeadForCaS); -        for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) { -            // no ABA here, because Next is exactly head -            // it does not matter how many travels head was made/ -            node->Next = head; -            auto prev = AtomicGetAndCas(&HeadForCaS, node, head); -            if (head == prev) { -                return; -            } -            head = prev; -        } -        // boring of trying to do cas, let's just swap - -        // no need for atomic here, because the next is atomic swap -        node->Next = 0; - -        head = AtomicSwap(&HeadForSwap, node); -        if (head != nullptr) { -            AtomicSet(node->Next, head); -        } else { -            // consumer must know if no other thread may access the memory, -            // setting Next to node is a way to notify consumer -            AtomicSet(node->Next, node); -        } -    } - -    TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept { -        if (NotReadyChain == nullptr) { -            auto head = AtomicSwap(&HeadForSwap, nullptr); -            NotReadyChain = head; -        } - -        if (NotReadyChain != nullptr) { -            auto next = AtomicGet(NotReadyChain->Next); -            if (next != nullptr) { -                auto ready = NotReadyChain; -                TIntrusiveNode* cut; -                do { -                    cut = NotReadyChain; -                    NotReadyChain = next; -                    next = AtomicGet(NotReadyChain->Next); -                    if (next == NotReadyChain) { -                        cut = NotReadyChain; -                        NotReadyChain = nullptr; -                        break; -                    } -                } while (next != nullptr); -                cut->Next = nullptr; -                return ready; -            } -        } - -        if (AtomicGet(HeadForCaS) != nullptr) { -            return AtomicSwap(&HeadForCaS, nullptr); -        } -        return nullptr; -    } - -    TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept { -        if (PopOneQueue != nullptr) { -            auto head = PopOneQueue; -            PopOneQueue = PopOneQueue->Next; -            return head; -        } - -        PopOneQueue = PopMany(); -        if (PopOneQueue != nullptr) { -            auto head = PopOneQueue; -            PopOneQueue = PopOneQueue->Next; -            return head; -        } -        return nullptr; -    } -} +#include "mpsc_intrusive_unordered.h"  +#include <util/system/atomic.h>  +  +namespace NThreading {  +    void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept {  +        auto head = AtomicGet(HeadForCaS);  +        for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) {  +            // no ABA here, because Next is exactly head  +            // it does not matter how many travels head was made/  +            node->Next = head;  +            auto prev = AtomicGetAndCas(&HeadForCaS, node, head);  +            if (head == prev) {  +                return;  +            }  +            head = prev;  +        }  +        // boring of trying to do cas, let's just swap  +  +        // no need for atomic here, because the next is atomic swap  +        node->Next = 0;  +  +        head = AtomicSwap(&HeadForSwap, node);  +        if (head != nullptr) {  +            AtomicSet(node->Next, head);  +        } else {  +            // consumer must know if no other thread may access the memory,  +            // setting Next to node is a way to notify consumer  +            AtomicSet(node->Next, node);  +        }  +    }  +  +    TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept {  +        if (NotReadyChain == nullptr) {  +            auto head = AtomicSwap(&HeadForSwap, nullptr);  +            NotReadyChain = head;  +        }  +  +        if (NotReadyChain != nullptr) {  +            auto next = AtomicGet(NotReadyChain->Next);  +            if (next != nullptr) {  +                auto ready = NotReadyChain;  +                TIntrusiveNode* cut;  +                do {  +                    cut = NotReadyChain;  +                    NotReadyChain = next;  +                    next = AtomicGet(NotReadyChain->Next);  +                    if (next == NotReadyChain) {  +                        cut = NotReadyChain;  +                        NotReadyChain = nullptr;  +                        break;  +                    }  +                } while (next != nullptr);  +                cut->Next = nullptr;  +                return ready;  +            }  +        }  +  +        if (AtomicGet(HeadForCaS) != nullptr) {  +            return AtomicSwap(&HeadForCaS, nullptr);  +        }  +        return nullptr;  +    }  +  +    TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept {  +        if (PopOneQueue != nullptr) {  +            auto head = PopOneQueue;  +            PopOneQueue = PopOneQueue->Next;  +            return head;  +        }  +  +        PopOneQueue = PopMany();  +        if (PopOneQueue != nullptr) {  +            auto head = PopOneQueue;  +            PopOneQueue = PopOneQueue->Next;  +            return head;  +        }  +        return nullptr;  +    }  +}  diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.h b/library/cpp/threading/queue/mpsc_intrusive_unordered.h index 6ac7537ae9a..c07cf761f67 100644 --- a/library/cpp/threading/queue/mpsc_intrusive_unordered.h +++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.h @@ -1,35 +1,35 @@ -#pragma once - -/* -  Simple almost-wait-free unordered queue for low contention operations. - -  It's wait-free for producers. -  Hanging producer can hide some items from consumer. - */ - -#include <util/system/types.h> - -namespace NThreading { -    struct TIntrusiveNode { -        TIntrusiveNode* Next; -    }; - -    class TMPSCIntrusiveUnordered { -    public: -        static constexpr ui32 NUMBER_OF_TRIES_FOR_CAS = 3; - -        void Push(TIntrusiveNode* node) noexcept; -        TIntrusiveNode* PopMany() noexcept; -        TIntrusiveNode* Pop() noexcept; - -        void Push(void* node) noexcept { -            Push(reinterpret_cast<TIntrusiveNode*>(node)); -        } - -    private: -        TIntrusiveNode* HeadForCaS = nullptr; -        TIntrusiveNode* HeadForSwap = nullptr; -        TIntrusiveNode* NotReadyChain = nullptr; -        TIntrusiveNode* PopOneQueue = nullptr; -    }; -} +#pragma once  +  +/*  +  Simple almost-wait-free unordered queue for low contention operations.  +  +  It's wait-free for producers.  +  Hanging producer can hide some items from consumer.  + */  +  +#include <util/system/types.h>  +  +namespace NThreading {  +    struct TIntrusiveNode {  +        TIntrusiveNode* Next;  +    };  +  +    class TMPSCIntrusiveUnordered {  +    public:  +        static constexpr ui32 NUMBER_OF_TRIES_FOR_CAS = 3;  +  +        void Push(TIntrusiveNode* node) noexcept;  +        TIntrusiveNode* PopMany() noexcept;  +        TIntrusiveNode* Pop() noexcept;  +  +        void Push(void* node) noexcept {  +            Push(reinterpret_cast<TIntrusiveNode*>(node));  +        }  + +    private:  +        TIntrusiveNode* HeadForCaS = nullptr;  +        TIntrusiveNode* HeadForSwap = nullptr;  +        TIntrusiveNode* NotReadyChain = nullptr;  +        TIntrusiveNode* PopOneQueue = nullptr;  +    };  +}  diff --git a/library/cpp/threading/queue/mpsc_read_as_filled.cpp b/library/cpp/threading/queue/mpsc_read_as_filled.cpp index 8b4664a6f32..3b89fb1df62 100644 --- a/library/cpp/threading/queue/mpsc_read_as_filled.cpp +++ b/library/cpp/threading/queue/mpsc_read_as_filled.cpp @@ -1 +1 @@ -#include "mpsc_read_as_filled.h" +#include "mpsc_read_as_filled.h"  diff --git a/library/cpp/threading/queue/mpsc_read_as_filled.h b/library/cpp/threading/queue/mpsc_read_as_filled.h index be33ba5a584..4dfdb1fbbfa 100644 --- a/library/cpp/threading/queue/mpsc_read_as_filled.h +++ b/library/cpp/threading/queue/mpsc_read_as_filled.h @@ -1,611 +1,611 @@ -#pragma once - -/* -  Completely wait-free queue, multiple producers - one consumer. Strict order. -  The queue algorithm is using concept of virtual infinite array. - +#pragma once  +  +/*  +  Completely wait-free queue, multiple producers - one consumer. Strict order.  +  The queue algorithm is using concept of virtual infinite array.  +     A producer takes a number from a counter and atomically increments the counter. -  The number taken is a number of a slot for the producer to put a new message -  into infinite array. - -  Then producer constructs a virtual infinite array by bidirectional linked list -  of blocks. Each block contains several slots. - +  The number taken is a number of a slot for the producer to put a new message  +  into infinite array.  +  +  Then producer constructs a virtual infinite array by bidirectional linked list  +  of blocks. Each block contains several slots.  +     There is a hint pointer which optimistically points to the last block -  of the list and never goes backward. - -  Consumer exploits the property of the hint pointer always going forward -  to free old blocks eventually. Consumer periodically read the hint pointer -  and the counter and thus deduce producers which potentially holds the pointer -  to a block. Consumer can free the block if all that producers filled their -  slots and left the queue. - -  No producer can stop the progress for other producers. - -  Consumer can't stop the progress for producers. -  Consumer can skip not-yet-filled slots and read them later. -  Thus no producer can stop the progress for consumer. +  of the list and never goes backward.  +  +  Consumer exploits the property of the hint pointer always going forward  +  to free old blocks eventually. Consumer periodically read the hint pointer  +  and the counter and thus deduce producers which potentially holds the pointer  +  to a block. Consumer can free the block if all that producers filled their  +  slots and left the queue.  +  +  No producer can stop the progress for other producers.  +  +  Consumer can't stop the progress for producers.  +  Consumer can skip not-yet-filled slots and read them later.  +  Thus no producer can stop the progress for consumer.     The algorithm is virtually strictly ordered because it skips slots only -  if it is really does not matter in which order the slots were produced and -  consumed. - -  WARNING: there is no wait¬ify mechanic for consumer, -  consumer receives nullptr if queue was empty. - -  WARNING: though the algorithm itself is completely wait-free -  but producers and consumer could be blocked by memory allocator - +  if it is really does not matter in which order the slots were produced and  +  consumed.  +  +  WARNING: there is no wait¬ify mechanic for consumer,  +  consumer receives nullptr if queue was empty.  +  +  WARNING: though the algorithm itself is completely wait-free  +  but producers and consumer could be blocked by memory allocator  +     WARNING: copy constructors of the queue are not thread-safe - */ - -#include <util/generic/deque.h> -#include <util/generic/ptr.h> -#include <util/system/atomic.h> -#include <util/system/spinlock.h> - -#include "tune.h" - -namespace NThreading { -    namespace NReadAsFilledPrivate { -        typedef void* TMsgLink; - -        static constexpr ui32 DEFAULT_BUNCH_SIZE = 251; - -        struct TEmpty { -        }; - -        struct TEmptyAux { -            TEmptyAux Retrieve() const { -                return TEmptyAux(); -            } - -            void Store(TEmptyAux&) { -            } - -            static constexpr TEmptyAux Zero() { -                return TEmptyAux(); -            } -        }; - -        template <typename TAux> -        struct TSlot { -            TMsgLink volatile Msg; -            TAux AuxiliaryData; - -            inline void Store(TAux& aux) { -                AuxiliaryData.Store(aux); -            } - -            inline TAux Retrieve() const { -                return AuxiliaryData.Retrieve(); -            } - -            static TSlot<TAux> NullElem() { -                return {nullptr, TAux::Zero()}; -            } - -            static TSlot<TAux> Pair(TMsgLink msg, TAux aux) { -                return {msg, std::move(aux)}; -            } -        }; - -        template <> -        struct TSlot<TEmptyAux> { -            TMsgLink volatile Msg; - -            inline void Store(TEmptyAux&) { -            } - -            inline TEmptyAux Retrieve() const { -                return TEmptyAux(); -            } - -            static TSlot<TEmptyAux> NullElem() { -                return {nullptr}; -            } - -            static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) { -                return {msg}; -            } -        }; - -        enum TPushResult { -            PUSH_RESULT_OK, -            PUSH_RESULT_BACKWARD, -            PUSH_RESULT_FORWARD, -        }; - -        template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE, -                  typename TBase = TEmpty, -                  typename TAux = TEmptyAux> -        struct TMsgBunch: public TBase { -            static constexpr size_t RELEASE_SIZE = BUNCH_SIZE * 2; - -            ui64 FirstSlot; - -            TSlot<TAux> LinkArray[BUNCH_SIZE]; - -            TMsgBunch* volatile NextBunch; -            TMsgBunch* volatile BackLink; - -            ui64 volatile Token; -            TMsgBunch* volatile NextToken; - -            /* this push can return PUSH_RESULT_BLOCKED */ + */  +  +#include <util/generic/deque.h>  +#include <util/generic/ptr.h>  +#include <util/system/atomic.h>  +#include <util/system/spinlock.h>  +  +#include "tune.h"  +  +namespace NThreading {  +    namespace NReadAsFilledPrivate {  +        typedef void* TMsgLink;  +  +        static constexpr ui32 DEFAULT_BUNCH_SIZE = 251;  +  +        struct TEmpty {  +        };  +  +        struct TEmptyAux {  +            TEmptyAux Retrieve() const {  +                return TEmptyAux();  +            }  +  +            void Store(TEmptyAux&) {  +            }  +  +            static constexpr TEmptyAux Zero() {  +                return TEmptyAux();  +            }  +        };  +  +        template <typename TAux>  +        struct TSlot {  +            TMsgLink volatile Msg;  +            TAux AuxiliaryData;  +  +            inline void Store(TAux& aux) {  +                AuxiliaryData.Store(aux);  +            }  +  +            inline TAux Retrieve() const {  +                return AuxiliaryData.Retrieve();  +            }  +  +            static TSlot<TAux> NullElem() {  +                return {nullptr, TAux::Zero()};  +            }  +  +            static TSlot<TAux> Pair(TMsgLink msg, TAux aux) {  +                return {msg, std::move(aux)};  +            }  +        };  +  +        template <>  +        struct TSlot<TEmptyAux> {  +            TMsgLink volatile Msg;  +  +            inline void Store(TEmptyAux&) {  +            }  +  +            inline TEmptyAux Retrieve() const {  +                return TEmptyAux();  +            }  +  +            static TSlot<TEmptyAux> NullElem() {  +                return {nullptr};  +            }  +  +            static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) {  +                return {msg};  +            }  +        };  +  +        enum TPushResult {  +            PUSH_RESULT_OK,  +            PUSH_RESULT_BACKWARD,  +            PUSH_RESULT_FORWARD,  +        };  +  +        template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE,  +                  typename TBase = TEmpty,  +                  typename TAux = TEmptyAux>  +        struct TMsgBunch: public TBase {  +            static constexpr size_t RELEASE_SIZE = BUNCH_SIZE * 2;  +  +            ui64 FirstSlot;  +  +            TSlot<TAux> LinkArray[BUNCH_SIZE];  +  +            TMsgBunch* volatile NextBunch;  +            TMsgBunch* volatile BackLink;  +  +            ui64 volatile Token;  +            TMsgBunch* volatile NextToken;  +  +            /* this push can return PUSH_RESULT_BLOCKED */               inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) { -                if (Y_UNLIKELY(slot < FirstSlot)) { -                    return PUSH_RESULT_BACKWARD; -                } - -                if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) { -                    return PUSH_RESULT_FORWARD; -                } - -                LinkArray[slot - FirstSlot].Store(auxiliary); - -                AtomicSet(LinkArray[slot - FirstSlot].Msg, msg); -                return PUSH_RESULT_OK; -            } - -            inline bool IsSlotHere(ui64 slot) { -                return slot < FirstSlot + BUNCH_SIZE; -            } - -            inline TMsgLink GetSlot(ui64 slot) const { -                return AtomicGet(LinkArray[slot - FirstSlot].Msg); -            } - -            inline TSlot<TAux> GetSlotAux(ui64 slot) const { -                auto msg = GetSlot(slot); -                auto aux = LinkArray[slot - FirstSlot].Retrieve(); -                return TSlot<TAux>::Pair(msg, aux); -            } - -            inline TMsgBunch* GetNextBunch() const { -                return AtomicGet(NextBunch); -            } - -            inline bool SetNextBunch(TMsgBunch* ptr) { -                return AtomicCas(&NextBunch, ptr, nullptr); -            } - -            inline TMsgBunch* GetBackLink() const { -                return AtomicGet(BackLink); -            } - -            inline TMsgBunch* GetToken(ui64 slot) { -                return reinterpret_cast<TMsgBunch*>( -                    LinkArray[slot - FirstSlot].Msg); -            } - -            inline void IncrementToken() { -                AtomicIncrement(Token); -            } - -            // the object could be destroyed after this method -            inline void DecrementToken() { -                if (Y_UNLIKELY(AtomicDecrement(Token) == RELEASE_SIZE)) { -                    Release(this); -                    AtomicGet(NextToken)->DecrementToken(); -                    // this could be invalid here -                } -            } - -            // the object could be destroyed after this method -            inline void SetNextToken(TMsgBunch* next) { -                AtomicSet(NextToken, next); +                if (Y_UNLIKELY(slot < FirstSlot)) {  +                    return PUSH_RESULT_BACKWARD;  +                }  +  +                if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) {  +                    return PUSH_RESULT_FORWARD;  +                }  +  +                LinkArray[slot - FirstSlot].Store(auxiliary);  +  +                AtomicSet(LinkArray[slot - FirstSlot].Msg, msg);  +                return PUSH_RESULT_OK;  +            }  +  +            inline bool IsSlotHere(ui64 slot) {  +                return slot < FirstSlot + BUNCH_SIZE;  +            }  +  +            inline TMsgLink GetSlot(ui64 slot) const {  +                return AtomicGet(LinkArray[slot - FirstSlot].Msg);  +            }  +  +            inline TSlot<TAux> GetSlotAux(ui64 slot) const {  +                auto msg = GetSlot(slot);  +                auto aux = LinkArray[slot - FirstSlot].Retrieve();  +                return TSlot<TAux>::Pair(msg, aux);  +            }  +  +            inline TMsgBunch* GetNextBunch() const {  +                return AtomicGet(NextBunch);  +            }  +  +            inline bool SetNextBunch(TMsgBunch* ptr) {  +                return AtomicCas(&NextBunch, ptr, nullptr);  +            }  +  +            inline TMsgBunch* GetBackLink() const {  +                return AtomicGet(BackLink);  +            }  +  +            inline TMsgBunch* GetToken(ui64 slot) {  +                return reinterpret_cast<TMsgBunch*>(  +                    LinkArray[slot - FirstSlot].Msg);  +            }  +  +            inline void IncrementToken() {  +                AtomicIncrement(Token);  +            }  +  +            // the object could be destroyed after this method  +            inline void DecrementToken() {  +                if (Y_UNLIKELY(AtomicDecrement(Token) == RELEASE_SIZE)) {  +                    Release(this);  +                    AtomicGet(NextToken)->DecrementToken();  +                    // this could be invalid here  +                }  +            }  +  +            // the object could be destroyed after this method  +            inline void SetNextToken(TMsgBunch* next) {  +                AtomicSet(NextToken, next);                   if (Y_UNLIKELY(AtomicAdd(Token, RELEASE_SIZE) == RELEASE_SIZE)) { -                    Release(this); -                    next->DecrementToken(); -                } -                // this could be invalid here -            } - -            TMsgBunch(ui64 start, TMsgBunch* backLink) { -                AtomicSet(FirstSlot, start); -                memset(&LinkArray, 0, sizeof(LinkArray)); -                AtomicSet(NextBunch, nullptr); -                AtomicSet(BackLink, backLink); - -                AtomicSet(Token, 1); -                AtomicSet(NextToken, nullptr); -            } - -            static void Release(TMsgBunch* block) { -                auto backLink = AtomicGet(block->BackLink); -                if (backLink == nullptr) { -                    return; -                } -                AtomicSet(block->BackLink, nullptr); - -                do { -                    auto bbackLink = backLink->BackLink; -                    delete backLink; -                    backLink = bbackLink; -                } while (backLink != nullptr); -            } - -            void Destroy() { -                for (auto tail = BackLink; tail != nullptr;) { -                    auto next = tail->BackLink; -                    delete tail; -                    tail = next; -                } - -                for (auto next = this; next != nullptr;) { -                    auto nnext = next->NextBunch; -                    delete next; -                    next = nnext; -                } -            } -        }; - -        template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE, -                  typename TBunchBase = NReadAsFilledPrivate::TEmpty, -                  typename TAux = TEmptyAux> -        class TWriteBucket { -        public: -            using TUsingAux = TAux; // for TReadBucket binding -            using TBunch = TMsgBunch<BUNCH_SIZE, TBunchBase, TAux>; - -            TWriteBucket(TBunch* bunch = new TBunch(0, nullptr)) { -                AtomicSet(LastBunch, bunch); -                AtomicSet(SlotCounter, 0); -            } - -            TWriteBucket(TWriteBucket&& move) -                : LastBunch(move.LastBunch) -                , SlotCounter(move.SlotCounter) -            { -                move.LastBunch = nullptr; -            } - -            ~TWriteBucket() { -                if (LastBunch != nullptr) { -                    LastBunch->Destroy(); -                } -            } - -            inline void Push(TMsgLink msg, TAux aux) { -                ui64 pushSlot = AtomicGetAndIncrement(SlotCounter); -                TBunch* hintBunch = GetLastBunch(); - -                for (;;) { -                    auto hint = hintBunch->Push(msg, pushSlot, aux); -                    if (Y_LIKELY(hint == PUSH_RESULT_OK)) { -                        return; -                    } -                    HandleHint(hintBunch, hint); -                } -            } - -        protected: -            template <typename, template <typename, typename...> class> -            friend class TReadBucket; - -            TBunch* volatile LastBunch; // Hint -            volatile ui64 SlotCounter; - -            inline TBunch* GetLastBunch() const { -                return AtomicGet(LastBunch); -            } - -            void HandleHint(TBunch*& hintBunch, TPushResult hint) { -                if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) { -                    hintBunch = hintBunch->GetBackLink(); -                    return; -                } - -                // PUSH_RESULT_FORWARD -                auto nextBunch = hintBunch->GetNextBunch(); - -                if (nextBunch == nullptr) { -                    auto first = hintBunch->FirstSlot + BUNCH_SIZE; -                    nextBunch = new TBunch(first, hintBunch); -                    if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) { -                        delete nextBunch; -                        nextBunch = hintBunch->GetNextBunch(); -                    } -                } - -                // hintBunch could not be freed here so it cannot be reused -                // it's alright if this CAS was not succeeded, -                // it means that other thread did that recently -                AtomicCas(&LastBunch, nextBunch, hintBunch); - -                hintBunch = nextBunch; -            } -        }; - +                    Release(this);  +                    next->DecrementToken();  +                }  +                // this could be invalid here  +            }  +  +            TMsgBunch(ui64 start, TMsgBunch* backLink) {  +                AtomicSet(FirstSlot, start);  +                memset(&LinkArray, 0, sizeof(LinkArray));  +                AtomicSet(NextBunch, nullptr);  +                AtomicSet(BackLink, backLink);  +  +                AtomicSet(Token, 1);  +                AtomicSet(NextToken, nullptr);  +            }  +  +            static void Release(TMsgBunch* block) {  +                auto backLink = AtomicGet(block->BackLink);  +                if (backLink == nullptr) {  +                    return;  +                }  +                AtomicSet(block->BackLink, nullptr);  +  +                do {  +                    auto bbackLink = backLink->BackLink;  +                    delete backLink;  +                    backLink = bbackLink;  +                } while (backLink != nullptr);  +            }  +  +            void Destroy() {  +                for (auto tail = BackLink; tail != nullptr;) {  +                    auto next = tail->BackLink;  +                    delete tail;  +                    tail = next;  +                }  +  +                for (auto next = this; next != nullptr;) {  +                    auto nnext = next->NextBunch;  +                    delete next;  +                    next = nnext;  +                }  +            }  +        };  +  +        template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE,  +                  typename TBunchBase = NReadAsFilledPrivate::TEmpty,  +                  typename TAux = TEmptyAux>  +        class TWriteBucket {  +        public:  +            using TUsingAux = TAux; // for TReadBucket binding  +            using TBunch = TMsgBunch<BUNCH_SIZE, TBunchBase, TAux>;  +  +            TWriteBucket(TBunch* bunch = new TBunch(0, nullptr)) {  +                AtomicSet(LastBunch, bunch);  +                AtomicSet(SlotCounter, 0);  +            }  +  +            TWriteBucket(TWriteBucket&& move)  +                : LastBunch(move.LastBunch)  +                , SlotCounter(move.SlotCounter)  +            {  +                move.LastBunch = nullptr;  +            }  +  +            ~TWriteBucket() {  +                if (LastBunch != nullptr) {  +                    LastBunch->Destroy();  +                }  +            }  +  +            inline void Push(TMsgLink msg, TAux aux) {  +                ui64 pushSlot = AtomicGetAndIncrement(SlotCounter);  +                TBunch* hintBunch = GetLastBunch();  +  +                for (;;) {  +                    auto hint = hintBunch->Push(msg, pushSlot, aux);  +                    if (Y_LIKELY(hint == PUSH_RESULT_OK)) {  +                        return;  +                    }  +                    HandleHint(hintBunch, hint);  +                }  +            }  +  +        protected:  +            template <typename, template <typename, typename...> class>  +            friend class TReadBucket;  +  +            TBunch* volatile LastBunch; // Hint  +            volatile ui64 SlotCounter;  +  +            inline TBunch* GetLastBunch() const {  +                return AtomicGet(LastBunch);  +            }  +  +            void HandleHint(TBunch*& hintBunch, TPushResult hint) {  +                if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) {  +                    hintBunch = hintBunch->GetBackLink();  +                    return;  +                }  +  +                // PUSH_RESULT_FORWARD  +                auto nextBunch = hintBunch->GetNextBunch();  +  +                if (nextBunch == nullptr) {  +                    auto first = hintBunch->FirstSlot + BUNCH_SIZE;  +                    nextBunch = new TBunch(first, hintBunch);  +                    if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) {  +                        delete nextBunch;  +                        nextBunch = hintBunch->GetNextBunch();  +                    }  +                }  +  +                // hintBunch could not be freed here so it cannot be reused  +                // it's alright if this CAS was not succeeded,  +                // it means that other thread did that recently  +                AtomicCas(&LastBunch, nextBunch, hintBunch);  +  +                hintBunch = nextBunch;  +            }  +        };  +           template <typename TWBucket = TWriteBucket<>,                    template <typename, typename...> class TContainer = TDeque> -        class TReadBucket { -        public: -            using TAux = typename TWBucket::TUsingAux; -            using TBunch = typename TWBucket::TBunch; - -            static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 5; - -            TReadBucket(TWBucket* writer) -                : Writer(writer) -                , ReadBunch(writer->GetLastBunch()) -                , LastKnownPushBunch(writer->GetLastBunch()) -            { -                ReadBunch->DecrementToken(); // no previous token -            } - -            TReadBucket(TReadBucket toCopy, TWBucket* writer) -                : TReadBucket(std::move(toCopy)) -            { -                Writer = writer; -            } - -            ui64 ReadyCount() const { -                return AtomicGet(Writer->SlotCounter) - ReadSlot; -            } - -            TMsgLink Pop() { -                return PopAux().Msg; -            } - -            TMsgLink Peek() { -                return PeekAux().Msg; -            } - -            TSlot<TAux> PopAux() { -                for (;;) { -                    if (Y_UNLIKELY(ReadNow.size() != 0)) { -                        auto result = PopSkipped(); -                        if (Y_LIKELY(result.Msg != nullptr)) { -                            return result; -                        } -                    } - -                    if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) { -                        if (Y_LIKELY(!RereadPushSlot())) { -                            return TSlot<TAux>::NullElem(); -                        } -                        continue; -                    } - -                    if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) { -                        if (Y_UNLIKELY(!SwitchToNextBunch())) { -                            return TSlot<TAux>::NullElem(); -                        } -                    } - -                    auto result = ReadBunch->GetSlotAux(ReadSlot); -                    if (Y_LIKELY(result.Msg != nullptr)) { -                        ++ReadSlot; -                        return result; -                    } - -                    result = StubbornPop(); -                    if (Y_LIKELY(result.Msg != nullptr)) { -                        return result; -                    } -                } -            } - -            TSlot<TAux> PeekAux() { -                for (;;) { -                    if (Y_UNLIKELY(ReadNow.size() != 0)) { -                        auto result = PeekSkipped(); -                        if (Y_LIKELY(result.Msg != nullptr)) { -                            return result; -                        } -                    } - -                    if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) { -                        if (Y_LIKELY(!RereadPushSlot())) { -                            return TSlot<TAux>::NullElem(); -                        } -                        continue; -                    } - -                    if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) { -                        if (Y_UNLIKELY(!SwitchToNextBunch())) { -                            return TSlot<TAux>::NullElem(); -                        } -                    } - -                    auto result = ReadBunch->GetSlotAux(ReadSlot); -                    if (Y_LIKELY(result.Msg != nullptr)) { -                        return result; -                    } - -                    result = StubbornPeek(); -                    if (Y_LIKELY(result.Msg != nullptr)) { -                        return result; -                    } -                } -            } - -        private: -            TWBucket* Writer; -            TBunch* ReadBunch; -            ui64 ReadSlot = 0; -            TBunch* LastKnownPushBunch; -            ui64 LastKnownPushSlot = 0; - -            struct TSkipItem { -                TBunch* Bunch; -                ui64 Slot; -                TBunch* Token; -            }; - -            TContainer<TSkipItem> ReadNow; -            TContainer<TSkipItem> ReadLater; - -            void AddToReadLater() { -                ReadLater.push_back({ReadBunch, ReadSlot, LastKnownPushBunch}); -                LastKnownPushBunch->IncrementToken(); -                ++ReadSlot; -            } - -            // MUST BE: ReadSlot == LastKnownPushSlot -            bool RereadPushSlot() { -                ReadNow = std::move(ReadLater); -                ReadLater.clear(); - -                auto oldSlot = LastKnownPushSlot; - -                auto currentPushBunch = Writer->GetLastBunch(); -                auto currentPushSlot = AtomicGet(Writer->SlotCounter); - -                if (currentPushBunch != LastKnownPushBunch) { -                    // LastKnownPushBunch could be invalid after this line -                    LastKnownPushBunch->SetNextToken(currentPushBunch); -                } - -                LastKnownPushBunch = currentPushBunch; -                LastKnownPushSlot = currentPushSlot; - -                return oldSlot != LastKnownPushSlot; -            } - -            bool SwitchToNextBunch() { -                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { -                    auto next = ReadBunch->GetNextBunch(); -                    if (next != nullptr) { -                        ReadBunch = next; -                        return true; -                    } -                    SpinLockPause(); -                } -                return false; -            } - -            TSlot<TAux> StubbornPop() { -                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { -                    auto result = ReadBunch->GetSlotAux(ReadSlot); -                    if (Y_LIKELY(result.Msg != nullptr)) { -                        ++ReadSlot; -                        return result; -                    } -                    SpinLockPause(); -                } - -                AddToReadLater(); -                return TSlot<TAux>::NullElem(); -            } - -            TSlot<TAux> StubbornPeek() { -                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { -                    auto result = ReadBunch->GetSlotAux(ReadSlot); -                    if (Y_LIKELY(result.Msg != nullptr)) { -                        return result; -                    } -                    SpinLockPause(); -                } - -                AddToReadLater(); -                return TSlot<TAux>::NullElem(); -            } - -            TSlot<TAux> PopSkipped() { -                do { -                    auto elem = ReadNow.front(); -                    ReadNow.pop_front(); - -                    auto result = elem.Bunch->GetSlotAux(elem.Slot); -                    if (Y_LIKELY(result.Msg != nullptr)) { -                        elem.Token->DecrementToken(); -                        return result; -                    } - -                    ReadLater.emplace_back(elem); - -                } while (ReadNow.size() > 0); - -                return TSlot<TAux>::NullElem(); -            } - -            TSlot<TAux> PeekSkipped() { -                do { -                    auto elem = ReadNow.front(); - -                    auto result = elem.Bunch->GetSlotAux(elem.Slot); -                    if (Y_LIKELY(result.Msg != nullptr)) { -                        return result; -                    } - -                    ReadNow.pop_front(); -                    ReadLater.emplace_back(elem); - -                } while (ReadNow.size() > 0); - -                return TSlot<TAux>::NullElem(); -            } -        }; - -        struct TDefaultParams { -            static constexpr ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE; -            using TBunchBase = TEmpty; - +        class TReadBucket {  +        public:  +            using TAux = typename TWBucket::TUsingAux;  +            using TBunch = typename TWBucket::TBunch;  +  +            static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 5;  +  +            TReadBucket(TWBucket* writer)  +                : Writer(writer)  +                , ReadBunch(writer->GetLastBunch())  +                , LastKnownPushBunch(writer->GetLastBunch())  +            {  +                ReadBunch->DecrementToken(); // no previous token  +            }  +  +            TReadBucket(TReadBucket toCopy, TWBucket* writer)  +                : TReadBucket(std::move(toCopy))  +            {  +                Writer = writer;  +            }  +  +            ui64 ReadyCount() const {  +                return AtomicGet(Writer->SlotCounter) - ReadSlot;  +            }  +  +            TMsgLink Pop() {  +                return PopAux().Msg;  +            }  +  +            TMsgLink Peek() {  +                return PeekAux().Msg;  +            }  +  +            TSlot<TAux> PopAux() {  +                for (;;) {  +                    if (Y_UNLIKELY(ReadNow.size() != 0)) {  +                        auto result = PopSkipped();  +                        if (Y_LIKELY(result.Msg != nullptr)) {  +                            return result;  +                        }  +                    }  +  +                    if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {  +                        if (Y_LIKELY(!RereadPushSlot())) {  +                            return TSlot<TAux>::NullElem();  +                        }  +                        continue;  +                    }  +  +                    if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {  +                        if (Y_UNLIKELY(!SwitchToNextBunch())) {  +                            return TSlot<TAux>::NullElem();  +                        }  +                    }  +  +                    auto result = ReadBunch->GetSlotAux(ReadSlot);  +                    if (Y_LIKELY(result.Msg != nullptr)) {  +                        ++ReadSlot;  +                        return result;  +                    }  +  +                    result = StubbornPop();  +                    if (Y_LIKELY(result.Msg != nullptr)) {  +                        return result;  +                    }  +                }  +            }  +  +            TSlot<TAux> PeekAux() {  +                for (;;) {  +                    if (Y_UNLIKELY(ReadNow.size() != 0)) {  +                        auto result = PeekSkipped();  +                        if (Y_LIKELY(result.Msg != nullptr)) {  +                            return result;  +                        }  +                    }  +  +                    if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {  +                        if (Y_LIKELY(!RereadPushSlot())) {  +                            return TSlot<TAux>::NullElem();  +                        }  +                        continue;  +                    }  +  +                    if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {  +                        if (Y_UNLIKELY(!SwitchToNextBunch())) {  +                            return TSlot<TAux>::NullElem();  +                        }  +                    }  +  +                    auto result = ReadBunch->GetSlotAux(ReadSlot);  +                    if (Y_LIKELY(result.Msg != nullptr)) {  +                        return result;  +                    }  +  +                    result = StubbornPeek();  +                    if (Y_LIKELY(result.Msg != nullptr)) {  +                        return result;  +                    }  +                }  +            }  +  +        private:  +            TWBucket* Writer;  +            TBunch* ReadBunch;  +            ui64 ReadSlot = 0;  +            TBunch* LastKnownPushBunch;  +            ui64 LastKnownPushSlot = 0;  +  +            struct TSkipItem {  +                TBunch* Bunch;  +                ui64 Slot;  +                TBunch* Token;  +            };  +  +            TContainer<TSkipItem> ReadNow;  +            TContainer<TSkipItem> ReadLater;  +  +            void AddToReadLater() {  +                ReadLater.push_back({ReadBunch, ReadSlot, LastKnownPushBunch});  +                LastKnownPushBunch->IncrementToken();  +                ++ReadSlot;  +            }  +  +            // MUST BE: ReadSlot == LastKnownPushSlot  +            bool RereadPushSlot() {  +                ReadNow = std::move(ReadLater);  +                ReadLater.clear();  +  +                auto oldSlot = LastKnownPushSlot;  +  +                auto currentPushBunch = Writer->GetLastBunch();  +                auto currentPushSlot = AtomicGet(Writer->SlotCounter);  +  +                if (currentPushBunch != LastKnownPushBunch) {  +                    // LastKnownPushBunch could be invalid after this line  +                    LastKnownPushBunch->SetNextToken(currentPushBunch);  +                }  +  +                LastKnownPushBunch = currentPushBunch;  +                LastKnownPushSlot = currentPushSlot;  +  +                return oldSlot != LastKnownPushSlot;  +            }  +  +            bool SwitchToNextBunch() {  +                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {  +                    auto next = ReadBunch->GetNextBunch();  +                    if (next != nullptr) {  +                        ReadBunch = next;  +                        return true;  +                    }  +                    SpinLockPause();  +                }  +                return false;  +            }  +  +            TSlot<TAux> StubbornPop() {  +                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {  +                    auto result = ReadBunch->GetSlotAux(ReadSlot);  +                    if (Y_LIKELY(result.Msg != nullptr)) {  +                        ++ReadSlot;  +                        return result;  +                    }  +                    SpinLockPause();  +                }  +  +                AddToReadLater();  +                return TSlot<TAux>::NullElem();  +            }  +  +            TSlot<TAux> StubbornPeek() {  +                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {  +                    auto result = ReadBunch->GetSlotAux(ReadSlot);  +                    if (Y_LIKELY(result.Msg != nullptr)) {  +                        return result;  +                    }  +                    SpinLockPause();  +                }  +  +                AddToReadLater();  +                return TSlot<TAux>::NullElem();  +            }  +  +            TSlot<TAux> PopSkipped() {  +                do {  +                    auto elem = ReadNow.front();  +                    ReadNow.pop_front();  +  +                    auto result = elem.Bunch->GetSlotAux(elem.Slot);  +                    if (Y_LIKELY(result.Msg != nullptr)) {  +                        elem.Token->DecrementToken();  +                        return result;  +                    }  +  +                    ReadLater.emplace_back(elem);  +  +                } while (ReadNow.size() > 0);  +  +                return TSlot<TAux>::NullElem();  +            }  +  +            TSlot<TAux> PeekSkipped() {  +                do {  +                    auto elem = ReadNow.front();  +  +                    auto result = elem.Bunch->GetSlotAux(elem.Slot);  +                    if (Y_LIKELY(result.Msg != nullptr)) {  +                        return result;  +                    }  +  +                    ReadNow.pop_front();  +                    ReadLater.emplace_back(elem);  +  +                } while (ReadNow.size() > 0);  +  +                return TSlot<TAux>::NullElem();  +            }  +        };  +  +        struct TDefaultParams {  +            static constexpr ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE;  +            using TBunchBase = TEmpty;  +               template <typename TElem, typename... TRest>              using TContainer = TDeque<TElem, TRest...>; - -            static constexpr bool DeleteItems = true; -        }; - -    } //namespace NReadAsFilledPrivate - -    DeclareTuneValueParam(TRaFQueueBunchSize, ui32, BUNCH_SIZE); -    DeclareTuneTypeParam(TRaFQueueBunchBase, TBunchBase); -    DeclareTuneContainer(TRaFQueueSkipContainer, TContainer); -    DeclareTuneValueParam(TRaFQueueDeleteItems, bool, DeleteItems); - +  +            static constexpr bool DeleteItems = true;  +        };  +  +    } //namespace NReadAsFilledPrivate  +  +    DeclareTuneValueParam(TRaFQueueBunchSize, ui32, BUNCH_SIZE);  +    DeclareTuneTypeParam(TRaFQueueBunchBase, TBunchBase);  +    DeclareTuneContainer(TRaFQueueSkipContainer, TContainer);  +    DeclareTuneValueParam(TRaFQueueDeleteItems, bool, DeleteItems);  +       template <typename TItem = void, typename... TParams> -    class TReadAsFilledQueue { -    private: -        using TTuned = TTune<NReadAsFilledPrivate::TDefaultParams, TParams...>; - -        static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE; - -        using TBunchBase = typename TTuned::TBunchBase; - +    class TReadAsFilledQueue {  +    private:  +        using TTuned = TTune<NReadAsFilledPrivate::TDefaultParams, TParams...>;  +  +        static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE;  +  +        using TBunchBase = typename TTuned::TBunchBase;  +           template <typename TElem, typename... TRest> -        using TContainer = -            typename TTuned::template TContainer<TElem, TRest...>; - -        using TWriteBucket = -            NReadAsFilledPrivate::TWriteBucket<BUNCH_SIZE, TBunchBase>; -        using TReadBucket = -            NReadAsFilledPrivate::TReadBucket<TWriteBucket, TContainer>; - -    public: -        TReadAsFilledQueue() -            : RBucket(&WBucket) -        { -        } - -        ~TReadAsFilledQueue() { -            if (TTuned::DeleteItems) { -                for (;;) { -                    auto msg = Pop(); -                    if (msg == nullptr) { -                        break; -                    } -                    TDelete::Destroy(msg); -                } -            } -        } - -        void Push(TItem* msg) { -            WBucket.Push((void*)msg, NReadAsFilledPrivate::TEmptyAux()); -        } - -        TItem* Pop() { -            return (TItem*)RBucket.Pop(); -        } - -        TItem* Peek() { -            return (TItem*)RBucket.Peek(); -        } - -    protected: -        TWriteBucket WBucket; -        TReadBucket RBucket; -    }; -} +        using TContainer =  +            typename TTuned::template TContainer<TElem, TRest...>;  +  +        using TWriteBucket =  +            NReadAsFilledPrivate::TWriteBucket<BUNCH_SIZE, TBunchBase>;  +        using TReadBucket =  +            NReadAsFilledPrivate::TReadBucket<TWriteBucket, TContainer>;  +  +    public:  +        TReadAsFilledQueue()  +            : RBucket(&WBucket)  +        {  +        }  +  +        ~TReadAsFilledQueue() {  +            if (TTuned::DeleteItems) {  +                for (;;) {  +                    auto msg = Pop();  +                    if (msg == nullptr) {  +                        break;  +                    }  +                    TDelete::Destroy(msg);  +                }  +            }  +        }  +  +        void Push(TItem* msg) {  +            WBucket.Push((void*)msg, NReadAsFilledPrivate::TEmptyAux());  +        }  +  +        TItem* Pop() {  +            return (TItem*)RBucket.Pop();  +        }  +  +        TItem* Peek() {  +            return (TItem*)RBucket.Peek();  +        }  +  +    protected:  +        TWriteBucket WBucket;  +        TReadBucket RBucket;  +    };  +}  diff --git a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp index 2bd0c298216..00dbfeaa64e 100644 --- a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp +++ b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.cpp @@ -1 +1 @@ -#include "mpsc_vinfarr_obstructive.h" +#include "mpsc_vinfarr_obstructive.h"  diff --git a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h index 5f91f1b5a84..3e1ae923420 100644 --- a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h +++ b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h @@ -1,528 +1,528 @@ -#pragma once - -/* -  Semi-wait-free queue, multiple producers - one consumer. Strict order. -  The queue algorithm is using concept of virtual infinite array. - -  A producer takes a number from a counter and atomicaly increments the counter. -  The number taken is a number of a slot for the producer to put a new message -  into infinite array. - -  Then producer constructs a virtual infinite array by bidirectional linked list -  of blocks. Each block contains several slots. - -  There is a hint pointer which optimisticly points to the last block -  of the list and never goes backward. - -  Consumer exploits the property of the hint pointer always going forward -  to free old blocks eventually. Consumer periodically read the hint pointer -  and the counter and thus deduce producers which potentially holds the pointer -  to a block. Consumer can free the block if all that producers filled their -  slots and left the queue. - -  No producer can stop the progress for other producers. - -  Consumer can obstruct a slot of a delayed producer by putting special mark. -  Thus no producer can stop the progress for consumer. -  But a slow producer may be forced to retry unlimited number of times. -  Though it's very unlikely for a non-preempted producer to be obstructed. -  That's why the algorithm is semi-wait-free. - -  WARNING: there is no wait¬ify mechanic for consumer, -  consumer receives nullptr if queue was empty. - -  WARNING: though the algorithm itself is lock-free -  but producers and consumer could be blocked by memory allocator - -  WARNING: copy constructers of the queue are not thread-safe - */ - -#include <util/generic/noncopyable.h> -#include <util/generic/ptr.h> -#include <util/system/atomic.h> -#include <util/system/spinlock.h> - -#include "tune.h" - -namespace NThreading { -    namespace NObstructiveQueuePrivate { -        typedef void* TMsgLink; - -        struct TEmpty { -        }; - -        struct TEmptyAux { -            TEmptyAux Retrieve() const { -                return TEmptyAux(); -            } -            void Store(TEmptyAux&) { -            } -            static constexpr TEmptyAux Zero() { -                return TEmptyAux(); -            } -        }; - -        template <typename TAux> -        struct TSlot { -            TMsgLink volatile Msg; -            TAux AuxiliaryData; - -            inline void Store(TAux& aux) { -                AuxiliaryData.Store(aux); -            } - -            inline TAux Retrieve() const { -                return AuxiliaryData.Retrieve(); -            } - -            static TSlot<TAux> NullElem() { -                return {nullptr, TAux::Zero()}; -            } - -            static TSlot<TAux> Pair(TMsgLink msg, TAux aux) { -                return {msg, std::move(aux)}; -            } -        }; - -        template <> -        struct TSlot<TEmptyAux> { -            TMsgLink volatile Msg; -            inline void Store(TEmptyAux&) { -            } -            inline TEmptyAux Retrieve() const { -                return TEmptyAux(); -            } - -            static TSlot<TEmptyAux> NullElem() { -                return {nullptr}; -            } - -            static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) { -                return {msg}; -            } -        }; - -        enum TPushResult { -            PUSH_RESULT_OK, -            PUSH_RESULT_BACKWARD, -            PUSH_RESULT_FORWARD, -            PUSH_RESULT_BLOCKED, -        }; - -        template <typename TAux, ui32 BUNCH_SIZE, typename TBase = TEmpty> -        struct TMsgBunch: public TBase { -            ui64 FirstSlot; - -            TSlot<TAux> LinkArray[BUNCH_SIZE]; - -            TMsgBunch* volatile NextBunch; -            TMsgBunch* volatile BackLink; - -            ui64 volatile Token; -            TMsgBunch* volatile NextToken; - -            /* this push can return PUSH_RESULT_BLOCKED */ -            inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) { -                if (Y_UNLIKELY(slot < FirstSlot)) { -                    return PUSH_RESULT_BACKWARD; -                } - -                if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) { -                    return PUSH_RESULT_FORWARD; -                } - -                LinkArray[slot - FirstSlot].Store(auxiliary); - -                auto oldValue = AtomicSwap(&LinkArray[slot - FirstSlot].Msg, msg); - -                if (Y_LIKELY(oldValue == nullptr)) { -                    return PUSH_RESULT_OK; -                } else { -                    LeaveBlocked(oldValue); -                    return PUSH_RESULT_BLOCKED; -                } -            } - -            inline bool IsSlotHere(ui64 slot) { -                return slot < FirstSlot + BUNCH_SIZE; -            } - -            inline TMsgLink GetSlot(ui64 slot) const { -                return AtomicGet(LinkArray[slot - FirstSlot].Msg); -            } - -            inline TSlot<TAux> GetSlotAux(ui64 slot) const { -                auto msg = GetSlot(slot); -                auto aux = LinkArray[slot - FirstSlot].Retrieve(); -                return TSlot<TAux>::Pair(msg, aux); -            } - -            void LeaveBlocked(ui64 slot) { -                auto token = GetToken(slot); -                token->DecrementToken(); -            } - -            void LeaveBlocked(TMsgLink msg) { -                auto token = reinterpret_cast<TMsgBunch*>(msg); -                token->DecrementToken(); -            } - -            TSlot<TAux> BlockSlotAux(ui64 slot, TMsgBunch* token) { -                auto old = -                    AtomicSwap(&LinkArray[slot - FirstSlot].Msg, (TMsgLink)token); -                if (old == nullptr) { -                    // It's valid to increment after AtomicCas -                    // because token will release data only after SetNextToken -                    token->IncrementToken(); -                    return TSlot<TAux>::NullElem(); -                } -                return TSlot<TAux>::Pair(old, LinkArray[slot - FirstSlot].Retrieve()); -            } - -            inline TMsgBunch* GetNextBunch() const { -                return AtomicGet(NextBunch); -            } - -            inline bool SetNextBunch(TMsgBunch* ptr) { -                return AtomicCas(&NextBunch, ptr, nullptr); -            } - -            inline TMsgBunch* GetBackLink() const { -                return AtomicGet(BackLink); -            } - -            inline TMsgBunch* GetToken(ui64 slot) { -                return reinterpret_cast<TMsgBunch*>(LinkArray[slot - FirstSlot].Msg); -            } - -            inline void IncrementToken() { -                AtomicIncrement(Token); -            } - -            // the object could be destroyed after this method -            inline void DecrementToken() { -                if (Y_UNLIKELY(AtomicDecrement(Token) == BUNCH_SIZE)) { -                    Release(this); -                    AtomicGet(NextToken)->DecrementToken(); -                    // this could be invalid here -                } -            } - -            // the object could be destroyed after this method -            inline void SetNextToken(TMsgBunch* next) { -                AtomicSet(NextToken, next); -                if (Y_UNLIKELY(AtomicAdd(Token, BUNCH_SIZE) == BUNCH_SIZE)) { -                    Release(this); -                    next->DecrementToken(); -                } -                // this could be invalid here -            } - -            TMsgBunch(ui64 start, TMsgBunch* backLink) { -                AtomicSet(FirstSlot, start); -                memset(&LinkArray, 0, sizeof(LinkArray)); -                AtomicSet(NextBunch, nullptr); -                AtomicSet(BackLink, backLink); - -                AtomicSet(Token, 1); -                AtomicSet(NextToken, nullptr); -            } - -            static void Release(TMsgBunch* bunch) { -                auto backLink = AtomicGet(bunch->BackLink); -                if (backLink == nullptr) { -                    return; -                } -                AtomicSet(bunch->BackLink, nullptr); - -                do { -                    auto bbackLink = backLink->BackLink; -                    delete backLink; -                    backLink = bbackLink; -                } while (backLink != nullptr); -            } - -            void Destroy() { -                for (auto tail = BackLink; tail != nullptr;) { -                    auto next = tail->BackLink; -                    delete tail; -                    tail = next; -                } - -                for (auto next = this; next != nullptr;) { -                    auto nnext = next->NextBunch; -                    delete next; -                    next = nnext; -                } -            } -        }; - -        template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase = TEmpty> -        class TWriteBucket { -        public: -            static const ui64 GROSS_SIZE; - -            using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>; - -            TWriteBucket(TBunch* bunch = new TBunch(0, nullptr)) -                : LastBunch(bunch) -                , SlotCounter(0) -            { -            } - -            TWriteBucket(TWriteBucket&& move) -                : LastBunch(move.LastBunch) -                , SlotCounter(move.SlotCounter) -            { -                move.LastBunch = nullptr; -            } - -            ~TWriteBucket() { -                if (LastBunch != nullptr) { -                    LastBunch->Destroy(); -                } -            } - -            inline bool Push(TMsgLink msg, TAux aux) { -                ui64 pushSlot = AtomicGetAndIncrement(SlotCounter); -                TBunch* hintBunch = GetLastBunch(); - -                for (;;) { -                    auto hint = hintBunch->Push(msg, pushSlot, aux); -                    if (Y_LIKELY(hint == PUSH_RESULT_OK)) { -                        return true; -                    } -                    bool hhResult = HandleHint(hintBunch, hint); -                    if (Y_UNLIKELY(!hhResult)) { -                        return false; -                    } -                } -            } - -        protected: -            template <typename, ui32, typename> -            friend class TReadBucket; - -            TBunch* volatile LastBunch; // Hint -            volatile ui64 SlotCounter; - -            inline TBunch* GetLastBunch() const { -                return AtomicGet(LastBunch); -            } - -            bool HandleHint(TBunch*& hintBunch, TPushResult hint) { -                if (Y_UNLIKELY(hint == PUSH_RESULT_BLOCKED)) { -                    return false; -                } - -                if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) { -                    hintBunch = hintBunch->GetBackLink(); -                    return true; -                } - -                // PUSH_RESULT_FORWARD -                auto nextBunch = hintBunch->GetNextBunch(); - -                if (nextBunch == nullptr) { -                    auto first = hintBunch->FirstSlot + BUNCH_SIZE; -                    nextBunch = new TBunch(first, hintBunch); -                    if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) { -                        delete nextBunch; -                        nextBunch = hintBunch->GetNextBunch(); -                    } -                } - -                // hintBunch could not be freed here so it cannot be reused -                // it's alright if this CAS was not succeeded, -                // it means that other thread did that recently -                AtomicCas(&LastBunch, nextBunch, hintBunch); - -                hintBunch = nextBunch; -                return true; -            } -        }; - -        template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase> -        class TReadBucket { -        public: -            static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 20; - -            using TWBucket = TWriteBucket<TAux, BUNCH_SIZE, TBunchBase>; -            using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>; - -            TReadBucket(TWBucket* writer) -                : Writer(writer) -                , ReadBunch(writer->GetLastBunch()) -                , LastKnownPushBunch(writer->GetLastBunch()) -            { -                ReadBunch->DecrementToken(); // no previous token -            } - -            TReadBucket(TReadBucket toCopy, TWBucket* writer) -                : TReadBucket(std::move(toCopy)) -            { -                Writer = writer; -            } - -            ui64 ReadyCount() const { -                return AtomicGet(Writer->SlotCounter) - ReadSlot; -            } - -            inline TMsgLink Pop() { -                return PopAux().Msg; -            } - -            inline TSlot<TAux> PopAux() { -                for (;;) { -                    if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) { -                        if (Y_LIKELY(!RereadPushSlot())) { -                            return TSlot<TAux>::NullElem(); -                        } -                    } - -                    if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) { -                        if (Y_UNLIKELY(!SwitchToNextBunch())) { -                            return TSlot<TAux>::NullElem(); -                        } -                    } - -                    auto result = ReadBunch->GetSlotAux(ReadSlot); -                    if (Y_LIKELY(result.Msg != nullptr)) { -                        ++ReadSlot; -                        return result; -                    } - -                    if (ReadSlot + 1 == AtomicGet(Writer->SlotCounter)) { -                        return TSlot<TAux>::NullElem(); -                    } - -                    result = StubbornPopAux(); - -                    if (result.Msg != nullptr) { -                        return result; -                    } -                } -            } - -        private: -            TWBucket* Writer; -            TBunch* ReadBunch; -            ui64 ReadSlot = 0; -            TBunch* LastKnownPushBunch; -            ui64 LastKnownPushSlot = 0; - -            // MUST BE: ReadSlot == LastKnownPushSlot -            bool RereadPushSlot() { -                auto oldSlot = LastKnownPushSlot; - -                auto currentPushBunch = Writer->GetLastBunch(); -                auto currentPushSlot = AtomicGet(Writer->SlotCounter); - -                if (currentPushBunch != LastKnownPushBunch) { -                    // LastKnownPushBunch could be invalid after this line -                    LastKnownPushBunch->SetNextToken(currentPushBunch); -                } - -                LastKnownPushBunch = currentPushBunch; -                LastKnownPushSlot = currentPushSlot; - -                return oldSlot != LastKnownPushSlot; -            } - -            bool SwitchToNextBunch() { -                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { -                    auto next = ReadBunch->GetNextBunch(); -                    if (next != nullptr) { -                        ReadBunch = next; -                        return true; -                    } -                    SpinLockPause(); -                } -                return false; -            } - -            TSlot<TAux> StubbornPopAux() { -                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) { -                    auto result = ReadBunch->GetSlotAux(ReadSlot); -                    if (Y_LIKELY(result.Msg != nullptr)) { -                        ++ReadSlot; -                        return result; -                    } -                    SpinLockPause(); -                } - -                return ReadBunch->BlockSlotAux(ReadSlot++, LastKnownPushBunch); -            } -        }; - -        struct TDefaultParams { -            static constexpr bool DeleteItems = true; -            using TAux = NObstructiveQueuePrivate::TEmptyAux; -            using TBunchBase = NObstructiveQueuePrivate::TEmpty; -            static constexpr ui32 BUNCH_SIZE = 251; -        }; - -    } //namespace NObstructiveQueuePrivate - -    DeclareTuneValueParam(TObstructiveQueueBunchSize, ui32, BUNCH_SIZE); -    DeclareTuneValueParam(TObstructiveQueueDeleteItems, bool, DeleteItems); -    DeclareTuneTypeParam(TObstructiveQueueBunchBase, TBunchBase); -    DeclareTuneTypeParam(TObstructiveQueueAux, TAux); - +#pragma once  +  +/*  +  Semi-wait-free queue, multiple producers - one consumer. Strict order.  +  The queue algorithm is using concept of virtual infinite array.  +  +  A producer takes a number from a counter and atomicaly increments the counter.  +  The number taken is a number of a slot for the producer to put a new message  +  into infinite array.  +  +  Then producer constructs a virtual infinite array by bidirectional linked list  +  of blocks. Each block contains several slots.  +  +  There is a hint pointer which optimisticly points to the last block  +  of the list and never goes backward.  +  +  Consumer exploits the property of the hint pointer always going forward  +  to free old blocks eventually. Consumer periodically read the hint pointer  +  and the counter and thus deduce producers which potentially holds the pointer  +  to a block. Consumer can free the block if all that producers filled their  +  slots and left the queue.  +  +  No producer can stop the progress for other producers.  +  +  Consumer can obstruct a slot of a delayed producer by putting special mark.  +  Thus no producer can stop the progress for consumer.  +  But a slow producer may be forced to retry unlimited number of times.  +  Though it's very unlikely for a non-preempted producer to be obstructed.  +  That's why the algorithm is semi-wait-free.  +  +  WARNING: there is no wait¬ify mechanic for consumer,  +  consumer receives nullptr if queue was empty.  +  +  WARNING: though the algorithm itself is lock-free  +  but producers and consumer could be blocked by memory allocator  +  +  WARNING: copy constructers of the queue are not thread-safe  + */  +  +#include <util/generic/noncopyable.h>  +#include <util/generic/ptr.h>  +#include <util/system/atomic.h>  +#include <util/system/spinlock.h>  +  +#include "tune.h"  +  +namespace NThreading {  +    namespace NObstructiveQueuePrivate {  +        typedef void* TMsgLink;  +  +        struct TEmpty {  +        };  +  +        struct TEmptyAux {  +            TEmptyAux Retrieve() const {  +                return TEmptyAux();  +            }  +            void Store(TEmptyAux&) {  +            }  +            static constexpr TEmptyAux Zero() {  +                return TEmptyAux();  +            }  +        };  +  +        template <typename TAux>  +        struct TSlot {  +            TMsgLink volatile Msg;  +            TAux AuxiliaryData;  +  +            inline void Store(TAux& aux) {  +                AuxiliaryData.Store(aux);  +            }  +  +            inline TAux Retrieve() const {  +                return AuxiliaryData.Retrieve();  +            }  +  +            static TSlot<TAux> NullElem() {  +                return {nullptr, TAux::Zero()};  +            }  +  +            static TSlot<TAux> Pair(TMsgLink msg, TAux aux) {  +                return {msg, std::move(aux)};  +            }  +        };  +  +        template <>  +        struct TSlot<TEmptyAux> {  +            TMsgLink volatile Msg;  +            inline void Store(TEmptyAux&) {  +            }  +            inline TEmptyAux Retrieve() const {  +                return TEmptyAux();  +            }  +  +            static TSlot<TEmptyAux> NullElem() {  +                return {nullptr};  +            }  +  +            static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) {  +                return {msg};  +            }  +        };  +  +        enum TPushResult {  +            PUSH_RESULT_OK,  +            PUSH_RESULT_BACKWARD,  +            PUSH_RESULT_FORWARD,  +            PUSH_RESULT_BLOCKED,  +        };  +  +        template <typename TAux, ui32 BUNCH_SIZE, typename TBase = TEmpty>  +        struct TMsgBunch: public TBase {  +            ui64 FirstSlot;  +  +            TSlot<TAux> LinkArray[BUNCH_SIZE];  +  +            TMsgBunch* volatile NextBunch;  +            TMsgBunch* volatile BackLink;  +  +            ui64 volatile Token;  +            TMsgBunch* volatile NextToken;  +  +            /* this push can return PUSH_RESULT_BLOCKED */  +            inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) {  +                if (Y_UNLIKELY(slot < FirstSlot)) {  +                    return PUSH_RESULT_BACKWARD;  +                }  +  +                if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) {  +                    return PUSH_RESULT_FORWARD;  +                }  +  +                LinkArray[slot - FirstSlot].Store(auxiliary);  +  +                auto oldValue = AtomicSwap(&LinkArray[slot - FirstSlot].Msg, msg);  +  +                if (Y_LIKELY(oldValue == nullptr)) {  +                    return PUSH_RESULT_OK;  +                } else {  +                    LeaveBlocked(oldValue);  +                    return PUSH_RESULT_BLOCKED;  +                }  +            }  +  +            inline bool IsSlotHere(ui64 slot) {  +                return slot < FirstSlot + BUNCH_SIZE;  +            }  +  +            inline TMsgLink GetSlot(ui64 slot) const {  +                return AtomicGet(LinkArray[slot - FirstSlot].Msg);  +            }  +  +            inline TSlot<TAux> GetSlotAux(ui64 slot) const {  +                auto msg = GetSlot(slot);  +                auto aux = LinkArray[slot - FirstSlot].Retrieve();  +                return TSlot<TAux>::Pair(msg, aux);  +            }  +  +            void LeaveBlocked(ui64 slot) {  +                auto token = GetToken(slot);  +                token->DecrementToken();  +            }  +  +            void LeaveBlocked(TMsgLink msg) {  +                auto token = reinterpret_cast<TMsgBunch*>(msg);  +                token->DecrementToken();  +            }  +  +            TSlot<TAux> BlockSlotAux(ui64 slot, TMsgBunch* token) {  +                auto old =  +                    AtomicSwap(&LinkArray[slot - FirstSlot].Msg, (TMsgLink)token);  +                if (old == nullptr) {  +                    // It's valid to increment after AtomicCas  +                    // because token will release data only after SetNextToken  +                    token->IncrementToken();  +                    return TSlot<TAux>::NullElem();  +                }  +                return TSlot<TAux>::Pair(old, LinkArray[slot - FirstSlot].Retrieve());  +            }  +  +            inline TMsgBunch* GetNextBunch() const {  +                return AtomicGet(NextBunch);  +            }  +  +            inline bool SetNextBunch(TMsgBunch* ptr) {  +                return AtomicCas(&NextBunch, ptr, nullptr);  +            }  +  +            inline TMsgBunch* GetBackLink() const {  +                return AtomicGet(BackLink);  +            }  +  +            inline TMsgBunch* GetToken(ui64 slot) {  +                return reinterpret_cast<TMsgBunch*>(LinkArray[slot - FirstSlot].Msg);  +            }  +  +            inline void IncrementToken() {  +                AtomicIncrement(Token);  +            }  +  +            // the object could be destroyed after this method  +            inline void DecrementToken() {  +                if (Y_UNLIKELY(AtomicDecrement(Token) == BUNCH_SIZE)) {  +                    Release(this);  +                    AtomicGet(NextToken)->DecrementToken();  +                    // this could be invalid here  +                }  +            }  +  +            // the object could be destroyed after this method  +            inline void SetNextToken(TMsgBunch* next) {  +                AtomicSet(NextToken, next);  +                if (Y_UNLIKELY(AtomicAdd(Token, BUNCH_SIZE) == BUNCH_SIZE)) {  +                    Release(this);  +                    next->DecrementToken();  +                }  +                // this could be invalid here  +            }  +  +            TMsgBunch(ui64 start, TMsgBunch* backLink) {  +                AtomicSet(FirstSlot, start);  +                memset(&LinkArray, 0, sizeof(LinkArray));  +                AtomicSet(NextBunch, nullptr);  +                AtomicSet(BackLink, backLink);  +  +                AtomicSet(Token, 1);  +                AtomicSet(NextToken, nullptr);  +            }  +  +            static void Release(TMsgBunch* bunch) {  +                auto backLink = AtomicGet(bunch->BackLink);  +                if (backLink == nullptr) {  +                    return;  +                }  +                AtomicSet(bunch->BackLink, nullptr);  +  +                do {  +                    auto bbackLink = backLink->BackLink;  +                    delete backLink;  +                    backLink = bbackLink;  +                } while (backLink != nullptr);  +            }  +  +            void Destroy() {  +                for (auto tail = BackLink; tail != nullptr;) {  +                    auto next = tail->BackLink;  +                    delete tail;  +                    tail = next;  +                }  +  +                for (auto next = this; next != nullptr;) {  +                    auto nnext = next->NextBunch;  +                    delete next;  +                    next = nnext;  +                }  +            }  +        };  +  +        template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase = TEmpty>  +        class TWriteBucket {  +        public:  +            static const ui64 GROSS_SIZE;  +  +            using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>;  +  +            TWriteBucket(TBunch* bunch = new TBunch(0, nullptr))  +                : LastBunch(bunch)  +                , SlotCounter(0)  +            {  +            }  +  +            TWriteBucket(TWriteBucket&& move)  +                : LastBunch(move.LastBunch)  +                , SlotCounter(move.SlotCounter)  +            {  +                move.LastBunch = nullptr;  +            }  +  +            ~TWriteBucket() {  +                if (LastBunch != nullptr) {  +                    LastBunch->Destroy();  +                }  +            }  +  +            inline bool Push(TMsgLink msg, TAux aux) {  +                ui64 pushSlot = AtomicGetAndIncrement(SlotCounter);  +                TBunch* hintBunch = GetLastBunch();  +  +                for (;;) {  +                    auto hint = hintBunch->Push(msg, pushSlot, aux);  +                    if (Y_LIKELY(hint == PUSH_RESULT_OK)) {  +                        return true;  +                    }  +                    bool hhResult = HandleHint(hintBunch, hint);  +                    if (Y_UNLIKELY(!hhResult)) {  +                        return false;  +                    }  +                }  +            }  +  +        protected:  +            template <typename, ui32, typename>  +            friend class TReadBucket;  +  +            TBunch* volatile LastBunch; // Hint  +            volatile ui64 SlotCounter;  +  +            inline TBunch* GetLastBunch() const {  +                return AtomicGet(LastBunch);  +            }  +  +            bool HandleHint(TBunch*& hintBunch, TPushResult hint) {  +                if (Y_UNLIKELY(hint == PUSH_RESULT_BLOCKED)) {  +                    return false;  +                }  +  +                if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) {  +                    hintBunch = hintBunch->GetBackLink();  +                    return true;  +                }  +  +                // PUSH_RESULT_FORWARD  +                auto nextBunch = hintBunch->GetNextBunch();  +  +                if (nextBunch == nullptr) {  +                    auto first = hintBunch->FirstSlot + BUNCH_SIZE;  +                    nextBunch = new TBunch(first, hintBunch);  +                    if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) {  +                        delete nextBunch;  +                        nextBunch = hintBunch->GetNextBunch();  +                    }  +                }  +  +                // hintBunch could not be freed here so it cannot be reused  +                // it's alright if this CAS was not succeeded,  +                // it means that other thread did that recently  +                AtomicCas(&LastBunch, nextBunch, hintBunch);  +  +                hintBunch = nextBunch;  +                return true;  +            }  +        };  +  +        template <typename TAux, ui32 BUNCH_SIZE, typename TBunchBase>  +        class TReadBucket {  +        public:  +            static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 20;  +  +            using TWBucket = TWriteBucket<TAux, BUNCH_SIZE, TBunchBase>;  +            using TBunch = TMsgBunch<TAux, BUNCH_SIZE, TBunchBase>;  +  +            TReadBucket(TWBucket* writer)  +                : Writer(writer)  +                , ReadBunch(writer->GetLastBunch())  +                , LastKnownPushBunch(writer->GetLastBunch())  +            {  +                ReadBunch->DecrementToken(); // no previous token  +            }  +  +            TReadBucket(TReadBucket toCopy, TWBucket* writer)  +                : TReadBucket(std::move(toCopy))  +            {  +                Writer = writer;  +            }  +  +            ui64 ReadyCount() const {  +                return AtomicGet(Writer->SlotCounter) - ReadSlot;  +            }  +  +            inline TMsgLink Pop() {  +                return PopAux().Msg;  +            }  +  +            inline TSlot<TAux> PopAux() {  +                for (;;) {  +                    if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {  +                        if (Y_LIKELY(!RereadPushSlot())) {  +                            return TSlot<TAux>::NullElem();  +                        }  +                    }  +  +                    if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {  +                        if (Y_UNLIKELY(!SwitchToNextBunch())) {  +                            return TSlot<TAux>::NullElem();  +                        }  +                    }  +  +                    auto result = ReadBunch->GetSlotAux(ReadSlot);  +                    if (Y_LIKELY(result.Msg != nullptr)) {  +                        ++ReadSlot;  +                        return result;  +                    }  +  +                    if (ReadSlot + 1 == AtomicGet(Writer->SlotCounter)) {  +                        return TSlot<TAux>::NullElem();  +                    }  +  +                    result = StubbornPopAux();  +  +                    if (result.Msg != nullptr) {  +                        return result;  +                    }  +                }  +            }  +  +        private:  +            TWBucket* Writer;  +            TBunch* ReadBunch;  +            ui64 ReadSlot = 0;  +            TBunch* LastKnownPushBunch;  +            ui64 LastKnownPushSlot = 0;  +  +            // MUST BE: ReadSlot == LastKnownPushSlot  +            bool RereadPushSlot() {  +                auto oldSlot = LastKnownPushSlot;  +  +                auto currentPushBunch = Writer->GetLastBunch();  +                auto currentPushSlot = AtomicGet(Writer->SlotCounter);  +  +                if (currentPushBunch != LastKnownPushBunch) {  +                    // LastKnownPushBunch could be invalid after this line  +                    LastKnownPushBunch->SetNextToken(currentPushBunch);  +                }  +  +                LastKnownPushBunch = currentPushBunch;  +                LastKnownPushSlot = currentPushSlot;  +  +                return oldSlot != LastKnownPushSlot;  +            }  +  +            bool SwitchToNextBunch() {  +                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {  +                    auto next = ReadBunch->GetNextBunch();  +                    if (next != nullptr) {  +                        ReadBunch = next;  +                        return true;  +                    }  +                    SpinLockPause();  +                }  +                return false;  +            }  +  +            TSlot<TAux> StubbornPopAux() {  +                for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {  +                    auto result = ReadBunch->GetSlotAux(ReadSlot);  +                    if (Y_LIKELY(result.Msg != nullptr)) {  +                        ++ReadSlot;  +                        return result;  +                    }  +                    SpinLockPause();  +                }  +  +                return ReadBunch->BlockSlotAux(ReadSlot++, LastKnownPushBunch);  +            }  +        };  +  +        struct TDefaultParams {  +            static constexpr bool DeleteItems = true;  +            using TAux = NObstructiveQueuePrivate::TEmptyAux;  +            using TBunchBase = NObstructiveQueuePrivate::TEmpty;  +            static constexpr ui32 BUNCH_SIZE = 251;  +        };  +  +    } //namespace NObstructiveQueuePrivate  +  +    DeclareTuneValueParam(TObstructiveQueueBunchSize, ui32, BUNCH_SIZE);  +    DeclareTuneValueParam(TObstructiveQueueDeleteItems, bool, DeleteItems);  +    DeclareTuneTypeParam(TObstructiveQueueBunchBase, TBunchBase);  +    DeclareTuneTypeParam(TObstructiveQueueAux, TAux);  +       template <typename TItem = void, typename... TParams> -    class TObstructiveConsumerAuxQueue { -    private: -        using TTuned = -            TTune<NObstructiveQueuePrivate::TDefaultParams, TParams...>; - -        using TAux = typename TTuned::TAux; -        using TSlot = NObstructiveQueuePrivate::TSlot<TAux>; -        using TMsgLink = NObstructiveQueuePrivate::TMsgLink; -        using TBunchBase = typename TTuned::TBunchBase; -        static constexpr bool DeleteItems = TTuned::DeleteItems; -        static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE; - -    public: -        TObstructiveConsumerAuxQueue() -            : RBuckets(&WBucket) -        { -        } - -        ~TObstructiveConsumerAuxQueue() { -            if (DeleteItems) { -                for (;;) { -                    auto msg = Pop(); -                    if (msg == nullptr) { -                        break; -                    } -                    TDelete::Destroy(msg); -                } -            } -        } - -        void Push(TItem* msg) { -            while (!WBucket.Push(reinterpret_cast<TMsgLink>(msg), TAux())) { -            } -        } - -        TItem* Pop() { -            return reinterpret_cast<TItem*>(RBuckets.Pop()); -        } - -        TSlot PopAux() { -            return RBuckets.PopAux(); -        } - -    private: -        NObstructiveQueuePrivate::TWriteBucket<TAux, BUNCH_SIZE, TBunchBase> -            WBucket; -        NObstructiveQueuePrivate::TReadBucket<TAux, BUNCH_SIZE, TBunchBase> -            RBuckets; -    }; - -    template <typename TItem = void, bool DeleteItems = true> -    class TObstructiveConsumerQueue +    class TObstructiveConsumerAuxQueue {  +    private:  +        using TTuned =  +            TTune<NObstructiveQueuePrivate::TDefaultParams, TParams...>;  +  +        using TAux = typename TTuned::TAux;  +        using TSlot = NObstructiveQueuePrivate::TSlot<TAux>;  +        using TMsgLink = NObstructiveQueuePrivate::TMsgLink;  +        using TBunchBase = typename TTuned::TBunchBase;  +        static constexpr bool DeleteItems = TTuned::DeleteItems;  +        static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE;  +  +    public:  +        TObstructiveConsumerAuxQueue()  +            : RBuckets(&WBucket)  +        {  +        }  +  +        ~TObstructiveConsumerAuxQueue() {  +            if (DeleteItems) {  +                for (;;) {  +                    auto msg = Pop();  +                    if (msg == nullptr) {  +                        break;  +                    }  +                    TDelete::Destroy(msg);  +                }  +            }  +        }  +  +        void Push(TItem* msg) {  +            while (!WBucket.Push(reinterpret_cast<TMsgLink>(msg), TAux())) {  +            }  +        }  +  +        TItem* Pop() {  +            return reinterpret_cast<TItem*>(RBuckets.Pop());  +        }  +  +        TSlot PopAux() {  +            return RBuckets.PopAux();  +        }  +  +    private:  +        NObstructiveQueuePrivate::TWriteBucket<TAux, BUNCH_SIZE, TBunchBase>  +            WBucket;  +        NObstructiveQueuePrivate::TReadBucket<TAux, BUNCH_SIZE, TBunchBase>  +            RBuckets;  +    };  +  +    template <typename TItem = void, bool DeleteItems = true>  +    class TObstructiveConsumerQueue          : public TObstructiveConsumerAuxQueue<TItem,                                                TObstructiveQueueDeleteItems<DeleteItems>> { -    }; +    };   } diff --git a/library/cpp/threading/queue/queue_ut.cpp b/library/cpp/threading/queue/queue_ut.cpp index 80eca147da9..8b36437034c 100644 --- a/library/cpp/threading/queue/queue_ut.cpp +++ b/library/cpp/threading/queue/queue_ut.cpp @@ -1,242 +1,242 @@  #include <library/cpp/testing/unittest/registar.h> -#include <util/system/thread.h> - -#include "ut_helpers.h" - -typedef void* TMsgLink; - +#include <util/system/thread.h>  +  +#include "ut_helpers.h"  +  +typedef void* TMsgLink;  +   template <typename TQueueType> -class TQueueTestProcs: public TTestBase { -private: +class TQueueTestProcs: public TTestBase {  +private:       UNIT_TEST_SUITE_DEMANGLE(TQueueTestProcs<TQueueType>); -    UNIT_TEST(Threads2_Push1M_Threads1_Pop2M) -    UNIT_TEST(Threads4_Push1M_Threads1_Pop4M) -    UNIT_TEST(Threads8_RndPush100K_Threads8_Queues) +    UNIT_TEST(Threads2_Push1M_Threads1_Pop2M)  +    UNIT_TEST(Threads4_Push1M_Threads1_Pop4M)  +    UNIT_TEST(Threads8_RndPush100K_Threads8_Queues)       /* -    UNIT_TEST(Threads24_RndPush100K_Threads24_Queues) -    UNIT_TEST(Threads24_RndPush100K_Threads8_Queues) -    UNIT_TEST(Threads24_RndPush100K_Threads4_Queues) -*/ -    UNIT_TEST_SUITE_END(); - -public: -    void Push1M_Pop1M() { +    UNIT_TEST(Threads24_RndPush100K_Threads24_Queues)  +    UNIT_TEST(Threads24_RndPush100K_Threads8_Queues)  +    UNIT_TEST(Threads24_RndPush100K_Threads4_Queues)  +*/  +    UNIT_TEST_SUITE_END();  +  +public:  +    void Push1M_Pop1M() {           TQueueType queue; -        TMsgLink msg = &queue; - -        auto pmsg = queue.Pop(); -        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - -        for (int i = 0; i < 1000000; ++i) { -            queue.Push((char*)msg + i); -        } - -        for (int i = 0; i < 1000000; ++i) { -            auto popped = queue.Pop(); -            UNIT_ASSERT_EQUAL((char*)msg + i, popped); -        } - -        pmsg = queue.Pop(); -        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); -    } - -    void Threads2_Push1M_Threads1_Pop2M() { +        TMsgLink msg = &queue;  +  +        auto pmsg = queue.Pop();  +        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);  +  +        for (int i = 0; i < 1000000; ++i) {  +            queue.Push((char*)msg + i);  +        }  +  +        for (int i = 0; i < 1000000; ++i) {  +            auto popped = queue.Pop();  +            UNIT_ASSERT_EQUAL((char*)msg + i, popped);  +        }  +  +        pmsg = queue.Pop();  +        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);  +    }  +  +    void Threads2_Push1M_Threads1_Pop2M() {           TQueueType queue; - +           class TPusherThread: public ISimpleThread { -        public: +        public:               TPusherThread(TQueueType& theQueue, char* start)                  : Queue(theQueue) -                , Arg(start) -            { -            } - +                , Arg(start)  +            {  +            }  +               TQueueType& Queue; -            char* Arg; - -            void* ThreadProc() override { -                for (int i = 0; i < 1000000; ++i) { -                    Queue.Push(Arg + i); -                } -                return nullptr; -            } -        }; - -        TPusherThread pusher1(queue, (char*)&queue); -        TPusherThread pusher2(queue, (char*)&queue + 2000000); - -        pusher1.Start(); -        pusher2.Start(); - -        for (int i = 0; i < 2000000; ++i) { -            while (queue.Pop() == nullptr) { -                SpinLockPause(); -            } -        } - -        auto pmsg = queue.Pop(); -        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); -    } - -    void Threads4_Push1M_Threads1_Pop4M() { +            char* Arg;  +  +            void* ThreadProc() override {  +                for (int i = 0; i < 1000000; ++i) {  +                    Queue.Push(Arg + i);  +                }  +                return nullptr;  +            }  +        };  +  +        TPusherThread pusher1(queue, (char*)&queue);  +        TPusherThread pusher2(queue, (char*)&queue + 2000000);  +  +        pusher1.Start();  +        pusher2.Start();  +  +        for (int i = 0; i < 2000000; ++i) {  +            while (queue.Pop() == nullptr) {  +                SpinLockPause();  +            }  +        }  +  +        auto pmsg = queue.Pop();  +        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);  +    }  +  +    void Threads4_Push1M_Threads1_Pop4M() {           TQueueType queue; - +           class TPusherThread: public ISimpleThread { -        public: +        public:               TPusherThread(TQueueType& theQueue, char* start)                  : Queue(theQueue) -                , Arg(start) -            { -            } - +                , Arg(start)  +            {  +            }  +               TQueueType& Queue; -            char* Arg; - -            void* ThreadProc() override { -                for (int i = 0; i < 1000000; ++i) { -                    Queue.Push(Arg + i); -                } -                return nullptr; -            } -        }; - -        TPusherThread pusher1(queue, (char*)&queue); -        TPusherThread pusher2(queue, (char*)&queue + 2000000); -        TPusherThread pusher3(queue, (char*)&queue + 4000000); -        TPusherThread pusher4(queue, (char*)&queue + 6000000); - -        pusher1.Start(); -        pusher2.Start(); -        pusher3.Start(); -        pusher4.Start(); - -        for (int i = 0; i < 4000000; ++i) { -            while (queue.Pop() == nullptr) { -                SpinLockPause(); -            } -        } - -        auto pmsg = queue.Pop(); -        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); -    } - -    template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES> -    void ManyRndPush100K_ManyQueues() { +            char* Arg;  +  +            void* ThreadProc() override {  +                for (int i = 0; i < 1000000; ++i) {  +                    Queue.Push(Arg + i);  +                }  +                return nullptr;  +            }  +        };  +  +        TPusherThread pusher1(queue, (char*)&queue);  +        TPusherThread pusher2(queue, (char*)&queue + 2000000);  +        TPusherThread pusher3(queue, (char*)&queue + 4000000);  +        TPusherThread pusher4(queue, (char*)&queue + 6000000);  +  +        pusher1.Start();  +        pusher2.Start();  +        pusher3.Start();  +        pusher4.Start();  +  +        for (int i = 0; i < 4000000; ++i) {  +            while (queue.Pop() == nullptr) {  +                SpinLockPause();  +            }  +        }  +  +        auto pmsg = queue.Pop();  +        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);  +    }  +  +    template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES>  +    void ManyRndPush100K_ManyQueues() {           TQueueType queue[NUMBER_OF_QUEUES]; - +           class TPusherThread: public ISimpleThread { -        public: +        public:               TPusherThread(TQueueType* queues, char* start) -                : Queues(queues) -                , Arg(start) -            { -            } - +                : Queues(queues)  +                , Arg(start)  +            {  +            }  +               TQueueType* Queues; -            char* Arg; - -            void* ThreadProc() override { -                ui64 counters[NUMBER_OF_QUEUES]; -                for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { -                    counters[i] = 0; -                } - -                for (int i = 0; i < 100000; ++i) { -                    size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES; -                    int cookie = counters[rnd]++; -                    Queues[rnd].Push(Arg + cookie); -                } - -                for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { -                    Queues[i].Push((void*)2ULL); -                } - -                return nullptr; -            } -        }; - +            char* Arg;  +  +            void* ThreadProc() override {  +                ui64 counters[NUMBER_OF_QUEUES];  +                for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {  +                    counters[i] = 0;  +                }  +  +                for (int i = 0; i < 100000; ++i) {  +                    size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES;  +                    int cookie = counters[rnd]++;  +                    Queues[rnd].Push(Arg + cookie);  +                }  +  +                for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {  +                    Queues[i].Push((void*)2ULL);  +                }  +  +                return nullptr;  +            }  +        };  +           class TPopperThread: public ISimpleThread { -        public: +        public:               TPopperThread(TQueueType* theQueue, char* base)                  : Queue(theQueue) -                , Base(base) -            { -            } - +                , Base(base)  +            {  +            }  +               TQueueType* Queue; -            char* Base; - -            void* ThreadProc() override { -                ui64 counters[NUMBER_OF_PUSHERS]; -                for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { -                    counters[i] = 0; -                } - -                for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) { -                    auto msg = Queue->Pop(); -                    if (msg == nullptr) { -                        SpinLockPause(); -                        continue; -                    } -                    if (msg == (void*)2ULL) { -                        ++fin; -                        continue; -                    } -                    ui64 shift = (char*)msg - Base; -                    auto pusherNum = shift / 200000000ULL; -                    auto msgNum = shift % 200000000ULL; - -                    UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum); -                    ++counters[pusherNum]; -                } - -                auto pmsg = Queue->Pop(); -                UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - -                return nullptr; -            } -        }; - +            char* Base;  +  +            void* ThreadProc() override {  +                ui64 counters[NUMBER_OF_PUSHERS];  +                for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {  +                    counters[i] = 0;  +                }  +  +                for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) {  +                    auto msg = Queue->Pop();  +                    if (msg == nullptr) {  +                        SpinLockPause();  +                        continue;  +                    }  +                    if (msg == (void*)2ULL) {  +                        ++fin;  +                        continue;  +                    }  +                    ui64 shift = (char*)msg - Base;  +                    auto pusherNum = shift / 200000000ULL;  +                    auto msgNum = shift % 200000000ULL;  +  +                    UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum);  +                    ++counters[pusherNum];  +                }  +  +                auto pmsg = Queue->Pop();  +                UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);  +  +                return nullptr;  +            }  +        };  +           TVector<TAutoPtr<TPopperThread>> poppers;          TVector<TAutoPtr<TPusherThread>> pushers; - -        for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { -            poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue)); -            poppers.back()->Start(); -        } - -        for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { -            pushers.emplace_back( -                new TPusherThread(queue, (char*)&queue + 200000000ULL * i)); -            pushers.back()->Start(); -        } - -        for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { -            poppers[i]->Join(); -        } - -        for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { -            pushers[i]->Join(); -        } -    } - -    void Threads8_RndPush100K_Threads8_Queues() { -        ManyRndPush100K_ManyQueues<8, 8>(); -    } - -    /* -    void Threads24_RndPush100K_Threads24_Queues() { -        ManyRndPush100K_ManyQueues<24, 24>(); -    } - -    void Threads24_RndPush100K_Threads8_Queues() { -        ManyRndPush100K_ManyQueues<24, 8>(); -    } - -    void Threads24_RndPush100K_Threads4_Queues() { -        ManyRndPush100K_ManyQueues<24, 4>(); -    } -    */ -}; - -REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestProcs); +  +        for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {  +            poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue));  +            poppers.back()->Start();  +        }  +  +        for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {  +            pushers.emplace_back(  +                new TPusherThread(queue, (char*)&queue + 200000000ULL * i));  +            pushers.back()->Start();  +        }  +  +        for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {  +            poppers[i]->Join();  +        }  +  +        for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {  +            pushers[i]->Join();  +        }  +    }  +  +    void Threads8_RndPush100K_Threads8_Queues() {  +        ManyRndPush100K_ManyQueues<8, 8>();  +    }  +  +    /*  +    void Threads24_RndPush100K_Threads24_Queues() {  +        ManyRndPush100K_ManyQueues<24, 24>();  +    }  +  +    void Threads24_RndPush100K_Threads8_Queues() {  +        ManyRndPush100K_ManyQueues<24, 8>();  +    }  +  +    void Threads24_RndPush100K_Threads4_Queues() {  +        ManyRndPush100K_ManyQueues<24, 4>();  +    }  +    */  +};  +  +REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestProcs);  diff --git a/library/cpp/threading/queue/tune.h b/library/cpp/threading/queue/tune.h index 50fc3dc17cd..43ad5efe3ef 100644 --- a/library/cpp/threading/queue/tune.h +++ b/library/cpp/threading/queue/tune.h @@ -1,101 +1,101 @@ -#pragma once - -/* -  Motivation: consider you have a template class with many parameters -  with default associations - -  template <typename A = TDefA, -            typename B = TDefB, -            typename C = TDefC, -            typename D = TDefD> -  class TExample { -  }; - -  consider you would like to provide easy to use interface to tune all -  these parameters in position independed manner, -  In that case TTune would be helpful for you. - -  How to use: -  First step: declare a struct with all default associations - -  struct TDefaultTune { -      using TStructA = TDefA; -      using TStructB = TDefB; -      using TStructC = TDefC; -      using TStructD = TDefD; -  }; - -  Second step: declare helper names visible to a user - -  DeclareTuneTypeParam(TTuneParamA, TStructA); -  DeclareTuneTypeParam(TTuneParamB, TStructB); -  DeclareTuneTypeParam(TTuneParamC, TStructC); -  DeclareTuneTypeParam(TTuneParamD, TStructD); - -  Third step: declare TExample this way: - -  template <typename...TParams> -  class TExample { -      using TMyParams = TTune<TDefaultTune, TParams...>; - -      using TActualA = TMyParams::TStructA; -      using TActualB = TMyParams::TStructB; -      ... -  }; - -  TTune<TDefaultTune, TParams...> is a struct with the default parameteres -  taken from TDefaultTune and overridden from "TParams...". - -  for example:  "TTune<TDefaultTune, TTuneParamC<TUserClass>>" -  will be virtually the same as: - -  struct TTunedClass { -      using TStructA = TDefA; -      using TStructB = TDefB; -      using TStructC = TUserClass; -      using TStructD = TDefD; -  }; - -  From now on you can tune your TExample in the following manner: - -  using TCustomClass = -      TExample <TTuneParamA<TUserStruct1>, TTuneParamD<TUserStruct2>>; - -  You can also tweak constant expressions in your TDefaultTune. -  Consider you have: - -  struct TDefaultTune { -      static constexpr ui32 MySize = 42; -  }; - -  declare an interface to modify the parameter this way: - -  DeclareTuneValueParam(TStructSize, ui32, MySize); - -  and tweak your class: - -  using TTwiceBigger = TExample<TStructSize<84>>; - - */ - -#define DeclareTuneTypeParam(TParamName, InternalName) \ -    template <typename TNewType>                       \ -    struct TParamName {                                \ -        template <typename TBase>                      \ -        struct TApply: public TBase {                  \ -            using InternalName = TNewType;             \ -        };                                             \ -    } - -#define DeclareTuneValueParam(TParamName, TValueType, InternalName) \ -    template <TValueType NewValue>                                  \ -    struct TParamName {                                             \ -        template <typename TBase>                                   \ -        struct TApply: public TBase {                               \ -            static constexpr TValueType InternalName = NewValue;    \ -        };                                                          \ -    } - +#pragma once  +  +/*  +  Motivation: consider you have a template class with many parameters  +  with default associations  +  +  template <typename A = TDefA,  +            typename B = TDefB,  +            typename C = TDefC,  +            typename D = TDefD>  +  class TExample {  +  };  +  +  consider you would like to provide easy to use interface to tune all  +  these parameters in position independed manner,  +  In that case TTune would be helpful for you.  +  +  How to use:  +  First step: declare a struct with all default associations  +  +  struct TDefaultTune {  +      using TStructA = TDefA;  +      using TStructB = TDefB;  +      using TStructC = TDefC;  +      using TStructD = TDefD;  +  };  +  +  Second step: declare helper names visible to a user  +  +  DeclareTuneTypeParam(TTuneParamA, TStructA);  +  DeclareTuneTypeParam(TTuneParamB, TStructB);  +  DeclareTuneTypeParam(TTuneParamC, TStructC);  +  DeclareTuneTypeParam(TTuneParamD, TStructD);  +  +  Third step: declare TExample this way:  +  +  template <typename...TParams>  +  class TExample {  +      using TMyParams = TTune<TDefaultTune, TParams...>;  +  +      using TActualA = TMyParams::TStructA;  +      using TActualB = TMyParams::TStructB;  +      ...  +  };  +  +  TTune<TDefaultTune, TParams...> is a struct with the default parameteres  +  taken from TDefaultTune and overridden from "TParams...".  +  +  for example:  "TTune<TDefaultTune, TTuneParamC<TUserClass>>"  +  will be virtually the same as:  +  +  struct TTunedClass {  +      using TStructA = TDefA;  +      using TStructB = TDefB;  +      using TStructC = TUserClass;  +      using TStructD = TDefD;  +  };  +  +  From now on you can tune your TExample in the following manner:  +  +  using TCustomClass =  +      TExample <TTuneParamA<TUserStruct1>, TTuneParamD<TUserStruct2>>;  +  +  You can also tweak constant expressions in your TDefaultTune.  +  Consider you have:  +  +  struct TDefaultTune {  +      static constexpr ui32 MySize = 42;  +  };  +  +  declare an interface to modify the parameter this way:  +  +  DeclareTuneValueParam(TStructSize, ui32, MySize);  +  +  and tweak your class:  +  +  using TTwiceBigger = TExample<TStructSize<84>>;  +  + */  +  +#define DeclareTuneTypeParam(TParamName, InternalName) \  +    template <typename TNewType>                       \  +    struct TParamName {                                \  +        template <typename TBase>                      \  +        struct TApply: public TBase {                  \  +            using InternalName = TNewType;             \  +        };                                             \  +    }  +  +#define DeclareTuneValueParam(TParamName, TValueType, InternalName) \  +    template <TValueType NewValue>                                  \  +    struct TParamName {                                             \  +        template <typename TBase>                                   \  +        struct TApply: public TBase {                               \  +            static constexpr TValueType InternalName = NewValue;    \  +        };                                                          \  +    }  +   #define DeclareTuneContainer(TParamName, InternalName)              \      template <template <typename, typename...> class TNewContainer> \      struct TParamName {                                             \ @@ -104,22 +104,22 @@              template <typename TElem, typename... TRest>            \              using InternalName = TNewContainer<TElem, TRest...>;    \          };                                                          \ -    } - -namespace NTunePrivate { -    template <typename TBase, typename... TParams> -    struct TFold; - -    template <typename TBase> -    struct TFold<TBase>: public TBase { -    }; - -    template <typename TBase, typename TFirstArg, typename... TRest> -    struct TFold<TBase, TFirstArg, TRest...> -       : public TFold<typename TFirstArg::template TApply<TBase>, TRest...> { -    }; -} - -template <typename TDefault, typename... TParams> -struct TTune: public NTunePrivate::TFold<TDefault, TParams...> { -}; +    }  +  +namespace NTunePrivate {  +    template <typename TBase, typename... TParams>  +    struct TFold;  +  +    template <typename TBase>  +    struct TFold<TBase>: public TBase {  +    };  +  +    template <typename TBase, typename TFirstArg, typename... TRest>  +    struct TFold<TBase, TFirstArg, TRest...>  +       : public TFold<typename TFirstArg::template TApply<TBase>, TRest...> {  +    };  +}  +  +template <typename TDefault, typename... TParams>  +struct TTune: public NTunePrivate::TFold<TDefault, TParams...> {  +};  diff --git a/library/cpp/threading/queue/tune_ut.cpp b/library/cpp/threading/queue/tune_ut.cpp index 7e980d3e27e..64bc8fd4279 100644 --- a/library/cpp/threading/queue/tune_ut.cpp +++ b/library/cpp/threading/queue/tune_ut.cpp @@ -1,118 +1,118 @@  #include <library/cpp/testing/unittest/registar.h> -#include "tune.h" - -struct TDefaultStructA { -}; - -struct TDefaultStructB { -}; - -struct TDefaults { -    using TStructA = TDefaultStructA; -    using TStructB = TDefaultStructB; -    static constexpr ui32 Param1 = 42; -    static constexpr ui32 Param2 = 42; -}; - -DeclareTuneTypeParam(TweakStructA, TStructA); -DeclareTuneTypeParam(TweakStructB, TStructB); -DeclareTuneValueParam(TweakParam1, ui32, Param1); -DeclareTuneValueParam(TweakParam2, ui32, Param2); - +#include "tune.h"  +  +struct TDefaultStructA {  +};  +  +struct TDefaultStructB {  +};  +  +struct TDefaults {  +    using TStructA = TDefaultStructA;  +    using TStructB = TDefaultStructB;  +    static constexpr ui32 Param1 = 42;  +    static constexpr ui32 Param2 = 42;  +};  +  +DeclareTuneTypeParam(TweakStructA, TStructA);  +DeclareTuneTypeParam(TweakStructB, TStructB);  +DeclareTuneValueParam(TweakParam1, ui32, Param1);  +DeclareTuneValueParam(TweakParam2, ui32, Param2);  +   Y_UNIT_TEST_SUITE(TestTuning) {      Y_UNIT_TEST(Defaults) { -        using TTuned = TTune<TDefaults>; -        using TunedA = TTuned::TStructA; -        using TunedB = TTuned::TStructB; -        auto sameA = std::is_same<TDefaultStructA, TunedA>::value; -        auto sameB = std::is_same<TDefaultStructB, TunedB>::value; -        auto param1 = TTuned::Param1; -        auto param2 = TTuned::Param2; - -        UNIT_ASSERT(sameA); -        UNIT_ASSERT(sameB); -        UNIT_ASSERT_EQUAL(param1, 42); -        UNIT_ASSERT_EQUAL(param2, 42); -    } - +        using TTuned = TTune<TDefaults>;  +        using TunedA = TTuned::TStructA;  +        using TunedB = TTuned::TStructB;  +        auto sameA = std::is_same<TDefaultStructA, TunedA>::value;  +        auto sameB = std::is_same<TDefaultStructB, TunedB>::value;  +        auto param1 = TTuned::Param1;  +        auto param2 = TTuned::Param2;  +  +        UNIT_ASSERT(sameA);  +        UNIT_ASSERT(sameB);  +        UNIT_ASSERT_EQUAL(param1, 42);  +        UNIT_ASSERT_EQUAL(param2, 42);  +    }  +       Y_UNIT_TEST(TuneStructA) { -        struct TMyStruct { -        }; - -        using TTuned = TTune<TDefaults, TweakStructA<TMyStruct>>; - -        using TunedA = TTuned::TStructA; -        using TunedB = TTuned::TStructB; -        //auto sameA = std::is_same<TDefaultStructA, TunedA>::value; -        auto sameB = std::is_same<TDefaultStructB, TunedB>::value; -        auto param1 = TTuned::Param1; -        auto param2 = TTuned::Param2; - -        auto sameA = std::is_same<TMyStruct, TunedA>::value; - -        UNIT_ASSERT(sameA); -        UNIT_ASSERT(sameB); -        UNIT_ASSERT_EQUAL(param1, 42); -        UNIT_ASSERT_EQUAL(param2, 42); -    } - +        struct TMyStruct {  +        };  +  +        using TTuned = TTune<TDefaults, TweakStructA<TMyStruct>>;  +  +        using TunedA = TTuned::TStructA;  +        using TunedB = TTuned::TStructB;  +        //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;  +        auto sameB = std::is_same<TDefaultStructB, TunedB>::value;  +        auto param1 = TTuned::Param1;  +        auto param2 = TTuned::Param2;  +  +        auto sameA = std::is_same<TMyStruct, TunedA>::value;  +  +        UNIT_ASSERT(sameA);  +        UNIT_ASSERT(sameB);  +        UNIT_ASSERT_EQUAL(param1, 42);  +        UNIT_ASSERT_EQUAL(param2, 42);  +    }  +       Y_UNIT_TEST(TuneParam1) { -        using TTuned = TTune<TDefaults, TweakParam1<24>>; - -        using TunedA = TTuned::TStructA; -        using TunedB = TTuned::TStructB; -        auto sameA = std::is_same<TDefaultStructA, TunedA>::value; -        auto sameB = std::is_same<TDefaultStructB, TunedB>::value; -        auto param1 = TTuned::Param1; -        auto param2 = TTuned::Param2; - -        UNIT_ASSERT(sameA); -        UNIT_ASSERT(sameB); -        UNIT_ASSERT_EQUAL(param1, 24); -        UNIT_ASSERT_EQUAL(param2, 42); -    } - +        using TTuned = TTune<TDefaults, TweakParam1<24>>;  +  +        using TunedA = TTuned::TStructA;  +        using TunedB = TTuned::TStructB;  +        auto sameA = std::is_same<TDefaultStructA, TunedA>::value;  +        auto sameB = std::is_same<TDefaultStructB, TunedB>::value;  +        auto param1 = TTuned::Param1;  +        auto param2 = TTuned::Param2;  +  +        UNIT_ASSERT(sameA);  +        UNIT_ASSERT(sameB);  +        UNIT_ASSERT_EQUAL(param1, 24);  +        UNIT_ASSERT_EQUAL(param2, 42);  +    }  +       Y_UNIT_TEST(TuneStructAAndParam1) { -        struct TMyStruct { -        }; - -        using TTuned = -            TTune<TDefaults, TweakStructA<TMyStruct>, TweakParam1<24>>; - -        using TunedA = TTuned::TStructA; -        using TunedB = TTuned::TStructB; -        //auto sameA = std::is_same<TDefaultStructA, TunedA>::value; -        auto sameB = std::is_same<TDefaultStructB, TunedB>::value; -        auto param1 = TTuned::Param1; -        auto param2 = TTuned::Param2; - -        auto sameA = std::is_same<TMyStruct, TunedA>::value; - -        UNIT_ASSERT(sameA); -        UNIT_ASSERT(sameB); -        UNIT_ASSERT_EQUAL(param1, 24); -        UNIT_ASSERT_EQUAL(param2, 42); -    } - +        struct TMyStruct {  +        };  +  +        using TTuned =  +            TTune<TDefaults, TweakStructA<TMyStruct>, TweakParam1<24>>;  +  +        using TunedA = TTuned::TStructA;  +        using TunedB = TTuned::TStructB;  +        //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;  +        auto sameB = std::is_same<TDefaultStructB, TunedB>::value;  +        auto param1 = TTuned::Param1;  +        auto param2 = TTuned::Param2;  +  +        auto sameA = std::is_same<TMyStruct, TunedA>::value;  +  +        UNIT_ASSERT(sameA);  +        UNIT_ASSERT(sameB);  +        UNIT_ASSERT_EQUAL(param1, 24);  +        UNIT_ASSERT_EQUAL(param2, 42);  +    }  +       Y_UNIT_TEST(TuneParam1AndStructA) { -        struct TMyStruct { -        }; - -        using TTuned = -            TTune<TDefaults, TweakParam1<24>, TweakStructA<TMyStruct>>; - -        using TunedA = TTuned::TStructA; -        using TunedB = TTuned::TStructB; -        //auto sameA = std::is_same<TDefaultStructA, TunedA>::value; -        auto sameB = std::is_same<TDefaultStructB, TunedB>::value; -        auto param1 = TTuned::Param1; -        auto param2 = TTuned::Param2; - -        auto sameA = std::is_same<TMyStruct, TunedA>::value; - -        UNIT_ASSERT(sameA); -        UNIT_ASSERT(sameB); -        UNIT_ASSERT_EQUAL(param1, 24); -        UNIT_ASSERT_EQUAL(param2, 42); -    } -} +        struct TMyStruct {  +        };  +  +        using TTuned =  +            TTune<TDefaults, TweakParam1<24>, TweakStructA<TMyStruct>>;  +  +        using TunedA = TTuned::TStructA;  +        using TunedB = TTuned::TStructB;  +        //auto sameA = std::is_same<TDefaultStructA, TunedA>::value;  +        auto sameB = std::is_same<TDefaultStructB, TunedB>::value;  +        auto param1 = TTuned::Param1;  +        auto param2 = TTuned::Param2;  +  +        auto sameA = std::is_same<TMyStruct, TunedA>::value;  +  +        UNIT_ASSERT(sameA);  +        UNIT_ASSERT(sameB);  +        UNIT_ASSERT_EQUAL(param1, 24);  +        UNIT_ASSERT_EQUAL(param2, 42);  +    }  +}  diff --git a/library/cpp/threading/queue/unordered_ut.cpp b/library/cpp/threading/queue/unordered_ut.cpp index a43b7f520e5..2018538bf77 100644 --- a/library/cpp/threading/queue/unordered_ut.cpp +++ b/library/cpp/threading/queue/unordered_ut.cpp @@ -1,154 +1,154 @@  #include <library/cpp/testing/unittest/registar.h> -#include <util/system/thread.h> -#include <algorithm> -#include <util/generic/vector.h> -#include <util/random/fast.h> - -#include "ut_helpers.h" - +#include <util/system/thread.h>  +#include <algorithm>  +#include <util/generic/vector.h>  +#include <util/random/fast.h>  +  +#include "ut_helpers.h"  +   template <typename TQueueType> -class TTestUnorderedQueue: public TTestBase { -private: -    using TLink = TIntrusiveLink; - +class TTestUnorderedQueue: public TTestBase {  +private:  +    using TLink = TIntrusiveLink;  +       UNIT_TEST_SUITE_DEMANGLE(TTestUnorderedQueue<TQueueType>); -    UNIT_TEST(Push1M_Pop1M_Unordered) -    UNIT_TEST_SUITE_END(); - -public: -    void Push1M_Pop1M_Unordered() { -        constexpr int REPEAT = 1000000; +    UNIT_TEST(Push1M_Pop1M_Unordered)  +    UNIT_TEST_SUITE_END();  +  +public:  +    void Push1M_Pop1M_Unordered() {  +        constexpr int REPEAT = 1000000;           TQueueType queue; -        TLink msg[REPEAT]; - -        auto pmsg = queue.Pop(); -        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - -        for (int i = 0; i < REPEAT; ++i) { -            queue.Push(&msg[i]); -        } - +        TLink msg[REPEAT];  +  +        auto pmsg = queue.Pop();  +        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);  +  +        for (int i = 0; i < REPEAT; ++i) {  +            queue.Push(&msg[i]);  +        }  +           TVector<TLink*> popped; -        popped.reserve(REPEAT); -        for (int i = 0; i < REPEAT; ++i) { -            popped.push_back((TLink*)queue.Pop()); -        } - -        pmsg = queue.Pop(); -        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - -        std::sort(popped.begin(), popped.end()); -        for (int i = 0; i < REPEAT; ++i) { -            UNIT_ASSERT_VALUES_EQUAL(&msg[i], popped[i]); -        } -    } -}; - +        popped.reserve(REPEAT);  +        for (int i = 0; i < REPEAT; ++i) {  +            popped.push_back((TLink*)queue.Pop());  +        }  +  +        pmsg = queue.Pop();  +        UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);  +  +        std::sort(popped.begin(), popped.end());  +        for (int i = 0; i < REPEAT; ++i) {  +            UNIT_ASSERT_VALUES_EQUAL(&msg[i], popped[i]);  +        }  +    }  +};  +   template <typename TQueueType> -class TTestWeakQueue: public TTestBase { -private: +class TTestWeakQueue: public TTestBase {  +private:       UNIT_TEST_SUITE_DEMANGLE(TTestWeakQueue<TQueueType>); -    UNIT_TEST(Threads8_Rnd_Exchange) -    UNIT_TEST_SUITE_END(); - -public: -    template <ui16 COUNT = 48, ui32 MSG_COUNT = 10000> -    void ManyThreadsRndExchange() { +    UNIT_TEST(Threads8_Rnd_Exchange)  +    UNIT_TEST_SUITE_END();  +  +public:  +    template <ui16 COUNT = 48, ui32 MSG_COUNT = 10000>  +    void ManyThreadsRndExchange() {           TQueueType queues[COUNT]; - +           class TWorker: public ISimpleThread { -        public: -            TWorker( +        public:  +            TWorker(                   TQueueType* queues_,                  ui16 mine,                  TAtomic* pushDone) -                : Queues(queues_) -                , MineQueue(mine) -                , PushDone(pushDone) -            { -            } - +                : Queues(queues_)  +                , MineQueue(mine)  +                , PushDone(pushDone)  +            {  +            }  +               TQueueType* Queues; -            ui16 MineQueue; +            ui16 MineQueue;               TVector<uintptr_t> Received; -            TAtomic* PushDone; - -            void* ThreadProc() override { -                TReallyFastRng32 rng(GetCycleCount()); -                Received.reserve(MSG_COUNT * 2); - -                for (ui32 loop = 1; loop <= MSG_COUNT; ++loop) { -                    for (;;) { -                        auto msg = Queues[MineQueue].Pop(); -                        if (msg == nullptr) { -                            break; -                        } - -                        Received.push_back((uintptr_t)msg); -                    } - -                    ui16 rnd = rng.GenRand64() % COUNT; -                    ui64 msg = ((ui64)MineQueue << 32) + loop; -                    while (!Queues[rnd].Push((void*)msg)) { -                    } -                } - -                AtomicIncrement(*PushDone); - -                for (;;) { -                    bool isItLast = AtomicGet(*PushDone) == COUNT; -                    auto msg = Queues[MineQueue].Pop(); -                    if (msg != nullptr) { -                        Received.push_back((uintptr_t)msg); -                    } else { -                        if (isItLast) { -                            break; -                        } -                        SpinLockPause(); -                    } -                } - -                for (ui64 last = 0;;) { -                    auto msg = Queues[MineQueue].UnsafeScanningPop(&last); -                    if (msg == nullptr) { -                        break; -                    } -                    Received.push_back((uintptr_t)msg); -                } - -                return nullptr; -            } -        }; - +            TAtomic* PushDone;  +  +            void* ThreadProc() override {  +                TReallyFastRng32 rng(GetCycleCount());  +                Received.reserve(MSG_COUNT * 2);  +  +                for (ui32 loop = 1; loop <= MSG_COUNT; ++loop) {  +                    for (;;) {  +                        auto msg = Queues[MineQueue].Pop();  +                        if (msg == nullptr) {  +                            break;  +                        }  +  +                        Received.push_back((uintptr_t)msg);  +                    }  +  +                    ui16 rnd = rng.GenRand64() % COUNT;  +                    ui64 msg = ((ui64)MineQueue << 32) + loop;  +                    while (!Queues[rnd].Push((void*)msg)) {  +                    }  +                }  +  +                AtomicIncrement(*PushDone);  +  +                for (;;) {  +                    bool isItLast = AtomicGet(*PushDone) == COUNT;  +                    auto msg = Queues[MineQueue].Pop();  +                    if (msg != nullptr) {  +                        Received.push_back((uintptr_t)msg);  +                    } else {  +                        if (isItLast) {  +                            break;  +                        }  +                        SpinLockPause();  +                    }  +                }  +  +                for (ui64 last = 0;;) {  +                    auto msg = Queues[MineQueue].UnsafeScanningPop(&last);  +                    if (msg == nullptr) {  +                        break;  +                    }  +                    Received.push_back((uintptr_t)msg);  +                }  +  +                return nullptr;  +            }  +        };  +           TVector<TAutoPtr<TWorker>> workers; -        TAtomic pushDone = 0; - -        for (ui32 i = 0; i < COUNT; ++i) { -            workers.emplace_back(new TWorker(&queues[0], i, &pushDone)); -            workers.back()->Start(); -        } - +        TAtomic pushDone = 0;  +  +        for (ui32 i = 0; i < COUNT; ++i) {  +            workers.emplace_back(new TWorker(&queues[0], i, &pushDone));  +            workers.back()->Start();  +        }  +           TVector<uintptr_t> all; -        for (ui32 i = 0; i < COUNT; ++i) { -            workers[i]->Join(); -            all.insert(all.begin(), +        for (ui32 i = 0; i < COUNT; ++i) {  +            workers[i]->Join();  +            all.insert(all.begin(),                          workers[i]->Received.begin(), workers[i]->Received.end()); -        } - -        std::sort(all.begin(), all.end()); -        auto iter = all.begin(); -        for (ui32 i = 0; i < COUNT; ++i) { -            for (ui32 k = 1; k <= MSG_COUNT; ++k) { -                UNIT_ASSERT_VALUES_EQUAL(((ui64)i << 32) + k, *iter); -                ++iter; -            } -        } -    } - -    void Threads8_Rnd_Exchange() { -        ManyThreadsRndExchange<8>(); -    } -}; - -REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TTestUnorderedQueue); -UNIT_TEST_SUITE_REGISTRATION(TTestWeakQueue<TMPMCUnorderedRing>); +        }  +  +        std::sort(all.begin(), all.end());  +        auto iter = all.begin();  +        for (ui32 i = 0; i < COUNT; ++i) {  +            for (ui32 k = 1; k <= MSG_COUNT; ++k) {  +                UNIT_ASSERT_VALUES_EQUAL(((ui64)i << 32) + k, *iter);  +                ++iter;  +            }  +        }  +    }  +  +    void Threads8_Rnd_Exchange() {  +        ManyThreadsRndExchange<8>();  +    }  +};  +  +REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TTestUnorderedQueue);  +UNIT_TEST_SUITE_REGISTRATION(TTestWeakQueue<TMPMCUnorderedRing>);  diff --git a/library/cpp/threading/queue/ut/ya.make b/library/cpp/threading/queue/ut/ya.make index 8883d9bf693..dda204155eb 100644 --- a/library/cpp/threading/queue/ut/ya.make +++ b/library/cpp/threading/queue/ut/ya.make @@ -1,16 +1,16 @@  UNITTEST_FOR(library/cpp/threading/queue) - +   OWNER(agri) - -ALLOCATOR(B) - -SRCS( -    basic_ut.cpp -    queue_ut.cpp -    tune_ut.cpp -    unordered_ut.cpp -    ut_helpers.cpp -    ut_helpers.h -) - -END() +  +ALLOCATOR(B)  +  +SRCS(  +    basic_ut.cpp  +    queue_ut.cpp  +    tune_ut.cpp  +    unordered_ut.cpp  +    ut_helpers.cpp  +    ut_helpers.h  +)  +  +END()  diff --git a/library/cpp/threading/queue/ut_helpers.cpp b/library/cpp/threading/queue/ut_helpers.cpp index aa3a8314411..342aa125a0b 100644 --- a/library/cpp/threading/queue/ut_helpers.cpp +++ b/library/cpp/threading/queue/ut_helpers.cpp @@ -1 +1 @@ -#include "ut_helpers.h" +#include "ut_helpers.h"  diff --git a/library/cpp/threading/queue/ut_helpers.h b/library/cpp/threading/queue/ut_helpers.h index 2756b52601e..c7203665930 100644 --- a/library/cpp/threading/queue/ut_helpers.h +++ b/library/cpp/threading/queue/ut_helpers.h @@ -1,40 +1,40 @@ -#pragma once - -#include "mpsc_read_as_filled.h" -#include "mpsc_htswap.h" -#include "mpsc_vinfarr_obstructive.h" -#include "mpsc_intrusive_unordered.h" -#include "mpmc_unordered_ring.h" - -struct TBasicHTSwap: public NThreading::THTSwapQueue<> { -}; - -struct TBasicReadAsFilled: public NThreading::TReadAsFilledQueue<> { -}; - -struct TBasicObstructiveConsumer +#pragma once  +  +#include "mpsc_read_as_filled.h"  +#include "mpsc_htswap.h"  +#include "mpsc_vinfarr_obstructive.h"  +#include "mpsc_intrusive_unordered.h"  +#include "mpmc_unordered_ring.h"  +  +struct TBasicHTSwap: public NThreading::THTSwapQueue<> {  +};  +  +struct TBasicReadAsFilled: public NThreading::TReadAsFilledQueue<> {  +};  +  +struct TBasicObstructiveConsumer      : public NThreading::TObstructiveConsumerQueue<> { -}; - -struct TBasicMPSCIntrusiveUnordered +};  +  +struct TBasicMPSCIntrusiveUnordered      : public NThreading::TMPSCIntrusiveUnordered { -}; - -struct TIntrusiveLink: public NThreading::TIntrusiveNode { -}; - -struct TMPMCUnorderedRing: public NThreading::TMPMCUnorderedRing { -    TMPMCUnorderedRing() -        : NThreading::TMPMCUnorderedRing(10000000) -    { -    } -}; - +};  +  +struct TIntrusiveLink: public NThreading::TIntrusiveNode {  +};  +  +struct TMPMCUnorderedRing: public NThreading::TMPMCUnorderedRing {  +    TMPMCUnorderedRing()  +        : NThreading::TMPMCUnorderedRing(10000000)  +    {  +    }  +};  +   #define REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TestTemplate)         \      UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicHTSwap>);       \      UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicReadAsFilled>); \ -    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicObstructiveConsumer>) - +    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicObstructiveConsumer>)  +   #define REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TestTemplate)                 \ -    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicMPSCIntrusiveUnordered>); \ -    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TMPMCUnorderedRing>); +    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicMPSCIntrusiveUnordered>); \  +    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TMPMCUnorderedRing>);  diff --git a/library/cpp/threading/queue/ya.make b/library/cpp/threading/queue/ya.make index 6570b38ce59..3a11eb2d925 100644 --- a/library/cpp/threading/queue/ya.make +++ b/library/cpp/threading/queue/ya.make @@ -1,18 +1,18 @@ -LIBRARY() - -OWNER(agri) - -SRCS( -    mpmc_unordered_ring.cpp -    mpmc_unordered_ring.h -    mpsc_htswap.cpp -    mpsc_htswap.h -    mpsc_intrusive_unordered.cpp -    mpsc_intrusive_unordered.h -    mpsc_read_as_filled.cpp -    mpsc_read_as_filled.h -    mpsc_vinfarr_obstructive.cpp -    mpsc_vinfarr_obstructive.h -) - -END() +LIBRARY()  +  +OWNER(agri)  +  +SRCS(  +    mpmc_unordered_ring.cpp  +    mpmc_unordered_ring.h  +    mpsc_htswap.cpp  +    mpsc_htswap.h  +    mpsc_intrusive_unordered.cpp  +    mpsc_intrusive_unordered.h  +    mpsc_read_as_filled.cpp  +    mpsc_read_as_filled.h  +    mpsc_vinfarr_obstructive.cpp  +    mpsc_vinfarr_obstructive.h  +)  +  +END()  | 
