diff options
author | trofimenkov <trofimenkov@yandex-team.ru> | 2022-02-10 16:49:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:30 +0300 |
commit | 30cebc2cfa79af3b577760a113e203a79450e6b6 (patch) | |
tree | 49327bf3c28fab534b04b312a39179e70f7c2763 /util/thread/pool.cpp | |
parent | a2d2743094c8d255cda4011b317235874db4d01c (diff) | |
download | ydb-30cebc2cfa79af3b577760a113e203a79450e6b6.tar.gz |
Restoring authorship annotation for <trofimenkov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'util/thread/pool.cpp')
-rw-r--r-- | util/thread/pool.cpp | 172 |
1 files changed, 86 insertions, 86 deletions
diff --git a/util/thread/pool.cpp b/util/thread/pool.cpp index 05fad02e9b..2e2edf9488 100644 --- a/util/thread/pool.cpp +++ b/util/thread/pool.cpp @@ -14,52 +14,52 @@ #include <util/generic/fastqueue.h> #include <util/stream/output.h> -#include <util/string/builder.h> +#include <util/string/builder.h> #include <util/system/event.h> #include <util/system/mutex.h> #include <util/system/atomic.h> #include <util/system/condvar.h> -#include <util/system/thread.h> +#include <util/system/thread.h> #include <util/datetime/base.h> #include "factory.h" #include "pool.h" -namespace { - class TThreadNamer { - public: - TThreadNamer(const IThreadPool::TParams& params) - : ThreadName(params.ThreadName_) - , EnumerateThreads(params.EnumerateThreads_) - { - } - +namespace { + class TThreadNamer { + public: + TThreadNamer(const IThreadPool::TParams& params) + : ThreadName(params.ThreadName_) + , EnumerateThreads(params.EnumerateThreads_) + { + } + explicit operator bool() const { - return !ThreadName.empty(); - } - - void SetCurrentThreadName() { - if (EnumerateThreads) { - Set(TStringBuilder() << ThreadName << (Index++)); - } else { - Set(ThreadName); - } - } - - private: - void Set(const TString& name) { - TThread::SetCurrentThreadName(name.c_str()); - } - - private: - TString ThreadName; - bool EnumerateThreads = false; - std::atomic<ui64> Index{0}; - }; -} - + return !ThreadName.empty(); + } + + void SetCurrentThreadName() { + if (EnumerateThreads) { + Set(TStringBuilder() << ThreadName << (Index++)); + } else { + Set(ThreadName); + } + } + + private: + void Set(const TString& name) { + TThread::SetCurrentThreadName(name.c_str()); + } + + private: + TString ThreadName; + bool EnumerateThreads = false; + std::atomic<ui64> Index{0}; + }; +} + TThreadFactoryHolder::TThreadFactoryHolder() noexcept : Pool_(SystemThreadFactory()) { @@ -71,11 +71,11 @@ class TThreadPool::TImpl: public TIntrusiveListItem<TImpl>, public IThreadFactor using TThreadRef = THolder<IThreadFactory::IThread>; public: - inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params) + inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params) : Parent_(parent) - , Blocking(params.Blocking_) - , Catching(params.Catching_) - , Namer(params) + , Blocking(params.Blocking_) + , Catching(params.Catching_) + , Namer(params) , ShouldTerminate(1) , MaxQueueSize(0) , ThreadCountExpected(0) @@ -204,10 +204,10 @@ private: void DoExecute() override { THolder<TTsr> tsr(new TTsr(Parent_)); - if (Namer) { - Namer.SetCurrentThreadName(); - } - + if (Namer) { + Namer.SetCurrentThreadName(); + } + while (true) { IObjectInQueue* job = nullptr; @@ -227,18 +227,18 @@ private: QueuePopCond.Signal(); - if (Catching) { + if (Catching) { try { - try { - job->Process(*tsr); - } catch (...) { - Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl; - } + try { + job->Process(*tsr); + } catch (...) { + Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl; + } } catch (...) { // ¯\_(ツ)_/¯ } - } else { - job->Process(*tsr); + } else { + job->Process(*tsr); } } @@ -254,9 +254,9 @@ private: private: TThreadPool* Parent_; - const bool Blocking; - const bool Catching; - TThreadNamer Namer; + const bool Blocking; + const bool Catching; + TThreadNamer Namer; mutable TMutex QueueMutex; mutable TMutex StopMutex; TCondVar QueuePushCond; @@ -360,7 +360,7 @@ bool TThreadPool::Add(IObjectInQueue* obj) { } void TThreadPool::Start(size_t thrnum, size_t maxque) { - Impl_.Reset(new TImpl(this, thrnum, maxque, Params)); + Impl_.Reset(new TImpl(this, thrnum, maxque, Params)); } void TThreadPool::Stop() noexcept { @@ -387,27 +387,27 @@ public: void DoExecute() noexcept override { THolder<TThread> This(this); - if (Impl_->Namer) { - Impl_->Namer.SetCurrentThreadName(); - } - + if (Impl_->Namer) { + Impl_->Namer.SetCurrentThreadName(); + } + { TTsr tsr(Impl_->Parent_); IObjectInQueue* obj; while ((obj = Impl_->WaitForJob()) != nullptr) { - if (Impl_->Catching) { + if (Impl_->Catching) { try { - try { - obj->Process(tsr); - } catch (...) { - Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl; - } + try { + obj->Process(tsr); + } catch (...) { + Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl; + } } catch (...) { - // ¯\_(ツ)_/¯ + // ¯\_(ツ)_/¯ } - } else { - obj->Process(tsr); + } else { + obj->Process(tsr); } } } @@ -418,10 +418,10 @@ public: THolder<IThreadFactory::IThread> Thread_; }; - inline TImpl(TAdaptiveThreadPool* parent, const TParams& params) + inline TImpl(TAdaptiveThreadPool* parent, const TParams& params) : Parent_(parent) - , Catching(params.Catching_) - , Namer(params) + , Catching(params.Catching_) + , Namer(params) , ThrCount_(0) , AllDone_(false) , Obj_(nullptr) @@ -534,8 +534,8 @@ private: private: TAdaptiveThreadPool* Parent_; - const bool Catching; - TThreadNamer Namer; + const bool Catching; + TThreadNamer Namer; TAtomic ThrCount_; TMutex Mutex_; TCondVar CondReady_; @@ -547,22 +547,22 @@ private: TDuration IdleTime_; }; -TThreadPoolBase::TThreadPoolBase(const TParams& params) - : TThreadFactoryHolder(params.Factory_) - , Params(params) +TThreadPoolBase::TThreadPoolBase(const TParams& params) + : TThreadFactoryHolder(params.Factory_) + , Params(params) { } -#define DEFINE_THREAD_POOL_CTORS(type) \ +#define DEFINE_THREAD_POOL_CTORS(type) \ type::type(const TParams& params) \ : TThreadPoolBase(params) \ { \ } - -DEFINE_THREAD_POOL_CTORS(TThreadPool) -DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool) -DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool) - + +DEFINE_THREAD_POOL_CTORS(TThreadPool) +DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool) +DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool) + TAdaptiveThreadPool::~TAdaptiveThreadPool() = default; bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) { @@ -574,7 +574,7 @@ bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) { } void TAdaptiveThreadPool::Start(size_t, size_t) { - Impl_.Reset(new TImpl(this, Params)); + Impl_.Reset(new TImpl(this, Params)); } void TAdaptiveThreadPool::Stop() noexcept { @@ -614,9 +614,9 @@ void TSimpleThreadPool::Start(size_t thrnum, size_t maxque) { TAdaptiveThreadPool* adaptive(nullptr); if (thrnum) { - tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params)); + tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params)); } else { - adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params); + adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params); tmp.Reset(adaptive); } @@ -760,10 +760,10 @@ IThread* IThreadPool::DoCreate() { return new TPoolThread(this); } -THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) { +THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) { THolder<IThreadPool> queue; if (threadsCount > 1) { - queue.Reset(new TThreadPool(params)); + queue.Reset(new TThreadPool(params)); } else { queue.Reset(new TFakeThreadPool()); } |