diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /util/thread | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'util/thread')
-rw-r--r-- | util/thread/factory.cpp | 128 | ||||
-rw-r--r-- | util/thread/factory.h | 92 | ||||
-rw-r--r-- | util/thread/factory_ut.cpp | 96 | ||||
-rw-r--r-- | util/thread/lfqueue.cpp | 2 | ||||
-rw-r--r-- | util/thread/lfqueue.h | 98 | ||||
-rw-r--r-- | util/thread/lfqueue_ut.cpp | 18 | ||||
-rw-r--r-- | util/thread/lfstack.cpp | 2 | ||||
-rw-r--r-- | util/thread/lfstack.h | 66 | ||||
-rw-r--r-- | util/thread/lfstack_ut.cpp | 28 | ||||
-rw-r--r-- | util/thread/pool.cpp | 990 | ||||
-rw-r--r-- | util/thread/pool.h | 260 | ||||
-rw-r--r-- | util/thread/pool_ut.cpp | 88 | ||||
-rw-r--r-- | util/thread/singleton.cpp | 2 | ||||
-rw-r--r-- | util/thread/singleton.h | 48 | ||||
-rw-r--r-- | util/thread/singleton_ut.cpp | 4 | ||||
-rw-r--r-- | util/thread/ut/ya.make | 8 |
16 files changed, 965 insertions, 965 deletions
diff --git a/util/thread/factory.cpp b/util/thread/factory.cpp index 48e898f32d..0de161f007 100644 --- a/util/thread/factory.cpp +++ b/util/thread/factory.cpp @@ -1,71 +1,71 @@ #include "factory.h" - -#include <util/system/thread.h> -#include <util/generic/singleton.h> - + +#include <util/system/thread.h> +#include <util/generic/singleton.h> + using IThread = IThreadFactory::IThread; - -namespace { + +namespace { class TSystemThreadFactory: public IThreadFactory { - public: - class TPoolThread: public IThread { - public: + public: + class TPoolThread: public IThread { + public: ~TPoolThread() override { - if (Thr_) { - Thr_->Detach(); - } - } - + if (Thr_) { + Thr_->Detach(); + } + } + void DoRun(IThreadAble* func) override { - Thr_.Reset(new TThread(ThreadProc, func)); - - Thr_->Start(); - } - + Thr_.Reset(new TThread(ThreadProc, func)); + + Thr_->Start(); + } + void DoJoin() noexcept override { - if (!Thr_) { - return; - } - - Thr_->Join(); - Thr_.Destroy(); - } - - private: - static void* ThreadProc(void* func) { - ((IThreadAble*)(func))->Execute(); - + if (!Thr_) { + return; + } + + Thr_->Join(); + Thr_.Destroy(); + } + + private: + static void* ThreadProc(void* func) { + ((IThreadAble*)(func))->Execute(); + return nullptr; - } - - private: - THolder<TThread> Thr_; - }; - + } + + private: + THolder<TThread> Thr_; + }; + inline TSystemThreadFactory() noexcept { - } - + } + IThread* DoCreate() override { - return new TPoolThread; - } - }; + return new TPoolThread; + } + }; class TThreadFactoryFuncObj: public IThreadFactory::IThreadAble { - public: + public: TThreadFactoryFuncObj(const std::function<void()>& func) - : Func(func) - { - } + : Func(func) + { + } void DoExecute() override { THolder<TThreadFactoryFuncObj> self(this); - Func(); - } - - private: + Func(); + } + + private: std::function<void()> Func; }; -} - +} + THolder<IThread> IThreadFactory::Run(std::function<void()> func) { THolder<IThread> ret(DoCreate()); @@ -76,18 +76,18 @@ THolder<IThread> IThreadFactory::Run(std::function<void()> func) { static IThreadFactory* SystemThreadPoolImpl() { return Singleton<TSystemThreadFactory>(); -} - +} + static IThreadFactory* systemPool = nullptr; - + IThreadFactory* SystemThreadFactory() { - if (systemPool) { - return systemPool; - } - - return SystemThreadPoolImpl(); -} - + if (systemPool) { + return systemPool; + } + + return SystemThreadPoolImpl(); +} + void SetSystemThreadFactory(IThreadFactory* pool) { - systemPool = pool; -} + systemPool = pool; +} diff --git a/util/thread/factory.h b/util/thread/factory.h index 561fcbac88..9393f0f309 100644 --- a/util/thread/factory.h +++ b/util/thread/factory.h @@ -1,65 +1,65 @@ #pragma once - + #include <util/generic/ptr.h> #include <functional> - + class IThreadFactory { -public: - class IThreadAble { - public: +public: + class IThreadAble { + public: inline IThreadAble() noexcept = default; - + virtual ~IThreadAble() = default; - - inline void Execute() { - DoExecute(); - } - - private: - virtual void DoExecute() = 0; - }; - - class IThread { + + inline void Execute() { + DoExecute(); + } + + private: + virtual void DoExecute() = 0; + }; + + class IThread { friend class IThreadFactory; - - public: + + public: inline IThread() noexcept = default; - + virtual ~IThread() = default; - + inline void Join() noexcept { - DoJoin(); - } - - private: - inline void Run(IThreadAble* func) { - DoRun(func); - } - - private: - // it's actually DoStart - virtual void DoRun(IThreadAble* func) = 0; + DoJoin(); + } + + private: + inline void Run(IThreadAble* func) { + DoRun(func); + } + + private: + // it's actually DoStart + virtual void DoRun(IThreadAble* func) = 0; virtual void DoJoin() noexcept = 0; - }; - + }; + inline IThreadFactory() noexcept = default; - + virtual ~IThreadFactory() = default; - - // XXX: rename to Start + + // XXX: rename to Start inline THolder<IThread> Run(IThreadAble* func) { THolder<IThread> ret(DoCreate()); - ret->Run(func); - - return ret; - } - + ret->Run(func); + + return ret; + } + THolder<IThread> Run(std::function<void()> func); - -private: - virtual IThread* DoCreate() = 0; -}; - + +private: + virtual IThread* DoCreate() = 0; +}; + IThreadFactory* SystemThreadFactory(); void SetSystemThreadFactory(IThreadFactory* pool); diff --git a/util/thread/factory_ut.cpp b/util/thread/factory_ut.cpp index 647d96c901..f506ce7e06 100644 --- a/util/thread/factory_ut.cpp +++ b/util/thread/factory_ut.cpp @@ -1,57 +1,57 @@ #include "factory.h" #include "pool.h" - + #include <library/cpp/testing/unittest/registar.h> - -class TThrPoolTest: public TTestBase { - UNIT_TEST_SUITE(TThrPoolTest); - UNIT_TEST(TestSystemPool) - UNIT_TEST(TestAdaptivePool) - UNIT_TEST_SUITE_END(); - + +class TThrPoolTest: public TTestBase { + UNIT_TEST_SUITE(TThrPoolTest); + UNIT_TEST(TestSystemPool) + UNIT_TEST(TestAdaptivePool) + UNIT_TEST_SUITE_END(); + struct TRunAble: public IThreadFactory::IThreadAble { - inline TRunAble() - : done(false) - { - } - + inline TRunAble() + : done(false) + { + } + ~TRunAble() override = default; - + void DoExecute() override { - done = true; - } - - bool done; - }; - -private: - inline void TestSystemPool() { - TRunAble r; - - { + done = true; + } + + bool done; + }; + +private: + inline void TestSystemPool() { + TRunAble r; + + { THolder<IThreadFactory::IThread> thr = SystemThreadFactory()->Run(&r); - - thr->Join(); - } - - UNIT_ASSERT_EQUAL(r.done, true); - } - - inline void TestAdaptivePool() { - TRunAble r; - - { + + thr->Join(); + } + + UNIT_ASSERT_EQUAL(r.done, true); + } + + inline void TestAdaptivePool() { + TRunAble r; + + { TAdaptiveThreadPool pool; - - pool.Start(0); - + + pool.Start(0); + THolder<IThreadFactory::IThread> thr = pool.Run(&r); - - thr->Join(); - } - - UNIT_ASSERT_EQUAL(r.done, true); - } -}; - -UNIT_TEST_SUITE_REGISTRATION(TThrPoolTest); + + thr->Join(); + } + + UNIT_ASSERT_EQUAL(r.done, true); + } +}; + +UNIT_TEST_SUITE_REGISTRATION(TThrPoolTest); diff --git a/util/thread/lfqueue.cpp b/util/thread/lfqueue.cpp index 5861999b78..102e25c2a3 100644 --- a/util/thread/lfqueue.cpp +++ b/util/thread/lfqueue.cpp @@ -1 +1 @@ -#include "lfqueue.h" +#include "lfqueue.h" diff --git a/util/thread/lfqueue.h b/util/thread/lfqueue.h index ab523631e4..1d2a776bbb 100644 --- a/util/thread/lfqueue.h +++ b/util/thread/lfqueue.h @@ -5,15 +5,15 @@ #include <util/generic/ptr.h> #include <util/system/atomic.h> #include <util/system/yassert.h> -#include "lfstack.h" +#include "lfstack.h" -struct TDefaultLFCounter { - template <class T> - void IncCount(const T& data) { +struct TDefaultLFCounter { + template <class T> + void IncCount(const T& data) { (void)data; } - template <class T> - void DecCount(const T& data) { + template <class T> + void DecCount(const T& data) { (void)data; } }; @@ -25,7 +25,7 @@ struct TDefaultLFCounter { // it is TCounter class responsibility to check validity of passed object template <class T, class TCounter> class TLockFreeQueue: public TNonCopyable { - struct TListNode { + struct TListNode { template <typename U> TListNode(U&& u, TListNode* next) : Next(next) @@ -39,30 +39,30 @@ class TLockFreeQueue: public TNonCopyable { { } - TListNode* volatile Next; + TListNode* volatile Next; T Data; }; // using inheritance to be able to use 0 bytes for TCounter when we don't need one - struct TRootNode: public TCounter { - TListNode* volatile PushQueue; - TListNode* volatile PopQueue; - TListNode* volatile ToDelete; - TRootNode* volatile NextFree; + struct TRootNode: public TCounter { + TListNode* volatile PushQueue; + TListNode* volatile PopQueue; + TListNode* volatile ToDelete; + TRootNode* volatile NextFree; - TRootNode() + TRootNode() : PushQueue(nullptr) , PopQueue(nullptr) , ToDelete(nullptr) , NextFree(nullptr) { } - void CopyCounter(TRootNode* x) { + void CopyCounter(TRootNode* x) { *(TCounter*)this = *(TCounter*)x; } }; - static void EraseList(TListNode* n) { + static void EraseList(TListNode* n) { while (n) { TListNode* keepNext = AtomicGet(n->Next); delete n; @@ -75,12 +75,12 @@ class TLockFreeQueue: public TNonCopyable { alignas(64) volatile TAtomic FreeingTaskCounter; alignas(64) TRootNode* volatile FreePtr; - void TryToFreeAsyncMemory() { + void TryToFreeAsyncMemory() { TAtomic keepCounter = AtomicAdd(FreeingTaskCounter, 0); TRootNode* current = AtomicGet(FreePtr); if (current == nullptr) return; - if (AtomicAdd(FreememCounter, 0) == 1) { + if (AtomicAdd(FreememCounter, 0) == 1) { // we are the last thread, try to cleanup // check if another thread have cleaned up if (keepCounter != AtomicAdd(FreeingTaskCounter, 0)) { @@ -98,24 +98,24 @@ class TLockFreeQueue: public TNonCopyable { } } } - void AsyncRef() { - AtomicAdd(FreememCounter, 1); + void AsyncRef() { + AtomicAdd(FreememCounter, 1); } - void AsyncUnref() { + void AsyncUnref() { TryToFreeAsyncMemory(); - AtomicAdd(FreememCounter, -1); + AtomicAdd(FreememCounter, -1); } - void AsyncDel(TRootNode* toDelete, TListNode* lst) { + void AsyncDel(TRootNode* toDelete, TListNode* lst) { AtomicSet(toDelete->ToDelete, lst); - for (;;) { + for (;;) { AtomicSet(toDelete->NextFree, AtomicGet(FreePtr)); if (AtomicCas(&FreePtr, toDelete, AtomicGet(toDelete->NextFree))) break; } } - void AsyncUnref(TRootNode* toDelete, TListNode* lst) { + void AsyncUnref(TRootNode* toDelete, TListNode* lst) { TryToFreeAsyncMemory(); - if (AtomicAdd(FreememCounter, -1) == 0) { + if (AtomicAdd(FreememCounter, -1) == 0) { // no other operations in progress, can safely reclaim memory EraseList(lst); delete toDelete; @@ -125,27 +125,27 @@ class TLockFreeQueue: public TNonCopyable { } } - struct TListInvertor { - TListNode* Copy; - TListNode* Tail; - TListNode* PrevFirst; + struct TListInvertor { + TListNode* Copy; + TListNode* Tail; + TListNode* PrevFirst; - TListInvertor() + TListInvertor() : Copy(nullptr) , Tail(nullptr) , PrevFirst(nullptr) { } - ~TListInvertor() { + ~TListInvertor() { EraseList(Copy); } - void CopyWasUsed() { + void CopyWasUsed() { Copy = nullptr; Tail = nullptr; PrevFirst = nullptr; } - void DoCopy(TListNode* ptr) { - TListNode* newFirst = ptr; + void DoCopy(TListNode* ptr) { + TListNode* newFirst = ptr; TListNode* newCopy = nullptr; TListNode* newTail = nullptr; while (ptr) { @@ -171,8 +171,8 @@ class TLockFreeQueue: public TNonCopyable { } }; - void EnqueueImpl(TListNode* head, TListNode* tail) { - TRootNode* newRoot = new TRootNode; + void EnqueueImpl(TListNode* head, TListNode* tail) { + TRootNode* newRoot = new TRootNode; AsyncRef(); AtomicSet(newRoot->PushQueue, head); for (;;) { @@ -225,14 +225,14 @@ class TLockFreeQueue: public TNonCopyable { } public: - TLockFreeQueue() + TLockFreeQueue() : JobQueue(new TRootNode) , FreememCounter(0) , FreeingTaskCounter(0) , FreePtr(nullptr) { } - ~TLockFreeQueue() { + ~TLockFreeQueue() { AsyncRef(); AsyncUnref(); // should free FreeList EraseList(JobQueue->PushQueue); @@ -253,17 +253,17 @@ public: EnqueueImpl(newNode, newNode); } template <typename TCollection> - void EnqueueAll(const TCollection& data) { + void EnqueueAll(const TCollection& data) { EnqueueAll(data.begin(), data.end()); } template <typename TIter> - void EnqueueAll(TIter dataBegin, TIter dataEnd) { + void EnqueueAll(TIter dataBegin, TIter dataEnd) { if (dataBegin == dataEnd) return; TIter i = dataBegin; TListNode* volatile node = new TListNode(*i); - TListNode* volatile tail = node; + TListNode* volatile tail = node; for (++i; i != dataEnd; ++i) { TListNode* nextNode = node; @@ -271,7 +271,7 @@ public: } EnqueueImpl(node, tail); } - bool Dequeue(T* data) { + bool Dequeue(T* data) { TRootNode* newRoot = nullptr; TListInvertor listInvertor; AsyncRef(); @@ -288,7 +288,7 @@ public: newRoot->CopyCounter(curRoot); newRoot->DecCount(tail->Data); Y_ASSERT(AtomicGet(curRoot->PopQueue) == tail); - if (AtomicCas(&JobQueue, newRoot, curRoot)) { + if (AtomicCas(&JobQueue, newRoot, curRoot)) { *data = std::move(tail->Data); AtomicSet(tail->Next, nullptr); AsyncUnref(curRoot, tail); @@ -309,7 +309,7 @@ public: AtomicSet(newRoot->PopQueue, listInvertor.Copy); newRoot->CopyCounter(curRoot); Y_ASSERT(AtomicGet(curRoot->PopQueue) == nullptr); - if (AtomicCas(&JobQueue, newRoot, curRoot)) { + if (AtomicCas(&JobQueue, newRoot, curRoot)) { newRoot = nullptr; listInvertor.CopyWasUsed(); AsyncDel(curRoot, AtomicGet(curRoot->PushQueue)); @@ -343,14 +343,14 @@ public: AsyncUnref(curRoot, toDeleteHead); } - bool IsEmpty() { + bool IsEmpty() { AsyncRef(); TRootNode* curRoot = AtomicGet(JobQueue); bool res = AtomicGet(curRoot->PushQueue) == nullptr && AtomicGet(curRoot->PopQueue) == nullptr; AsyncUnref(); return res; } - TCounter GetCounter() { + TCounter GetCounter() { AsyncRef(); TRootNode* curRoot = AtomicGet(JobQueue); TCounter res = *(TCounter*)curRoot; @@ -367,8 +367,8 @@ public: inline ~TAutoLockFreeQueue() { TRef tmp; - while (Dequeue(&tmp)) { - } + while (Dequeue(&tmp)) { + } } inline bool Dequeue(TRef* t) { diff --git a/util/thread/lfqueue_ut.cpp b/util/thread/lfqueue_ut.cpp index 83bca100cf..1290ce0a46 100644 --- a/util/thread/lfqueue_ut.cpp +++ b/util/thread/lfqueue_ut.cpp @@ -7,7 +7,7 @@ #include <util/system/atomic.h> #include <util/thread/pool.h> -#include "lfqueue.h" +#include "lfqueue.h" class TMoveTest { public: @@ -54,24 +54,24 @@ private: class TOperationsChecker { public: TOperationsChecker() { - ++DefaultCtor_; + ++DefaultCtor_; } TOperationsChecker(TOperationsChecker&&) { - ++MoveCtor_; + ++MoveCtor_; } TOperationsChecker(const TOperationsChecker&) { - ++CopyCtor_; + ++CopyCtor_; } TOperationsChecker& operator=(TOperationsChecker&&) { - ++MoveAssign_; + ++MoveAssign_; return *this; } TOperationsChecker& operator=(const TOperationsChecker&) { - ++CopyAssign_; + ++CopyAssign_; return *this; } @@ -190,7 +190,7 @@ Y_UNIT_TEST_SUITE(TLockFreeQueueTests) { UNIT_ASSERT(!queue.Dequeue(&i)); } - void DequeueAllRunner(TLockFreeQueue<int>& queue, bool singleConsumer) { + void DequeueAllRunner(TLockFreeQueue<int>& queue, bool singleConsumer) { size_t threadsNum = 4; size_t enqueuesPerThread = 10'000; TThreadPool p; @@ -303,11 +303,11 @@ Y_UNIT_TEST_SUITE(TLockFreeQueueTests) { } Y_UNIT_TEST(CleanInDestructor) { - TSimpleSharedPtr<bool> p(new bool); + TSimpleSharedPtr<bool> p(new bool); UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount()); { - TLockFreeQueue<TSimpleSharedPtr<bool>> stack; + TLockFreeQueue<TSimpleSharedPtr<bool>> stack; stack.Enqueue(p); stack.Enqueue(p); diff --git a/util/thread/lfstack.cpp b/util/thread/lfstack.cpp index be8b3bdf37..6408639189 100644 --- a/util/thread/lfstack.cpp +++ b/util/thread/lfstack.cpp @@ -1 +1 @@ -#include "lfstack.h" +#include "lfstack.h" diff --git a/util/thread/lfstack.h b/util/thread/lfstack.h index ca3d95f3c3..aa64ce1dbe 100644 --- a/util/thread/lfstack.h +++ b/util/thread/lfstack.h @@ -5,31 +5,31 @@ ////////////////////////////// // lock free lifo stack -template <class T> -class TLockFreeStack: TNonCopyable { - struct TNode { +template <class T> +class TLockFreeStack: TNonCopyable { + struct TNode { T Value; - TNode* Next; + TNode* Next; TNode() = default; - template <class U> + template <class U> explicit TNode(U&& val) : Value(std::forward<U>(val)) , Next(nullptr) - { - } + { + } }; TNode* Head; TNode* FreePtr; TAtomic DequeueCount; - void TryToFreeMemory() { + void TryToFreeMemory() { TNode* current = AtomicGet(FreePtr); if (!current) return; - if (AtomicAdd(DequeueCount, 0) == 1) { + if (AtomicAdd(DequeueCount, 0) == 1) { // node current is in free list, we are the last thread so try to cleanup if (AtomicCas(&FreePtr, (TNode*)nullptr, current)) EraseList(current); @@ -37,7 +37,7 @@ class TLockFreeStack: TNonCopyable { } void EraseList(TNode* volatile p) { while (p) { - TNode* next = p->Next; + TNode* next = p->Next; delete p; p = next; } @@ -54,20 +54,20 @@ class TLockFreeStack: TNonCopyable { TNode* volatile node = new TNode(std::forward<U>(u)); EnqueueImpl(node, node); } - + public: - TLockFreeStack() + TLockFreeStack() : Head(nullptr) , FreePtr(nullptr) - , DequeueCount(0) - { - } - ~TLockFreeStack() { + , DequeueCount(0) + { + } + ~TLockFreeStack() { EraseList(Head); EraseList(FreePtr); } - void Enqueue(const T& t) { + void Enqueue(const T& t) { EnqueueImpl(t); } @@ -76,11 +76,11 @@ public: } template <typename TCollection> - void EnqueueAll(const TCollection& data) { + void EnqueueAll(const TCollection& data) { EnqueueAll(data.begin(), data.end()); } template <typename TIter> - void EnqueueAll(TIter dataBegin, TIter dataEnd) { + void EnqueueAll(TIter dataBegin, TIter dataEnd) { if (dataBegin == dataEnd) { return; } @@ -95,22 +95,22 @@ public: } EnqueueImpl(node, tail); } - bool Dequeue(T* res) { - AtomicAdd(DequeueCount, 1); + bool Dequeue(T* res) { + AtomicAdd(DequeueCount, 1); for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) { if (AtomicCas(&Head, AtomicGet(current->Next), current)) { *res = std::move(current->Value); // delete current; // ABA problem // even more complex node deletion TryToFreeMemory(); - if (AtomicAdd(DequeueCount, -1) == 0) { + if (AtomicAdd(DequeueCount, -1) == 0) { // no other Dequeue()s, can safely reclaim memory delete current; } else { // Dequeue()s in progress, put node to free list - for (;;) { + for (;;) { AtomicSet(current->Next, AtomicGet(FreePtr)); - if (AtomicCas(&FreePtr, current, current->Next)) + if (AtomicCas(&FreePtr, current, current->Next)) break; } } @@ -118,17 +118,17 @@ public: } } TryToFreeMemory(); - AtomicAdd(DequeueCount, -1); + AtomicAdd(DequeueCount, -1); return false; } // add all elements to *res // elements are returned in order of dequeue (top to bottom; see example in unittest) template <typename TCollection> - void DequeueAll(TCollection* res) { + void DequeueAll(TCollection* res) { AtomicAdd(DequeueCount, 1); for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) { if (AtomicCas(&Head, (TNode*)nullptr, current)) { - for (TNode* x = current; x;) { + for (TNode* x = current; x;) { res->push_back(std::move(x->Value)); x = x->Next; } @@ -140,9 +140,9 @@ public: EraseList(current); } else { // Dequeue()s in progress, add nodes list to free list - TNode* currentLast = current; + TNode* currentLast = current; while (currentLast->Next) { - currentLast = currentLast->Next; + currentLast = currentLast->Next; } for (;;) { AtomicSet(currentLast->Next, AtomicGet(FreePtr)); @@ -156,7 +156,7 @@ public: TryToFreeMemory(); AtomicAdd(DequeueCount, -1); } - bool DequeueSingleConsumer(T* res) { + bool DequeueSingleConsumer(T* res) { for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) { if (AtomicCas(&Head, current->Next, current)) { *res = std::move(current->Value); @@ -169,10 +169,10 @@ public: // add all elements to *res // elements are returned in order of dequeue (top to bottom; see example in unittest) template <typename TCollection> - void DequeueAllSingleConsumer(TCollection* res) { + void DequeueAllSingleConsumer(TCollection* res) { for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) { if (AtomicCas(&Head, (TNode*)nullptr, current)) { - for (TNode* x = current; x;) { + for (TNode* x = current; x;) { res->push_back(std::move(x->Value)); x = x->Next; } @@ -181,7 +181,7 @@ public: } } } - bool IsEmpty() { + bool IsEmpty() { AtomicAdd(DequeueCount, 0); // mem barrier return AtomicGet(Head) == nullptr; // without lock, so result is approximate } diff --git a/util/thread/lfstack_ut.cpp b/util/thread/lfstack_ut.cpp index e20a838f95..61a8137db9 100644 --- a/util/thread/lfstack_ut.cpp +++ b/util/thread/lfstack_ut.cpp @@ -6,19 +6,19 @@ #include <library/cpp/testing/unittest/registar.h> -#include "lfstack.h" +#include "lfstack.h" Y_UNIT_TEST_SUITE(TLockFreeStackTests) { class TCountDownLatch { private: TAtomic Current_; TSystemEvent EventObject_; - + public: TCountDownLatch(unsigned initial) : Current_(initial) - { - } + { + } void CountDown() { if (AtomicDecrement(Current_) == 0) { @@ -175,11 +175,11 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) { } Y_UNIT_TEST(CleanInDestructor) { - TSimpleSharedPtr<bool> p(new bool); + TSimpleSharedPtr<bool> p(new bool); UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount()); { - TLockFreeStack<TSimpleSharedPtr<bool>> stack; + TLockFreeStack<TSimpleSharedPtr<bool>> stack; stack.Enqueue(p); stack.Enqueue(p); @@ -193,14 +193,14 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) { Y_UNIT_TEST(NoCopyTest) { static unsigned copied = 0; struct TCopyCount { - TCopyCount(int) { - } - TCopyCount(const TCopyCount&) { - ++copied; - } - - TCopyCount(TCopyCount&&) { - } + TCopyCount(int) { + } + TCopyCount(const TCopyCount&) { + ++copied; + } + + TCopyCount(TCopyCount&&) { + } TCopyCount& operator=(const TCopyCount&) { ++copied; diff --git a/util/thread/pool.cpp b/util/thread/pool.cpp index 05fad02e9b..0275429304 100644 --- a/util/thread/pool.cpp +++ b/util/thread/pool.cpp @@ -2,31 +2,31 @@ #include <util/system/defaults.h> -#if defined(_unix_) - #include <pthread.h> -#endif - +#if defined(_unix_) + #include <pthread.h> +#endif + #include <util/generic/vector.h> -#include <util/generic/intrlist.h> +#include <util/generic/intrlist.h> #include <util/generic/yexception.h> #include <util/generic/ylimits.h> -#include <util/generic/singleton.h> +#include <util/generic/singleton.h> #include <util/generic/fastqueue.h> - + #include <util/stream/output.h> #include <util/string/builder.h> - -#include <util/system/event.h> -#include <util/system/mutex.h> -#include <util/system/atomic.h> -#include <util/system/condvar.h> + +#include <util/system/event.h> +#include <util/system/mutex.h> +#include <util/system/atomic.h> +#include <util/system/condvar.h> #include <util/system/thread.h> #include <util/datetime/base.h> #include "factory.h" #include "pool.h" - + namespace { class TThreadNamer { public: @@ -36,7 +36,7 @@ namespace { { } - explicit operator bool() const { + explicit operator bool() const { return !ThreadName.empty(); } @@ -62,269 +62,269 @@ namespace { TThreadFactoryHolder::TThreadFactoryHolder() noexcept : Pool_(SystemThreadFactory()) -{ -} - +{ +} + class TThreadPool::TImpl: public TIntrusiveListItem<TImpl>, public IThreadFactory::IThreadAble { using TTsr = IThreadPool::TTsr; using TJobQueue = TFastQueue<IObjectInQueue*>; using TThreadRef = THolder<IThreadFactory::IThread>; - -public: + +public: inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params) - : Parent_(parent) + : Parent_(parent) , Blocking(params.Blocking_) , Catching(params.Catching_) , Namer(params) , ShouldTerminate(1) - , MaxQueueSize(0) - , ThreadCountExpected(0) - , ThreadCountReal(0) - , Forked(false) - { - TAtforkQueueRestarter::Get().RegisterObject(this); - Start(thrnum, maxqueue); - } - + , MaxQueueSize(0) + , ThreadCountExpected(0) + , ThreadCountReal(0) + , Forked(false) + { + TAtforkQueueRestarter::Get().RegisterObject(this); + Start(thrnum, maxqueue); + } + inline ~TImpl() override { - try { - Stop(); - } catch (...) { - // ¯\_(ツ)_/¯ - } - - TAtforkQueueRestarter::Get().UnregisterObject(this); + try { + Stop(); + } catch (...) { + // ¯\_(ツ)_/¯ + } + + TAtforkQueueRestarter::Get().UnregisterObject(this); Y_ASSERT(Tharr.empty()); - } - - inline bool Add(IObjectInQueue* obj) { + } + + inline bool Add(IObjectInQueue* obj) { if (AtomicGet(ShouldTerminate)) { - return false; - } - - if (Tharr.empty()) { - TTsr tsr(Parent_); - obj->Process(tsr); - - return true; - } - - with_lock (QueueMutex) { + return false; + } + + if (Tharr.empty()) { + TTsr tsr(Parent_); + obj->Process(tsr); + + return true; + } + + with_lock (QueueMutex) { while (MaxQueueSize > 0 && Queue.Size() >= MaxQueueSize && !AtomicGet(ShouldTerminate)) { - if (!Blocking) { - return false; - } - QueuePopCond.Wait(QueueMutex); - } - + if (!Blocking) { + return false; + } + QueuePopCond.Wait(QueueMutex); + } + if (AtomicGet(ShouldTerminate)) { - return false; - } - - Queue.Push(obj); + return false; + } + + Queue.Push(obj); } - + QueuePushCond.Signal(); - return true; - } - + return true; + } + inline size_t Size() const noexcept { - auto guard = Guard(QueueMutex); + auto guard = Guard(QueueMutex); - return Queue.Size(); - } + return Queue.Size(); + } inline size_t GetMaxQueueSize() const noexcept { - return MaxQueueSize; - } + return MaxQueueSize; + } inline size_t GetThreadCountExpected() const noexcept { - return ThreadCountExpected; - } + return ThreadCountExpected; + } inline size_t GetThreadCountReal() const noexcept { return ThreadCountReal; } inline void AtforkAction() noexcept Y_NO_SANITIZE("thread") { - Forked = true; - } - + Forked = true; + } + inline bool NeedRestart() const noexcept { - return Forked; - } - -private: - inline void Start(size_t num, size_t maxque) { + return Forked; + } + +private: + inline void Start(size_t num, size_t maxque) { AtomicSet(ShouldTerminate, 0); - MaxQueueSize = maxque; - ThreadCountExpected = num; - - try { - for (size_t i = 0; i < num; ++i) { - Tharr.push_back(Parent_->Pool()->Run(this)); - ++ThreadCountReal; - } - } catch (...) { - Stop(); - - throw; - } - } - - inline void Stop() { + MaxQueueSize = maxque; + ThreadCountExpected = num; + + try { + for (size_t i = 0; i < num; ++i) { + Tharr.push_back(Parent_->Pool()->Run(this)); + ++ThreadCountReal; + } + } catch (...) { + Stop(); + + throw; + } + } + + inline void Stop() { AtomicSet(ShouldTerminate, 1); - with_lock (QueueMutex) { - QueuePopCond.BroadCast(); - } - - if (!NeedRestart()) { - WaitForComplete(); - } - - Tharr.clear(); - ThreadCountExpected = 0; - MaxQueueSize = 0; - } - + with_lock (QueueMutex) { + QueuePopCond.BroadCast(); + } + + if (!NeedRestart()) { + WaitForComplete(); + } + + Tharr.clear(); + ThreadCountExpected = 0; + MaxQueueSize = 0; + } + inline void WaitForComplete() noexcept { - with_lock (StopMutex) { - while (ThreadCountReal) { - with_lock (QueueMutex) { + with_lock (StopMutex) { + while (ThreadCountReal) { + with_lock (QueueMutex) { QueuePushCond.Signal(); - } - - StopCond.Wait(StopMutex); - } - } - } - + } + + StopCond.Wait(StopMutex); + } + } + } + void DoExecute() override { - THolder<TTsr> tsr(new TTsr(Parent_)); - + THolder<TTsr> tsr(new TTsr(Parent_)); + if (Namer) { Namer.SetCurrentThreadName(); } - while (true) { + while (true) { IObjectInQueue* job = nullptr; - - with_lock (QueueMutex) { + + with_lock (QueueMutex) { while (Queue.Empty() && !AtomicGet(ShouldTerminate)) { QueuePushCond.Wait(QueueMutex); - } - + } + if (AtomicGet(ShouldTerminate) && Queue.Empty()) { - tsr.Destroy(); - - break; - } - - job = Queue.Pop(); - } - - QueuePopCond.Signal(); - + tsr.Destroy(); + + break; + } + + job = Queue.Pop(); + } + + QueuePopCond.Signal(); + if (Catching) { - try { + try { try { job->Process(*tsr); } catch (...) { Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl; } - } catch (...) { - // ¯\_(ツ)_/¯ - } + } catch (...) { + // ¯\_(ツ)_/¯ + } } else { job->Process(*tsr); - } - } - - FinishOneThread(); - } - + } + } + + FinishOneThread(); + } + inline void FinishOneThread() noexcept { - auto guard = Guard(StopMutex); - - --ThreadCountReal; - StopCond.Signal(); - } + auto guard = Guard(StopMutex); + + --ThreadCountReal; + StopCond.Signal(); + } -private: +private: TThreadPool* Parent_; const bool Blocking; const bool Catching; TThreadNamer Namer; - mutable TMutex QueueMutex; - mutable TMutex StopMutex; + mutable TMutex QueueMutex; + mutable TMutex StopMutex; TCondVar QueuePushCond; - TCondVar QueuePopCond; - TCondVar StopCond; - TJobQueue Queue; + TCondVar QueuePopCond; + TCondVar StopCond; + TJobQueue Queue; TVector<TThreadRef> Tharr; TAtomic ShouldTerminate; - size_t MaxQueueSize; - size_t ThreadCountExpected; - size_t ThreadCountReal; - bool Forked; - - class TAtforkQueueRestarter { - public: - static TAtforkQueueRestarter& Get() { - return *SingletonWithPriority<TAtforkQueueRestarter, 256>(); - } - - inline void RegisterObject(TImpl* obj) { - auto guard = Guard(ActionMutex); - - RegisteredObjects.PushBack(obj); - } - - inline void UnregisterObject(TImpl* obj) { - auto guard = Guard(ActionMutex); - - obj->Unlink(); - } - - private: - void ChildAction() { - with_lock (ActionMutex) { - for (auto it = RegisteredObjects.Begin(); it != RegisteredObjects.End(); ++it) { - it->AtforkAction(); - } - } - } - - static void ProcessChildAction() { - Get().ChildAction(); - } - - TIntrusiveList<TImpl> RegisteredObjects; - TMutex ActionMutex; - - public: - inline TAtforkQueueRestarter() { -#if defined(_bionic_) -//no pthread_atfork on android libc -#elif defined(_unix_) + size_t MaxQueueSize; + size_t ThreadCountExpected; + size_t ThreadCountReal; + bool Forked; + + class TAtforkQueueRestarter { + public: + static TAtforkQueueRestarter& Get() { + return *SingletonWithPriority<TAtforkQueueRestarter, 256>(); + } + + inline void RegisterObject(TImpl* obj) { + auto guard = Guard(ActionMutex); + + RegisteredObjects.PushBack(obj); + } + + inline void UnregisterObject(TImpl* obj) { + auto guard = Guard(ActionMutex); + + obj->Unlink(); + } + + private: + void ChildAction() { + with_lock (ActionMutex) { + for (auto it = RegisteredObjects.Begin(); it != RegisteredObjects.End(); ++it) { + it->AtforkAction(); + } + } + } + + static void ProcessChildAction() { + Get().ChildAction(); + } + + TIntrusiveList<TImpl> RegisteredObjects; + TMutex ActionMutex; + + public: + inline TAtforkQueueRestarter() { +#if defined(_bionic_) +//no pthread_atfork on android libc +#elif defined(_unix_) pthread_atfork(nullptr, nullptr, ProcessChildAction); #endif - } - }; -}; - + } + }; +}; + TThreadPool::~TThreadPool() = default; - + size_t TThreadPool::Size() const noexcept { - if (!Impl_.Get()) { - return 0; - } - - return Impl_->Size(); -} - + if (!Impl_.Get()) { + return 0; + } + + return Impl_->Size(); +} + size_t TThreadPool::GetThreadCountExpected() const noexcept { if (!Impl_.Get()) { return 0; @@ -351,298 +351,298 @@ size_t TThreadPool::GetMaxQueueSize() const noexcept { bool TThreadPool::Add(IObjectInQueue* obj) { Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started")); - - if (Impl_->NeedRestart()) { - Start(Impl_->GetThreadCountExpected(), Impl_->GetMaxQueueSize()); + + if (Impl_->NeedRestart()) { + Start(Impl_->GetThreadCountExpected(), Impl_->GetMaxQueueSize()); } - return Impl_->Add(obj); + return Impl_->Add(obj); } void TThreadPool::Start(size_t thrnum, size_t maxque) { Impl_.Reset(new TImpl(this, thrnum, maxque, Params)); -} - +} + void TThreadPool::Stop() noexcept { - Impl_.Destroy(); -} - -static TAtomic mtp_queue_counter = 0; - + Impl_.Destroy(); +} + +static TAtomic mtp_queue_counter = 0; + class TAdaptiveThreadPool::TImpl { -public: +public: class TThread: public IThreadFactory::IThreadAble { public: - inline TThread(TImpl* parent) - : Impl_(parent) - , Thread_(Impl_->Parent_->Pool()->Run(this)) - { - } - + inline TThread(TImpl* parent) + : Impl_(parent) + , Thread_(Impl_->Parent_->Pool()->Run(this)) + { + } + inline ~TThread() override { - Impl_->DecThreadCount(); - } - - private: + Impl_->DecThreadCount(); + } + + private: void DoExecute() noexcept override { - THolder<TThread> This(this); - + THolder<TThread> This(this); + if (Impl_->Namer) { Impl_->Namer.SetCurrentThreadName(); } - { - TTsr tsr(Impl_->Parent_); - IObjectInQueue* obj; - + { + TTsr tsr(Impl_->Parent_); + IObjectInQueue* obj; + while ((obj = Impl_->WaitForJob()) != nullptr) { if (Impl_->Catching) { - try { + try { try { obj->Process(tsr); } catch (...) { Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl; } - } catch (...) { + } catch (...) { // ¯\_(ツ)_/¯ - } + } } else { obj->Process(tsr); - } - } - } - } - - private: - TImpl* Impl_; + } + } + } + } + + private: + TImpl* Impl_; THolder<IThreadFactory::IThread> Thread_; - }; - + }; + inline TImpl(TAdaptiveThreadPool* parent, const TParams& params) - : Parent_(parent) + : Parent_(parent) , Catching(params.Catching_) , Namer(params) - , ThrCount_(0) - , AllDone_(false) + , ThrCount_(0) + , AllDone_(false) , Obj_(nullptr) - , Free_(0) - , IdleTime_(TDuration::Max()) - { - sprintf(Name_, "[mtp queue %ld]", (long)AtomicAdd(mtp_queue_counter, 1)); - } - + , Free_(0) + , IdleTime_(TDuration::Max()) + { + sprintf(Name_, "[mtp queue %ld]", (long)AtomicAdd(mtp_queue_counter, 1)); + } + inline ~TImpl() { - Stop(); - } - - inline void SetMaxIdleTime(TDuration idleTime) { - IdleTime_ = idleTime; - } - + Stop(); + } + + inline void SetMaxIdleTime(TDuration idleTime) { + IdleTime_ = idleTime; + } + inline const char* Name() const noexcept { - return Name_; - } - - inline void Add(IObjectInQueue* obj) { - with_lock (Mutex_) { + return Name_; + } + + inline void Add(IObjectInQueue* obj) { + with_lock (Mutex_) { while (Obj_ != nullptr) { - CondFree_.Wait(Mutex_); - } - - if (Free_ == 0) { - AddThreadNoLock(); - } - - Obj_ = obj; - + CondFree_.Wait(Mutex_); + } + + if (Free_ == 0) { + AddThreadNoLock(); + } + + Obj_ = obj; + Y_ENSURE_EX(!AllDone_, TThreadPoolException() << TStringBuf("adding to a stopped queue")); - } - - CondReady_.Signal(); - } - - inline void AddThreads(size_t n) { - with_lock (Mutex_) { - while (n) { - AddThreadNoLock(); - - --n; - } - } - } - + } + + CondReady_.Signal(); + } + + inline void AddThreads(size_t n) { + with_lock (Mutex_) { + while (n) { + AddThreadNoLock(); + + --n; + } + } + } + inline size_t Size() const noexcept { - return (size_t)ThrCount_; - } - -private: + return (size_t)ThrCount_; + } + +private: inline void IncThreadCount() noexcept { - AtomicAdd(ThrCount_, 1); - } - + AtomicAdd(ThrCount_, 1); + } + inline void DecThreadCount() noexcept { - AtomicAdd(ThrCount_, -1); - } - - inline void AddThreadNoLock() { - IncThreadCount(); - - try { - new TThread(this); - } catch (...) { - DecThreadCount(); - - throw; - } - } - + AtomicAdd(ThrCount_, -1); + } + + inline void AddThreadNoLock() { + IncThreadCount(); + + try { + new TThread(this); + } catch (...) { + DecThreadCount(); + + throw; + } + } + inline void Stop() noexcept { - Mutex_.Acquire(); - - AllDone_ = true; - + Mutex_.Acquire(); + + AllDone_ = true; + while (AtomicGet(ThrCount_)) { - Mutex_.Release(); - CondReady_.Signal(); - Mutex_.Acquire(); - } - - Mutex_.Release(); - } - + Mutex_.Release(); + CondReady_.Signal(); + Mutex_.Acquire(); + } + + Mutex_.Release(); + } + inline IObjectInQueue* WaitForJob() noexcept { - Mutex_.Acquire(); - - ++Free_; - - while (!Obj_ && !AllDone_) { + Mutex_.Acquire(); + + ++Free_; + + while (!Obj_ && !AllDone_) { if (!CondReady_.WaitT(Mutex_, IdleTime_)) { - break; - } - } - - IObjectInQueue* ret = Obj_; + break; + } + } + + IObjectInQueue* ret = Obj_; Obj_ = nullptr; - - --Free_; - - Mutex_.Release(); - CondFree_.Signal(); - - return ret; - } - -private: + + --Free_; + + Mutex_.Release(); + CondFree_.Signal(); + + return ret; + } + +private: TAdaptiveThreadPool* Parent_; const bool Catching; TThreadNamer Namer; - TAtomic ThrCount_; - TMutex Mutex_; - TCondVar CondReady_; - TCondVar CondFree_; - bool AllDone_; - IObjectInQueue* Obj_; - size_t Free_; - char Name_[64]; - TDuration IdleTime_; -}; - + TAtomic ThrCount_; + TMutex Mutex_; + TCondVar CondReady_; + TCondVar CondFree_; + bool AllDone_; + IObjectInQueue* Obj_; + size_t Free_; + char Name_[64]; + TDuration IdleTime_; +}; + TThreadPoolBase::TThreadPoolBase(const TParams& params) : TThreadFactoryHolder(params.Factory_) , Params(params) -{ -} - +{ +} + #define DEFINE_THREAD_POOL_CTORS(type) \ - type::type(const TParams& params) \ - : TThreadPoolBase(params) \ - { \ - } + type::type(const TParams& params) \ + : TThreadPoolBase(params) \ + { \ + } DEFINE_THREAD_POOL_CTORS(TThreadPool) DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool) DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool) TAdaptiveThreadPool::~TAdaptiveThreadPool() = default; - + bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) { Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started")); - - Impl_->Add(obj); - - return true; -} - + + Impl_->Add(obj); + + return true; +} + void TAdaptiveThreadPool::Start(size_t, size_t) { Impl_.Reset(new TImpl(this, Params)); -} - +} + void TAdaptiveThreadPool::Stop() noexcept { - Impl_.Destroy(); -} - + Impl_.Destroy(); +} + size_t TAdaptiveThreadPool::Size() const noexcept { - if (Impl_.Get()) { - return Impl_->Size(); - } - - return 0; -} - + if (Impl_.Get()) { + return Impl_->Size(); + } + + return 0; +} + void TAdaptiveThreadPool::SetMaxIdleTime(TDuration interval) { Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started")); - + Impl_->SetMaxIdleTime(interval); -} - +} + TSimpleThreadPool::~TSimpleThreadPool() { - try { - Stop(); - } catch (...) { - // ¯\_(ツ)_/¯ - } -} - + try { + Stop(); + } catch (...) { + // ¯\_(ツ)_/¯ + } +} + bool TSimpleThreadPool::Add(IObjectInQueue* obj) { Y_ENSURE_EX(Slave_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started")); - - return Slave_->Add(obj); -} - + + return Slave_->Add(obj); +} + void TSimpleThreadPool::Start(size_t thrnum, size_t maxque) { THolder<IThreadPool> tmp; TAdaptiveThreadPool* adaptive(nullptr); - - if (thrnum) { + + if (thrnum) { tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params)); - } else { + } else { adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params); - tmp.Reset(adaptive); - } - - tmp->Start(thrnum, maxque); - - if (adaptive) { + tmp.Reset(adaptive); + } + + tmp->Start(thrnum, maxque); + + if (adaptive) { adaptive->SetMaxIdleTime(TDuration::Seconds(100)); - } - - Slave_.Swap(tmp); -} - + } + + Slave_.Swap(tmp); +} + void TSimpleThreadPool::Stop() noexcept { - Slave_.Destroy(); -} - + Slave_.Destroy(); +} + size_t TSimpleThreadPool::Size() const noexcept { - if (Slave_.Get()) { - return Slave_->Size(); - } - - return 0; -} - + if (Slave_.Get()) { + return Slave_->Size(); + } + + return 0; +} + namespace { - class TOwnedObjectInQueue: public IObjectInQueue { + class TOwnedObjectInQueue: public IObjectInQueue { private: THolder<IObjectInQueue> Owned; @@ -661,7 +661,7 @@ namespace { void IThreadPool::SafeAdd(IObjectInQueue* obj) { Y_ENSURE_EX(Add(obj), TThreadPoolException() << TStringBuf("can not add object to queue")); -} +} void IThreadPool::SafeAddAndOwn(THolder<IObjectInQueue> obj) { Y_ENSURE_EX(AddAndOwn(std::move(obj)), TThreadPoolException() << TStringBuf("can not add to queue and own")); @@ -678,87 +678,87 @@ bool IThreadPool::AddAndOwn(THolder<IObjectInQueue> obj) { using IThread = IThreadFactory::IThread; using IThreadAble = IThreadFactory::IThreadAble; - -namespace { - class TPoolThread: public IThread { - class TThreadImpl: public IObjectInQueue, public TAtomicRefCount<TThreadImpl> { - public: - inline TThreadImpl(IThreadAble* func) - : Func_(func) - { - } - + +namespace { + class TPoolThread: public IThread { + class TThreadImpl: public IObjectInQueue, public TAtomicRefCount<TThreadImpl> { + public: + inline TThreadImpl(IThreadAble* func) + : Func_(func) + { + } + ~TThreadImpl() override = default; - + inline void WaitForStart() noexcept { - StartEvent_.Wait(); - } - + StartEvent_.Wait(); + } + inline void WaitForComplete() noexcept { - CompleteEvent_.Wait(); - } - - private: + CompleteEvent_.Wait(); + } + + private: void Process(void* /*tsr*/) override { - TThreadImplRef This(this); - - { - StartEvent_.Signal(); - - try { - Func_->Execute(); - } catch (...) { - // ¯\_(ツ)_/¯ - } - - CompleteEvent_.Signal(); - } - } - - private: - IThreadAble* Func_; + TThreadImplRef This(this); + + { + StartEvent_.Signal(); + + try { + Func_->Execute(); + } catch (...) { + // ¯\_(ツ)_/¯ + } + + CompleteEvent_.Signal(); + } + } + + private: + IThreadAble* Func_; TSystemEvent CompleteEvent_; TSystemEvent StartEvent_; - }; - + }; + using TThreadImplRef = TIntrusivePtr<TThreadImpl>; - - public: + + public: inline TPoolThread(IThreadPool* parent) - : Parent_(parent) - { - } - + : Parent_(parent) + { + } + ~TPoolThread() override { - if (Impl_) { - Impl_->WaitForStart(); - } - } - - private: + if (Impl_) { + Impl_->WaitForStart(); + } + } + + private: void DoRun(IThreadAble* func) override { - TThreadImplRef impl(new TThreadImpl(func)); - - Parent_->SafeAdd(impl.Get()); - Impl_.Swap(impl); - } - + TThreadImplRef impl(new TThreadImpl(func)); + + Parent_->SafeAdd(impl.Get()); + Impl_.Swap(impl); + } + void DoJoin() noexcept override { - if (Impl_) { - Impl_->WaitForComplete(); + if (Impl_) { + Impl_->WaitForComplete(); Impl_ = nullptr; - } - } - - private: + } + } + + private: IThreadPool* Parent_; - TThreadImplRef Impl_; - }; -} - + TThreadImplRef Impl_; + }; +} + IThread* IThreadPool::DoCreate() { - return new TPoolThread(this); -} + return new TPoolThread(this); +} THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) { THolder<IThreadPool> queue; diff --git a/util/thread/pool.h b/util/thread/pool.h index d1ea3a67cb..e2a2a03968 100644 --- a/util/thread/pool.h +++ b/util/thread/pool.h @@ -2,19 +2,19 @@ #include "fwd.h" #include "factory.h" - -#include <util/system/yassert.h> + +#include <util/system/yassert.h> #include <util/system/defaults.h> #include <util/generic/yexception.h> #include <util/generic/ptr.h> -#include <util/generic/noncopyable.h> +#include <util/generic/noncopyable.h> #include <functional> class TDuration; -struct IObjectInQueue { +struct IObjectInQueue { virtual ~IObjectInQueue() = default; - + /** * Supposed to be implemented by user, to define jobs processed * in multiple threads. @@ -32,38 +32,38 @@ struct IObjectInQueue { * Useful only for creators of new queue classes. */ class TThreadFactoryHolder { -public: +public: TThreadFactoryHolder() noexcept; - + inline TThreadFactoryHolder(IThreadFactory* pool) noexcept - : Pool_(pool) - { - } - + : Pool_(pool) + { + } + inline ~TThreadFactoryHolder() = default; - + inline IThreadFactory* Pool() const noexcept { - return Pool_; - } - -private: + return Pool_; + } + +private: IThreadFactory* Pool_; -}; - +}; + class TThreadPoolException: public yexception { }; -template <class T> -class TThrFuncObj: public IObjectInQueue { +template <class T> +class TThrFuncObj: public IObjectInQueue { public: TThrFuncObj(const T& func) - : Func(func) - { + : Func(func) + { } TThrFuncObj(T&& func) - : Func(std::move(func)) - { + : Func(std::move(func)) + { } void Process(void*) override { @@ -75,7 +75,7 @@ private: T Func; }; -template <class T> +template <class T> IObjectInQueue* MakeThrFuncObj(T&& func) { return new TThrFuncObj<std::remove_cv_t<std::remove_reference_t<T>>>(std::forward<T>(func)); } @@ -134,10 +134,10 @@ struct TThreadPoolParams { }; /** - * A queue processed simultaneously by several threads + * A queue processed simultaneously by several threads */ class IThreadPool: public IThreadFactory, public TNonCopyable { -public: +public: using TParams = TThreadPoolParams; ~IThreadPool() override = default; @@ -148,7 +148,7 @@ public: */ void SafeAdd(IObjectInQueue* obj); - template <class T> + template <class T> void SafeAddFunc(T&& func) { Y_ENSURE_EX(AddFunc(std::forward<T>(func)), TThreadPoolException() << TStringBuf("can not add function to queue")); } @@ -161,9 +161,9 @@ public: * @return true of obj is successfully added to queue * @return false if queue is full or shutting down */ - virtual bool Add(IObjectInQueue* obj) Y_WARN_UNUSED_RESULT = 0; + virtual bool Add(IObjectInQueue* obj) Y_WARN_UNUSED_RESULT = 0; - template <class T> + template <class T> Y_WARN_UNUSED_RESULT bool AddFunc(T&& func) { THolder<IObjectInQueue> wrapper(MakeThrFuncObj(std::forward<T>(func))); bool added = Add(wrapper.Get()); @@ -185,76 +185,76 @@ public: * RAII wrapper for Create/DestroyThreadSpecificResource. * Useful only for implementers of new IThreadPool queues. */ - class TTsr { - public: + class TTsr { + public: inline TTsr(IThreadPool* q) - : Q_(q) - , Data_(Q_->CreateThreadSpecificResource()) - { - } - + : Q_(q) + , Data_(Q_->CreateThreadSpecificResource()) + { + } + inline ~TTsr() { - try { - Q_->DestroyThreadSpecificResource(Data_); - } catch (...) { - // ¯\_(ツ)_/¯ - } - } - + try { + Q_->DestroyThreadSpecificResource(Data_); + } catch (...) { + // ¯\_(ツ)_/¯ + } + } + inline operator void*() noexcept { - return Data_; - } - - private: + return Data_; + } + + private: IThreadPool* Q_; - void* Data_; - }; - + void* Data_; + }; + /** * CreateThreadSpecificResource and DestroyThreadSpecificResource * called from internals of (TAdaptiveThreadPool, TThreadPool, ...) implementation, * not by user of IThreadPool interface. * Created resource is passed to IObjectInQueue::Proccess function. */ - virtual void* CreateThreadSpecificResource() { + virtual void* CreateThreadSpecificResource() { return nullptr; - } - - virtual void DestroyThreadSpecificResource(void* resource) { + } + + virtual void DestroyThreadSpecificResource(void* resource) { if (resource != nullptr) { Y_ASSERT(resource == nullptr); - } - } - -private: + } + } + +private: IThread* DoCreate() override; -}; - +}; + /** * Single-threaded implementation of IThreadPool, process tasks in same thread when * added. * Can be used to remove multithreading. */ class TFakeThreadPool: public IThreadPool { -public: +public: bool Add(IObjectInQueue* pObj) override Y_WARN_UNUSED_RESULT { - TTsr tsr(this); - pObj->Process(tsr); - - return true; - } - + TTsr tsr(this); + pObj->Process(tsr); + + return true; + } + void Start(size_t, size_t = 0) override { - } - + } + void Stop() noexcept override { - } - + } + size_t Size() const noexcept override { - return 0; - } -}; - + return 0; + } +}; + class TThreadPoolBase: public IThreadPool, public TThreadFactoryHolder { public: TThreadPoolBase(const TParams& params); @@ -265,10 +265,10 @@ protected: /** queue processed by fixed size thread pool */ class TThreadPool: public TThreadPoolBase { -public: +public: TThreadPool(const TParams& params = {}); ~TThreadPool() override; - + bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT; /** * @param queueSizeLimit means "unlimited" when = 0 @@ -280,45 +280,45 @@ public: size_t GetThreadCountExpected() const noexcept; size_t GetThreadCountReal() const noexcept; size_t GetMaxQueueSize() const noexcept; - -private: - class TImpl; - THolder<TImpl> Impl_; -}; - + +private: + class TImpl; + THolder<TImpl> Impl_; +}; + /** * Always create new thread for new task, when all existing threads are busy. * Maybe dangerous, number of threads is not limited. */ class TAdaptiveThreadPool: public TThreadPoolBase { -public: +public: TAdaptiveThreadPool(const TParams& params = {}); ~TAdaptiveThreadPool() override; - + /** * If working thread waits task too long (more then interval parameter), * then the thread would be killed. Default value - infinity, all created threads * waits for new task forever, before Stop. */ - void SetMaxIdleTime(TDuration interval); - + void SetMaxIdleTime(TDuration interval); + bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT; /** @param thrnum, @param maxque are ignored */ void Start(size_t thrnum = 0, size_t maxque = 0) override; void Stop() noexcept override; size_t Size() const noexcept override; - + private: - class TImpl; - THolder<TImpl> Impl_; + class TImpl; + THolder<TImpl> Impl_; }; /** Behave like TThreadPool or TAdaptiveThreadPool, choosen by thrnum parameter of Start() */ class TSimpleThreadPool: public TThreadPoolBase { -public: +public: TSimpleThreadPool(const TParams& params = {}); ~TSimpleThreadPool() override; - + bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT; /** * @parameter thrnum. If thrnum is 0, use TAdaptiveThreadPool with small @@ -327,11 +327,11 @@ public: void Start(size_t thrnum, size_t maxque = 0) override; void Stop() noexcept override; size_t Size() const noexcept override; - -private: + +private: THolder<IThreadPool> Slave_; -}; - +}; + /** * Helper to override virtual functions Create/DestroyThreadSpecificResource * from IThreadPool and implement them using functions with same name from @@ -339,46 +339,46 @@ private: */ template <class TQueueType, class TSlave> class TThreadPoolBinder: public TQueueType { -public: +public: inline TThreadPoolBinder(TSlave* slave) - : Slave_(slave) - { - } - - template <class... Args> - inline TThreadPoolBinder(TSlave* slave, Args&&... args) + : Slave_(slave) + { + } + + template <class... Args> + inline TThreadPoolBinder(TSlave* slave, Args&&... args) : TQueueType(std::forward<Args>(args)...) - , Slave_(slave) - { - } - + , Slave_(slave) + { + } + inline TThreadPoolBinder(TSlave& slave) - : Slave_(&slave) - { - } - + : Slave_(&slave) + { + } + ~TThreadPoolBinder() override { - try { - this->Stop(); - } catch (...) { - // ¯\_(ツ)_/¯ - } - } - + try { + this->Stop(); + } catch (...) { + // ¯\_(ツ)_/¯ + } + } + void* CreateThreadSpecificResource() override { - return Slave_->CreateThreadSpecificResource(); - } - + return Slave_->CreateThreadSpecificResource(); + } + void DestroyThreadSpecificResource(void* resource) override { - Slave_->DestroyThreadSpecificResource(resource); - } - -private: - TSlave* Slave_; -}; - + Slave_->DestroyThreadSpecificResource(resource); + } + +private: + TSlave* Slave_; +}; + inline void Delete(THolder<IThreadPool> q) { - if (q.Get()) { + if (q.Get()) { q->Stop(); } } diff --git a/util/thread/pool_ut.cpp b/util/thread/pool_ut.cpp index 893770d0c4..200bd89060 100644 --- a/util/thread/pool_ut.cpp +++ b/util/thread/pool_ut.cpp @@ -1,10 +1,10 @@ #include "pool.h" - + #include <library/cpp/testing/unittest/registar.h> #include <util/stream/output.h> -#include <util/random/fast.h> -#include <util/system/spinlock.h> +#include <util/random/fast.h> +#include <util/system/spinlock.h> #include <util/system/thread.h> #include <util/system/mutex.h> #include <util/system/condvar.h> @@ -12,26 +12,26 @@ struct TThreadPoolTest { TSpinLock Lock; long R = -1; - - struct TTask: public IObjectInQueue { + + struct TTask: public IObjectInQueue { TThreadPoolTest* Test = nullptr; long Value = 0; - + TTask(TThreadPoolTest* test, int value) - : Test(test) - , Value(value) - { - } - + : Test(test) + , Value(value) + { + } + void Process(void*) override { THolder<TTask> This(this); - + TGuard<TSpinLock> guard(Test->Lock); Test->R ^= Value; - } - }; - - struct TOwnedTask: public IObjectInQueue { + } + }; + + struct TOwnedTask: public IObjectInQueue { bool& Processed; bool& Destructed; @@ -51,40 +51,40 @@ struct TThreadPoolTest { }; inline void TestAnyQueue(IThreadPool* queue, size_t queueSize = 1000) { - TReallyFastRng32 rand(17); - const size_t cnt = 1000; - + TReallyFastRng32 rand(17); + const size_t cnt = 1000; + R = 0; - - for (size_t i = 0; i < cnt; ++i) { - R ^= (long)rand.GenRand(); + + for (size_t i = 0; i < cnt; ++i) { + R ^= (long)rand.GenRand(); + } + + queue->Start(10, queueSize); + rand = TReallyFastRng32(17); + + for (size_t i = 0; i < cnt; ++i) { + UNIT_ASSERT(queue->Add(new TTask(this, (long)rand.GenRand()))); } - - queue->Start(10, queueSize); - rand = TReallyFastRng32(17); - - for (size_t i = 0; i < cnt; ++i) { - UNIT_ASSERT(queue->Add(new TTask(this, (long)rand.GenRand()))); - } - - queue->Stop(); - + + queue->Stop(); + UNIT_ASSERT_EQUAL(0, R); - } + } }; class TFailAddQueue: public IThreadPool { public: bool Add(IObjectInQueue* /*obj*/) override Y_WARN_UNUSED_RESULT { return false; - } + } void Start(size_t, size_t) override { } - + void Stop() noexcept override { - } - + } + size_t Size() const noexcept override { return 0; } @@ -111,7 +111,7 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) { TAdaptiveThreadPool q; t.TestAnyQueue(&q); } - } + } Y_UNIT_TEST(TestAddAndOwn) { TThreadPool q; @@ -128,8 +128,8 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) { Y_UNIT_TEST(TestAddFunc) { TFailAddQueue queue; bool added = queue.AddFunc( - []() {} // Lambda, I call him 'Lambda'! - ); + []() {} // Lambda, I call him 'Lambda'! + ); UNIT_ASSERT_VALUES_EQUAL(added, false); } @@ -154,7 +154,7 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) { TThreadPool queue(TThreadPool::TParams().SetBlocking(false).SetCatching(true)); queue.Start(2); - queue.SafeAddFunc([data = TFailOnCopy()]() {}); + queue.SafeAddFunc([data = TFailOnCopy()]() {}); queue.Stop(); } @@ -179,7 +179,7 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) { queue.Stop(); } - void TestFixedThreadName(IThreadPool& pool, const TString& expectedName) { + void TestFixedThreadName(IThreadPool& pool, const TString& expectedName) { pool.Start(1); TString name; pool.SafeAddFunc([&name]() { @@ -204,7 +204,7 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) { } } - void TestEnumeratedThreadName(IThreadPool& pool, const THashSet<TString>& expectedNames) { + void TestEnumeratedThreadName(IThreadPool& pool, const THashSet<TString>& expectedNames) { pool.Start(expectedNames.size()); TMutex lock; TCondVar allReady; @@ -212,7 +212,7 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) { THashSet<TString> names; for (size_t i = 0; i < expectedNames.size(); ++i) { pool.SafeAddFunc([&]() { - with_lock (lock) { + with_lock (lock) { if (++readyCount == expectedNames.size()) { allReady.BroadCast(); } else { diff --git a/util/thread/singleton.cpp b/util/thread/singleton.cpp index a898bdc9d4..83bee4e88b 100644 --- a/util/thread/singleton.cpp +++ b/util/thread/singleton.cpp @@ -1 +1 @@ -#include "singleton.h" +#include "singleton.h" diff --git a/util/thread/singleton.h b/util/thread/singleton.h index 4a1e05aea0..a9af366005 100644 --- a/util/thread/singleton.h +++ b/util/thread/singleton.h @@ -4,38 +4,38 @@ #include <util/generic/singleton.h> #include <util/generic/ptr.h> -namespace NPrivate { - template <class T, size_t Priority> - struct TFastThreadSingletonHelper { - static inline T* GetSlow() { - return SingletonWithPriority<NTls::TValue<T>, Priority>()->GetPtr(); - } +namespace NPrivate { + template <class T, size_t Priority> + struct TFastThreadSingletonHelper { + static inline T* GetSlow() { + return SingletonWithPriority<NTls::TValue<T>, Priority>()->GetPtr(); + } - static inline T* Get() { + static inline T* Get() { #if defined(Y_HAVE_FAST_POD_TLS) - Y_POD_STATIC_THREAD(T*) fast(nullptr); + Y_POD_STATIC_THREAD(T*) fast(nullptr); if (Y_UNLIKELY(!fast)) { - fast = GetSlow(); - } - - return fast; -#else - return GetSlow(); -#endif - } - }; + fast = GetSlow(); + } + + return fast; +#else + return GetSlow(); +#endif + } + }; } -template <class T, size_t Priority> -static inline T* FastTlsSingletonWithPriority() { - return ::NPrivate::TFastThreadSingletonHelper<T, Priority>::Get(); +template <class T, size_t Priority> +static inline T* FastTlsSingletonWithPriority() { + return ::NPrivate::TFastThreadSingletonHelper<T, Priority>::Get(); } // NB: the singleton is the same for all modules that use // FastTlsSingleton with the same type parameter. If unique singleton // required, use unique types. -template <class T> -static inline T* FastTlsSingleton() { - return FastTlsSingletonWithPriority<T, TSingletonTraits<T>::Priority>(); -} +template <class T> +static inline T* FastTlsSingleton() { + return FastTlsSingletonWithPriority<T, TSingletonTraits<T>::Priority>(); +} diff --git a/util/thread/singleton_ut.cpp b/util/thread/singleton_ut.cpp index 164b1cc184..0ad4d395a6 100644 --- a/util/thread/singleton_ut.cpp +++ b/util/thread/singleton_ut.cpp @@ -7,8 +7,8 @@ namespace { int i; TFoo() : i(0) - { - } + { + } }; } diff --git a/util/thread/ut/ya.make b/util/thread/ut/ya.make index 93198bfaf1..102c21429f 100644 --- a/util/thread/ut/ya.make +++ b/util/thread/ut/ya.make @@ -1,14 +1,14 @@ -UNITTEST_FOR(util) +UNITTEST_FOR(util) OWNER(g:util) SUBSCRIBER(g:util-subscribers) SRCS( thread/factory_ut.cpp - thread/lfqueue_ut.cpp - thread/lfstack_ut.cpp + thread/lfqueue_ut.cpp + thread/lfstack_ut.cpp thread/pool_ut.cpp - thread/singleton_ut.cpp + thread/singleton_ut.cpp ) PEERDIR( |