diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /util/thread/pool.cpp | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'util/thread/pool.cpp')
-rw-r--r-- | util/thread/pool.cpp | 990 |
1 files changed, 495 insertions, 495 deletions
diff --git a/util/thread/pool.cpp b/util/thread/pool.cpp index 05fad02e9b..0275429304 100644 --- a/util/thread/pool.cpp +++ b/util/thread/pool.cpp @@ -2,31 +2,31 @@ #include <util/system/defaults.h> -#if defined(_unix_) - #include <pthread.h> -#endif - +#if defined(_unix_) + #include <pthread.h> +#endif + #include <util/generic/vector.h> -#include <util/generic/intrlist.h> +#include <util/generic/intrlist.h> #include <util/generic/yexception.h> #include <util/generic/ylimits.h> -#include <util/generic/singleton.h> +#include <util/generic/singleton.h> #include <util/generic/fastqueue.h> - + #include <util/stream/output.h> #include <util/string/builder.h> - -#include <util/system/event.h> -#include <util/system/mutex.h> -#include <util/system/atomic.h> -#include <util/system/condvar.h> + +#include <util/system/event.h> +#include <util/system/mutex.h> +#include <util/system/atomic.h> +#include <util/system/condvar.h> #include <util/system/thread.h> #include <util/datetime/base.h> #include "factory.h" #include "pool.h" - + namespace { class TThreadNamer { public: @@ -36,7 +36,7 @@ namespace { { } - explicit operator bool() const { + explicit operator bool() const { return !ThreadName.empty(); } @@ -62,269 +62,269 @@ namespace { TThreadFactoryHolder::TThreadFactoryHolder() noexcept : Pool_(SystemThreadFactory()) -{ -} - +{ +} + class TThreadPool::TImpl: public TIntrusiveListItem<TImpl>, public IThreadFactory::IThreadAble { using TTsr = IThreadPool::TTsr; using TJobQueue = TFastQueue<IObjectInQueue*>; using TThreadRef = THolder<IThreadFactory::IThread>; - -public: + +public: inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params) - : Parent_(parent) + : Parent_(parent) , Blocking(params.Blocking_) , Catching(params.Catching_) , Namer(params) , ShouldTerminate(1) - , MaxQueueSize(0) - , ThreadCountExpected(0) - , ThreadCountReal(0) - , Forked(false) - { - TAtforkQueueRestarter::Get().RegisterObject(this); - Start(thrnum, maxqueue); - } - + , MaxQueueSize(0) + , ThreadCountExpected(0) + , ThreadCountReal(0) + , Forked(false) + { + TAtforkQueueRestarter::Get().RegisterObject(this); + Start(thrnum, maxqueue); + } + inline ~TImpl() override { - try { - Stop(); - } catch (...) { - // ¯\_(ツ)_/¯ - } - - TAtforkQueueRestarter::Get().UnregisterObject(this); + try { + Stop(); + } catch (...) { + // ¯\_(ツ)_/¯ + } + + TAtforkQueueRestarter::Get().UnregisterObject(this); Y_ASSERT(Tharr.empty()); - } - - inline bool Add(IObjectInQueue* obj) { + } + + inline bool Add(IObjectInQueue* obj) { if (AtomicGet(ShouldTerminate)) { - return false; - } - - if (Tharr.empty()) { - TTsr tsr(Parent_); - obj->Process(tsr); - - return true; - } - - with_lock (QueueMutex) { + return false; + } + + if (Tharr.empty()) { + TTsr tsr(Parent_); + obj->Process(tsr); + + return true; + } + + with_lock (QueueMutex) { while (MaxQueueSize > 0 && Queue.Size() >= MaxQueueSize && !AtomicGet(ShouldTerminate)) { - if (!Blocking) { - return false; - } - QueuePopCond.Wait(QueueMutex); - } - + if (!Blocking) { + return false; + } + QueuePopCond.Wait(QueueMutex); + } + if (AtomicGet(ShouldTerminate)) { - return false; - } - - Queue.Push(obj); + return false; + } + + Queue.Push(obj); } - + QueuePushCond.Signal(); - return true; - } - + return true; + } + inline size_t Size() const noexcept { - auto guard = Guard(QueueMutex); + auto guard = Guard(QueueMutex); - return Queue.Size(); - } + return Queue.Size(); + } inline size_t GetMaxQueueSize() const noexcept { - return MaxQueueSize; - } + return MaxQueueSize; + } inline size_t GetThreadCountExpected() const noexcept { - return ThreadCountExpected; - } + return ThreadCountExpected; + } inline size_t GetThreadCountReal() const noexcept { return ThreadCountReal; } inline void AtforkAction() noexcept Y_NO_SANITIZE("thread") { - Forked = true; - } - + Forked = true; + } + inline bool NeedRestart() const noexcept { - return Forked; - } - -private: - inline void Start(size_t num, size_t maxque) { + return Forked; + } + +private: + inline void Start(size_t num, size_t maxque) { AtomicSet(ShouldTerminate, 0); - MaxQueueSize = maxque; - ThreadCountExpected = num; - - try { - for (size_t i = 0; i < num; ++i) { - Tharr.push_back(Parent_->Pool()->Run(this)); - ++ThreadCountReal; - } - } catch (...) { - Stop(); - - throw; - } - } - - inline void Stop() { + MaxQueueSize = maxque; + ThreadCountExpected = num; + + try { + for (size_t i = 0; i < num; ++i) { + Tharr.push_back(Parent_->Pool()->Run(this)); + ++ThreadCountReal; + } + } catch (...) { + Stop(); + + throw; + } + } + + inline void Stop() { AtomicSet(ShouldTerminate, 1); - with_lock (QueueMutex) { - QueuePopCond.BroadCast(); - } - - if (!NeedRestart()) { - WaitForComplete(); - } - - Tharr.clear(); - ThreadCountExpected = 0; - MaxQueueSize = 0; - } - + with_lock (QueueMutex) { + QueuePopCond.BroadCast(); + } + + if (!NeedRestart()) { + WaitForComplete(); + } + + Tharr.clear(); + ThreadCountExpected = 0; + MaxQueueSize = 0; + } + inline void WaitForComplete() noexcept { - with_lock (StopMutex) { - while (ThreadCountReal) { - with_lock (QueueMutex) { + with_lock (StopMutex) { + while (ThreadCountReal) { + with_lock (QueueMutex) { QueuePushCond.Signal(); - } - - StopCond.Wait(StopMutex); - } - } - } - + } + + StopCond.Wait(StopMutex); + } + } + } + void DoExecute() override { - THolder<TTsr> tsr(new TTsr(Parent_)); - + THolder<TTsr> tsr(new TTsr(Parent_)); + if (Namer) { Namer.SetCurrentThreadName(); } - while (true) { + while (true) { IObjectInQueue* job = nullptr; - - with_lock (QueueMutex) { + + with_lock (QueueMutex) { while (Queue.Empty() && !AtomicGet(ShouldTerminate)) { QueuePushCond.Wait(QueueMutex); - } - + } + if (AtomicGet(ShouldTerminate) && Queue.Empty()) { - tsr.Destroy(); - - break; - } - - job = Queue.Pop(); - } - - QueuePopCond.Signal(); - + tsr.Destroy(); + + break; + } + + job = Queue.Pop(); + } + + QueuePopCond.Signal(); + if (Catching) { - try { + try { try { job->Process(*tsr); } catch (...) { Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl; } - } catch (...) { - // ¯\_(ツ)_/¯ - } + } catch (...) { + // ¯\_(ツ)_/¯ + } } else { job->Process(*tsr); - } - } - - FinishOneThread(); - } - + } + } + + FinishOneThread(); + } + inline void FinishOneThread() noexcept { - auto guard = Guard(StopMutex); - - --ThreadCountReal; - StopCond.Signal(); - } + auto guard = Guard(StopMutex); + + --ThreadCountReal; + StopCond.Signal(); + } -private: +private: TThreadPool* Parent_; const bool Blocking; const bool Catching; TThreadNamer Namer; - mutable TMutex QueueMutex; - mutable TMutex StopMutex; + mutable TMutex QueueMutex; + mutable TMutex StopMutex; TCondVar QueuePushCond; - TCondVar QueuePopCond; - TCondVar StopCond; - TJobQueue Queue; + TCondVar QueuePopCond; + TCondVar StopCond; + TJobQueue Queue; TVector<TThreadRef> Tharr; TAtomic ShouldTerminate; - size_t MaxQueueSize; - size_t ThreadCountExpected; - size_t ThreadCountReal; - bool Forked; - - class TAtforkQueueRestarter { - public: - static TAtforkQueueRestarter& Get() { - return *SingletonWithPriority<TAtforkQueueRestarter, 256>(); - } - - inline void RegisterObject(TImpl* obj) { - auto guard = Guard(ActionMutex); - - RegisteredObjects.PushBack(obj); - } - - inline void UnregisterObject(TImpl* obj) { - auto guard = Guard(ActionMutex); - - obj->Unlink(); - } - - private: - void ChildAction() { - with_lock (ActionMutex) { - for (auto it = RegisteredObjects.Begin(); it != RegisteredObjects.End(); ++it) { - it->AtforkAction(); - } - } - } - - static void ProcessChildAction() { - Get().ChildAction(); - } - - TIntrusiveList<TImpl> RegisteredObjects; - TMutex ActionMutex; - - public: - inline TAtforkQueueRestarter() { -#if defined(_bionic_) -//no pthread_atfork on android libc -#elif defined(_unix_) + size_t MaxQueueSize; + size_t ThreadCountExpected; + size_t ThreadCountReal; + bool Forked; + + class TAtforkQueueRestarter { + public: + static TAtforkQueueRestarter& Get() { + return *SingletonWithPriority<TAtforkQueueRestarter, 256>(); + } + + inline void RegisterObject(TImpl* obj) { + auto guard = Guard(ActionMutex); + + RegisteredObjects.PushBack(obj); + } + + inline void UnregisterObject(TImpl* obj) { + auto guard = Guard(ActionMutex); + + obj->Unlink(); + } + + private: + void ChildAction() { + with_lock (ActionMutex) { + for (auto it = RegisteredObjects.Begin(); it != RegisteredObjects.End(); ++it) { + it->AtforkAction(); + } + } + } + + static void ProcessChildAction() { + Get().ChildAction(); + } + + TIntrusiveList<TImpl> RegisteredObjects; + TMutex ActionMutex; + + public: + inline TAtforkQueueRestarter() { +#if defined(_bionic_) +//no pthread_atfork on android libc +#elif defined(_unix_) pthread_atfork(nullptr, nullptr, ProcessChildAction); #endif - } - }; -}; - + } + }; +}; + TThreadPool::~TThreadPool() = default; - + size_t TThreadPool::Size() const noexcept { - if (!Impl_.Get()) { - return 0; - } - - return Impl_->Size(); -} - + if (!Impl_.Get()) { + return 0; + } + + return Impl_->Size(); +} + size_t TThreadPool::GetThreadCountExpected() const noexcept { if (!Impl_.Get()) { return 0; @@ -351,298 +351,298 @@ size_t TThreadPool::GetMaxQueueSize() const noexcept { bool TThreadPool::Add(IObjectInQueue* obj) { Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started")); - - if (Impl_->NeedRestart()) { - Start(Impl_->GetThreadCountExpected(), Impl_->GetMaxQueueSize()); + + if (Impl_->NeedRestart()) { + Start(Impl_->GetThreadCountExpected(), Impl_->GetMaxQueueSize()); } - return Impl_->Add(obj); + return Impl_->Add(obj); } void TThreadPool::Start(size_t thrnum, size_t maxque) { Impl_.Reset(new TImpl(this, thrnum, maxque, Params)); -} - +} + void TThreadPool::Stop() noexcept { - Impl_.Destroy(); -} - -static TAtomic mtp_queue_counter = 0; - + Impl_.Destroy(); +} + +static TAtomic mtp_queue_counter = 0; + class TAdaptiveThreadPool::TImpl { -public: +public: class TThread: public IThreadFactory::IThreadAble { public: - inline TThread(TImpl* parent) - : Impl_(parent) - , Thread_(Impl_->Parent_->Pool()->Run(this)) - { - } - + inline TThread(TImpl* parent) + : Impl_(parent) + , Thread_(Impl_->Parent_->Pool()->Run(this)) + { + } + inline ~TThread() override { - Impl_->DecThreadCount(); - } - - private: + Impl_->DecThreadCount(); + } + + private: void DoExecute() noexcept override { - THolder<TThread> This(this); - + THolder<TThread> This(this); + if (Impl_->Namer) { Impl_->Namer.SetCurrentThreadName(); } - { - TTsr tsr(Impl_->Parent_); - IObjectInQueue* obj; - + { + TTsr tsr(Impl_->Parent_); + IObjectInQueue* obj; + while ((obj = Impl_->WaitForJob()) != nullptr) { if (Impl_->Catching) { - try { + try { try { obj->Process(tsr); } catch (...) { Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl; } - } catch (...) { + } catch (...) { // ¯\_(ツ)_/¯ - } + } } else { obj->Process(tsr); - } - } - } - } - - private: - TImpl* Impl_; + } + } + } + } + + private: + TImpl* Impl_; THolder<IThreadFactory::IThread> Thread_; - }; - + }; + inline TImpl(TAdaptiveThreadPool* parent, const TParams& params) - : Parent_(parent) + : Parent_(parent) , Catching(params.Catching_) , Namer(params) - , ThrCount_(0) - , AllDone_(false) + , ThrCount_(0) + , AllDone_(false) , Obj_(nullptr) - , Free_(0) - , IdleTime_(TDuration::Max()) - { - sprintf(Name_, "[mtp queue %ld]", (long)AtomicAdd(mtp_queue_counter, 1)); - } - + , Free_(0) + , IdleTime_(TDuration::Max()) + { + sprintf(Name_, "[mtp queue %ld]", (long)AtomicAdd(mtp_queue_counter, 1)); + } + inline ~TImpl() { - Stop(); - } - - inline void SetMaxIdleTime(TDuration idleTime) { - IdleTime_ = idleTime; - } - + Stop(); + } + + inline void SetMaxIdleTime(TDuration idleTime) { + IdleTime_ = idleTime; + } + inline const char* Name() const noexcept { - return Name_; - } - - inline void Add(IObjectInQueue* obj) { - with_lock (Mutex_) { + return Name_; + } + + inline void Add(IObjectInQueue* obj) { + with_lock (Mutex_) { while (Obj_ != nullptr) { - CondFree_.Wait(Mutex_); - } - - if (Free_ == 0) { - AddThreadNoLock(); - } - - Obj_ = obj; - + CondFree_.Wait(Mutex_); + } + + if (Free_ == 0) { + AddThreadNoLock(); + } + + Obj_ = obj; + Y_ENSURE_EX(!AllDone_, TThreadPoolException() << TStringBuf("adding to a stopped queue")); - } - - CondReady_.Signal(); - } - - inline void AddThreads(size_t n) { - with_lock (Mutex_) { - while (n) { - AddThreadNoLock(); - - --n; - } - } - } - + } + + CondReady_.Signal(); + } + + inline void AddThreads(size_t n) { + with_lock (Mutex_) { + while (n) { + AddThreadNoLock(); + + --n; + } + } + } + inline size_t Size() const noexcept { - return (size_t)ThrCount_; - } - -private: + return (size_t)ThrCount_; + } + +private: inline void IncThreadCount() noexcept { - AtomicAdd(ThrCount_, 1); - } - + AtomicAdd(ThrCount_, 1); + } + inline void DecThreadCount() noexcept { - AtomicAdd(ThrCount_, -1); - } - - inline void AddThreadNoLock() { - IncThreadCount(); - - try { - new TThread(this); - } catch (...) { - DecThreadCount(); - - throw; - } - } - + AtomicAdd(ThrCount_, -1); + } + + inline void AddThreadNoLock() { + IncThreadCount(); + + try { + new TThread(this); + } catch (...) { + DecThreadCount(); + + throw; + } + } + inline void Stop() noexcept { - Mutex_.Acquire(); - - AllDone_ = true; - + Mutex_.Acquire(); + + AllDone_ = true; + while (AtomicGet(ThrCount_)) { - Mutex_.Release(); - CondReady_.Signal(); - Mutex_.Acquire(); - } - - Mutex_.Release(); - } - + Mutex_.Release(); + CondReady_.Signal(); + Mutex_.Acquire(); + } + + Mutex_.Release(); + } + inline IObjectInQueue* WaitForJob() noexcept { - Mutex_.Acquire(); - - ++Free_; - - while (!Obj_ && !AllDone_) { + Mutex_.Acquire(); + + ++Free_; + + while (!Obj_ && !AllDone_) { if (!CondReady_.WaitT(Mutex_, IdleTime_)) { - break; - } - } - - IObjectInQueue* ret = Obj_; + break; + } + } + + IObjectInQueue* ret = Obj_; Obj_ = nullptr; - - --Free_; - - Mutex_.Release(); - CondFree_.Signal(); - - return ret; - } - -private: + + --Free_; + + Mutex_.Release(); + CondFree_.Signal(); + + return ret; + } + +private: TAdaptiveThreadPool* Parent_; const bool Catching; TThreadNamer Namer; - TAtomic ThrCount_; - TMutex Mutex_; - TCondVar CondReady_; - TCondVar CondFree_; - bool AllDone_; - IObjectInQueue* Obj_; - size_t Free_; - char Name_[64]; - TDuration IdleTime_; -}; - + TAtomic ThrCount_; + TMutex Mutex_; + TCondVar CondReady_; + TCondVar CondFree_; + bool AllDone_; + IObjectInQueue* Obj_; + size_t Free_; + char Name_[64]; + TDuration IdleTime_; +}; + TThreadPoolBase::TThreadPoolBase(const TParams& params) : TThreadFactoryHolder(params.Factory_) , Params(params) -{ -} - +{ +} + #define DEFINE_THREAD_POOL_CTORS(type) \ - type::type(const TParams& params) \ - : TThreadPoolBase(params) \ - { \ - } + type::type(const TParams& params) \ + : TThreadPoolBase(params) \ + { \ + } DEFINE_THREAD_POOL_CTORS(TThreadPool) DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool) DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool) TAdaptiveThreadPool::~TAdaptiveThreadPool() = default; - + bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) { Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started")); - - Impl_->Add(obj); - - return true; -} - + + Impl_->Add(obj); + + return true; +} + void TAdaptiveThreadPool::Start(size_t, size_t) { Impl_.Reset(new TImpl(this, Params)); -} - +} + void TAdaptiveThreadPool::Stop() noexcept { - Impl_.Destroy(); -} - + Impl_.Destroy(); +} + size_t TAdaptiveThreadPool::Size() const noexcept { - if (Impl_.Get()) { - return Impl_->Size(); - } - - return 0; -} - + if (Impl_.Get()) { + return Impl_->Size(); + } + + return 0; +} + void TAdaptiveThreadPool::SetMaxIdleTime(TDuration interval) { Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started")); - + Impl_->SetMaxIdleTime(interval); -} - +} + TSimpleThreadPool::~TSimpleThreadPool() { - try { - Stop(); - } catch (...) { - // ¯\_(ツ)_/¯ - } -} - + try { + Stop(); + } catch (...) { + // ¯\_(ツ)_/¯ + } +} + bool TSimpleThreadPool::Add(IObjectInQueue* obj) { Y_ENSURE_EX(Slave_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started")); - - return Slave_->Add(obj); -} - + + return Slave_->Add(obj); +} + void TSimpleThreadPool::Start(size_t thrnum, size_t maxque) { THolder<IThreadPool> tmp; TAdaptiveThreadPool* adaptive(nullptr); - - if (thrnum) { + + if (thrnum) { tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params)); - } else { + } else { adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params); - tmp.Reset(adaptive); - } - - tmp->Start(thrnum, maxque); - - if (adaptive) { + tmp.Reset(adaptive); + } + + tmp->Start(thrnum, maxque); + + if (adaptive) { adaptive->SetMaxIdleTime(TDuration::Seconds(100)); - } - - Slave_.Swap(tmp); -} - + } + + Slave_.Swap(tmp); +} + void TSimpleThreadPool::Stop() noexcept { - Slave_.Destroy(); -} - + Slave_.Destroy(); +} + size_t TSimpleThreadPool::Size() const noexcept { - if (Slave_.Get()) { - return Slave_->Size(); - } - - return 0; -} - + if (Slave_.Get()) { + return Slave_->Size(); + } + + return 0; +} + namespace { - class TOwnedObjectInQueue: public IObjectInQueue { + class TOwnedObjectInQueue: public IObjectInQueue { private: THolder<IObjectInQueue> Owned; @@ -661,7 +661,7 @@ namespace { void IThreadPool::SafeAdd(IObjectInQueue* obj) { Y_ENSURE_EX(Add(obj), TThreadPoolException() << TStringBuf("can not add object to queue")); -} +} void IThreadPool::SafeAddAndOwn(THolder<IObjectInQueue> obj) { Y_ENSURE_EX(AddAndOwn(std::move(obj)), TThreadPoolException() << TStringBuf("can not add to queue and own")); @@ -678,87 +678,87 @@ bool IThreadPool::AddAndOwn(THolder<IObjectInQueue> obj) { using IThread = IThreadFactory::IThread; using IThreadAble = IThreadFactory::IThreadAble; - -namespace { - class TPoolThread: public IThread { - class TThreadImpl: public IObjectInQueue, public TAtomicRefCount<TThreadImpl> { - public: - inline TThreadImpl(IThreadAble* func) - : Func_(func) - { - } - + +namespace { + class TPoolThread: public IThread { + class TThreadImpl: public IObjectInQueue, public TAtomicRefCount<TThreadImpl> { + public: + inline TThreadImpl(IThreadAble* func) + : Func_(func) + { + } + ~TThreadImpl() override = default; - + inline void WaitForStart() noexcept { - StartEvent_.Wait(); - } - + StartEvent_.Wait(); + } + inline void WaitForComplete() noexcept { - CompleteEvent_.Wait(); - } - - private: + CompleteEvent_.Wait(); + } + + private: void Process(void* /*tsr*/) override { - TThreadImplRef This(this); - - { - StartEvent_.Signal(); - - try { - Func_->Execute(); - } catch (...) { - // ¯\_(ツ)_/¯ - } - - CompleteEvent_.Signal(); - } - } - - private: - IThreadAble* Func_; + TThreadImplRef This(this); + + { + StartEvent_.Signal(); + + try { + Func_->Execute(); + } catch (...) { + // ¯\_(ツ)_/¯ + } + + CompleteEvent_.Signal(); + } + } + + private: + IThreadAble* Func_; TSystemEvent CompleteEvent_; TSystemEvent StartEvent_; - }; - + }; + using TThreadImplRef = TIntrusivePtr<TThreadImpl>; - - public: + + public: inline TPoolThread(IThreadPool* parent) - : Parent_(parent) - { - } - + : Parent_(parent) + { + } + ~TPoolThread() override { - if (Impl_) { - Impl_->WaitForStart(); - } - } - - private: + if (Impl_) { + Impl_->WaitForStart(); + } + } + + private: void DoRun(IThreadAble* func) override { - TThreadImplRef impl(new TThreadImpl(func)); - - Parent_->SafeAdd(impl.Get()); - Impl_.Swap(impl); - } - + TThreadImplRef impl(new TThreadImpl(func)); + + Parent_->SafeAdd(impl.Get()); + Impl_.Swap(impl); + } + void DoJoin() noexcept override { - if (Impl_) { - Impl_->WaitForComplete(); + if (Impl_) { + Impl_->WaitForComplete(); Impl_ = nullptr; - } - } - - private: + } + } + + private: IThreadPool* Parent_; - TThreadImplRef Impl_; - }; -} - + TThreadImplRef Impl_; + }; +} + IThread* IThreadPool::DoCreate() { - return new TPoolThread(this); -} + return new TPoolThread(this); +} THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) { THolder<IThreadPool> queue; |