diff options
author | trofimenkov <[email protected]> | 2022-02-10 16:49:31 +0300 |
---|---|---|
committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:49:31 +0300 |
commit | 7c6139b61ced2798d1134b68b8facf6925a36b8e (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /util/thread | |
parent | 30cebc2cfa79af3b577760a113e203a79450e6b6 (diff) |
Restoring authorship annotation for <[email protected]>. Commit 2 of 2.
Diffstat (limited to 'util/thread')
-rw-r--r-- | util/thread/pool.cpp | 172 | ||||
-rw-r--r-- | util/thread/pool.h | 142 | ||||
-rw-r--r-- | util/thread/pool_ut.cpp | 176 |
3 files changed, 245 insertions, 245 deletions
diff --git a/util/thread/pool.cpp b/util/thread/pool.cpp index 2e2edf94888..05fad02e9b9 100644 --- a/util/thread/pool.cpp +++ b/util/thread/pool.cpp @@ -14,52 +14,52 @@ #include <util/generic/fastqueue.h> #include <util/stream/output.h> -#include <util/string/builder.h> +#include <util/string/builder.h> #include <util/system/event.h> #include <util/system/mutex.h> #include <util/system/atomic.h> #include <util/system/condvar.h> -#include <util/system/thread.h> +#include <util/system/thread.h> #include <util/datetime/base.h> #include "factory.h" #include "pool.h" -namespace { - class TThreadNamer { - public: - TThreadNamer(const IThreadPool::TParams& params) - : ThreadName(params.ThreadName_) - , EnumerateThreads(params.EnumerateThreads_) - { - } - +namespace { + class TThreadNamer { + public: + TThreadNamer(const IThreadPool::TParams& params) + : ThreadName(params.ThreadName_) + , EnumerateThreads(params.EnumerateThreads_) + { + } + explicit operator bool() const { - return !ThreadName.empty(); - } - - void SetCurrentThreadName() { - if (EnumerateThreads) { - Set(TStringBuilder() << ThreadName << (Index++)); - } else { - Set(ThreadName); - } - } - - private: - void Set(const TString& name) { - TThread::SetCurrentThreadName(name.c_str()); - } - - private: - TString ThreadName; - bool EnumerateThreads = false; - std::atomic<ui64> Index{0}; - }; -} - + return !ThreadName.empty(); + } + + void SetCurrentThreadName() { + if (EnumerateThreads) { + Set(TStringBuilder() << ThreadName << (Index++)); + } else { + Set(ThreadName); + } + } + + private: + void Set(const TString& name) { + TThread::SetCurrentThreadName(name.c_str()); + } + + private: + TString ThreadName; + bool EnumerateThreads = false; + std::atomic<ui64> Index{0}; + }; +} + TThreadFactoryHolder::TThreadFactoryHolder() noexcept : Pool_(SystemThreadFactory()) { @@ -71,11 +71,11 @@ class TThreadPool::TImpl: public TIntrusiveListItem<TImpl>, public IThreadFactor using TThreadRef = THolder<IThreadFactory::IThread>; public: - inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params) + inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params) : Parent_(parent) - , Blocking(params.Blocking_) - , Catching(params.Catching_) - , Namer(params) + , Blocking(params.Blocking_) + , Catching(params.Catching_) + , Namer(params) , ShouldTerminate(1) , MaxQueueSize(0) , ThreadCountExpected(0) @@ -204,10 +204,10 @@ private: void DoExecute() override { THolder<TTsr> tsr(new TTsr(Parent_)); - if (Namer) { - Namer.SetCurrentThreadName(); - } - + if (Namer) { + Namer.SetCurrentThreadName(); + } + while (true) { IObjectInQueue* job = nullptr; @@ -227,18 +227,18 @@ private: QueuePopCond.Signal(); - if (Catching) { + if (Catching) { try { - try { - job->Process(*tsr); - } catch (...) { - Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl; - } + try { + job->Process(*tsr); + } catch (...) { + Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl; + } } catch (...) { // ¯\_(ツ)_/¯ } - } else { - job->Process(*tsr); + } else { + job->Process(*tsr); } } @@ -254,9 +254,9 @@ private: private: TThreadPool* Parent_; - const bool Blocking; - const bool Catching; - TThreadNamer Namer; + const bool Blocking; + const bool Catching; + TThreadNamer Namer; mutable TMutex QueueMutex; mutable TMutex StopMutex; TCondVar QueuePushCond; @@ -360,7 +360,7 @@ bool TThreadPool::Add(IObjectInQueue* obj) { } void TThreadPool::Start(size_t thrnum, size_t maxque) { - Impl_.Reset(new TImpl(this, thrnum, maxque, Params)); + Impl_.Reset(new TImpl(this, thrnum, maxque, Params)); } void TThreadPool::Stop() noexcept { @@ -387,27 +387,27 @@ public: void DoExecute() noexcept override { THolder<TThread> This(this); - if (Impl_->Namer) { - Impl_->Namer.SetCurrentThreadName(); - } - + if (Impl_->Namer) { + Impl_->Namer.SetCurrentThreadName(); + } + { TTsr tsr(Impl_->Parent_); IObjectInQueue* obj; while ((obj = Impl_->WaitForJob()) != nullptr) { - if (Impl_->Catching) { + if (Impl_->Catching) { try { - try { - obj->Process(tsr); - } catch (...) { - Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl; - } + try { + obj->Process(tsr); + } catch (...) { + Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl; + } } catch (...) { - // ¯\_(ツ)_/¯ + // ¯\_(ツ)_/¯ } - } else { - obj->Process(tsr); + } else { + obj->Process(tsr); } } } @@ -418,10 +418,10 @@ public: THolder<IThreadFactory::IThread> Thread_; }; - inline TImpl(TAdaptiveThreadPool* parent, const TParams& params) + inline TImpl(TAdaptiveThreadPool* parent, const TParams& params) : Parent_(parent) - , Catching(params.Catching_) - , Namer(params) + , Catching(params.Catching_) + , Namer(params) , ThrCount_(0) , AllDone_(false) , Obj_(nullptr) @@ -534,8 +534,8 @@ private: private: TAdaptiveThreadPool* Parent_; - const bool Catching; - TThreadNamer Namer; + const bool Catching; + TThreadNamer Namer; TAtomic ThrCount_; TMutex Mutex_; TCondVar CondReady_; @@ -547,22 +547,22 @@ private: TDuration IdleTime_; }; -TThreadPoolBase::TThreadPoolBase(const TParams& params) - : TThreadFactoryHolder(params.Factory_) - , Params(params) +TThreadPoolBase::TThreadPoolBase(const TParams& params) + : TThreadFactoryHolder(params.Factory_) + , Params(params) { } -#define DEFINE_THREAD_POOL_CTORS(type) \ +#define DEFINE_THREAD_POOL_CTORS(type) \ type::type(const TParams& params) \ : TThreadPoolBase(params) \ { \ } - -DEFINE_THREAD_POOL_CTORS(TThreadPool) -DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool) -DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool) - + +DEFINE_THREAD_POOL_CTORS(TThreadPool) +DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool) +DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool) + TAdaptiveThreadPool::~TAdaptiveThreadPool() = default; bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) { @@ -574,7 +574,7 @@ bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) { } void TAdaptiveThreadPool::Start(size_t, size_t) { - Impl_.Reset(new TImpl(this, Params)); + Impl_.Reset(new TImpl(this, Params)); } void TAdaptiveThreadPool::Stop() noexcept { @@ -614,9 +614,9 @@ void TSimpleThreadPool::Start(size_t thrnum, size_t maxque) { TAdaptiveThreadPool* adaptive(nullptr); if (thrnum) { - tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params)); + tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params)); } else { - adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params); + adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params); tmp.Reset(adaptive); } @@ -760,10 +760,10 @@ IThread* IThreadPool::DoCreate() { return new TPoolThread(this); } -THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) { +THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) { THolder<IThreadPool> queue; if (threadsCount > 1) { - queue.Reset(new TThreadPool(params)); + queue.Reset(new TThreadPool(params)); } else { queue.Reset(new TFakeThreadPool()); } diff --git a/util/thread/pool.h b/util/thread/pool.h index 79e37050e49..d1ea3a67cb6 100644 --- a/util/thread/pool.h +++ b/util/thread/pool.h @@ -80,66 +80,66 @@ IObjectInQueue* MakeThrFuncObj(T&& func) { return new TThrFuncObj<std::remove_cv_t<std::remove_reference_t<T>>>(std::forward<T>(func)); } -struct TThreadPoolParams { - bool Catching_ = true; - bool Blocking_ = false; - IThreadFactory* Factory_ = SystemThreadFactory(); - TString ThreadName_; - bool EnumerateThreads_ = false; - - using TSelf = TThreadPoolParams; - - TThreadPoolParams() { - } - - TThreadPoolParams(IThreadFactory* factory) - : Factory_(factory) - { - } - - TThreadPoolParams(const TString& name) { - SetThreadName(name); - } - - TThreadPoolParams(const char* name) { - SetThreadName(name); - } - - TSelf& SetCatching(bool val) { - Catching_ = val; - return *this; - } - - TSelf& SetBlocking(bool val) { - Blocking_ = val; - return *this; - } - - TSelf& SetFactory(IThreadFactory* factory) { - Factory_ = factory; - return *this; - } - - TSelf& SetThreadName(const TString& name) { - ThreadName_ = name; - EnumerateThreads_ = false; - return *this; - } - - TSelf& SetThreadNamePrefix(const TString& prefix) { - ThreadName_ = prefix; - EnumerateThreads_ = true; - return *this; - } -}; - +struct TThreadPoolParams { + bool Catching_ = true; + bool Blocking_ = false; + IThreadFactory* Factory_ = SystemThreadFactory(); + TString ThreadName_; + bool EnumerateThreads_ = false; + + using TSelf = TThreadPoolParams; + + TThreadPoolParams() { + } + + TThreadPoolParams(IThreadFactory* factory) + : Factory_(factory) + { + } + + TThreadPoolParams(const TString& name) { + SetThreadName(name); + } + + TThreadPoolParams(const char* name) { + SetThreadName(name); + } + + TSelf& SetCatching(bool val) { + Catching_ = val; + return *this; + } + + TSelf& SetBlocking(bool val) { + Blocking_ = val; + return *this; + } + + TSelf& SetFactory(IThreadFactory* factory) { + Factory_ = factory; + return *this; + } + + TSelf& SetThreadName(const TString& name) { + ThreadName_ = name; + EnumerateThreads_ = false; + return *this; + } + + TSelf& SetThreadNamePrefix(const TString& prefix) { + ThreadName_ = prefix; + EnumerateThreads_ = true; + return *this; + } +}; + /** * A queue processed simultaneously by several threads */ class IThreadPool: public IThreadFactory, public TNonCopyable { public: - using TParams = TThreadPoolParams; - + using TParams = TThreadPoolParams; + ~IThreadPool() override = default; /** @@ -255,18 +255,18 @@ public: } }; -class TThreadPoolBase: public IThreadPool, public TThreadFactoryHolder { -public: - TThreadPoolBase(const TParams& params); - -protected: - TParams Params; -}; - +class TThreadPoolBase: public IThreadPool, public TThreadFactoryHolder { +public: + TThreadPoolBase(const TParams& params); + +protected: + TParams Params; +}; + /** queue processed by fixed size thread pool */ -class TThreadPool: public TThreadPoolBase { +class TThreadPool: public TThreadPoolBase { public: - TThreadPool(const TParams& params = {}); + TThreadPool(const TParams& params = {}); ~TThreadPool() override; bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT; @@ -290,9 +290,9 @@ private: * Always create new thread for new task, when all existing threads are busy. * Maybe dangerous, number of threads is not limited. */ -class TAdaptiveThreadPool: public TThreadPoolBase { +class TAdaptiveThreadPool: public TThreadPoolBase { public: - TAdaptiveThreadPool(const TParams& params = {}); + TAdaptiveThreadPool(const TParams& params = {}); ~TAdaptiveThreadPool() override; /** @@ -308,15 +308,15 @@ public: void Stop() noexcept override; size_t Size() const noexcept override; -private: +private: class TImpl; THolder<TImpl> Impl_; }; /** Behave like TThreadPool or TAdaptiveThreadPool, choosen by thrnum parameter of Start() */ -class TSimpleThreadPool: public TThreadPoolBase { +class TSimpleThreadPool: public TThreadPoolBase { public: - TSimpleThreadPool(const TParams& params = {}); + TSimpleThreadPool(const TParams& params = {}); ~TSimpleThreadPool() override; bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT; @@ -387,4 +387,4 @@ inline void Delete(THolder<IThreadPool> q) { * Creates and starts TThreadPool if threadsCount > 1, or TFakeThreadPool otherwise * You could specify blocking and catching modes for TThreadPool only */ -THolder<IThreadPool> CreateThreadPool(size_t threadCount, size_t queueSizeLimit = 0, const IThreadPool::TParams& params = {}); +THolder<IThreadPool> CreateThreadPool(size_t threadCount, size_t queueSizeLimit = 0, const IThreadPool::TParams& params = {}); diff --git a/util/thread/pool_ut.cpp b/util/thread/pool_ut.cpp index 189e4865e19..893770d0c47 100644 --- a/util/thread/pool_ut.cpp +++ b/util/thread/pool_ut.cpp @@ -5,9 +5,9 @@ #include <util/stream/output.h> #include <util/random/fast.h> #include <util/system/spinlock.h> -#include <util/system/thread.h> -#include <util/system/mutex.h> -#include <util/system/condvar.h> +#include <util/system/thread.h> +#include <util/system/mutex.h> +#include <util/system/condvar.h> struct TThreadPoolTest { TSpinLock Lock; @@ -99,7 +99,7 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) { Y_UNIT_TEST(TestTThreadPoolBlocking) { TThreadPoolTest t; - TThreadPool q(TThreadPool::TParams().SetBlocking(true)); + TThreadPool q(TThreadPool::TParams().SetBlocking(true)); t.TestAnyQueue(&q, 100); } @@ -132,32 +132,32 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) { ); UNIT_ASSERT_VALUES_EQUAL(added, false); } - + Y_UNIT_TEST(TestSafeAddFuncThrows) { TFailAddQueue queue; UNIT_CHECK_GENERATED_EXCEPTION(queue.SafeAddFunc([] {}), TThreadPoolException); } Y_UNIT_TEST(TestFunctionNotCopied) { - struct TFailOnCopy { - TFailOnCopy() { - } - - TFailOnCopy(TFailOnCopy&&) { - } - - TFailOnCopy(const TFailOnCopy&) { + struct TFailOnCopy { + TFailOnCopy() { + } + + TFailOnCopy(TFailOnCopy&&) { + } + + TFailOnCopy(const TFailOnCopy&) { UNIT_FAIL("Don't copy std::function inside TThreadPool"); - } - }; - - TThreadPool queue(TThreadPool::TParams().SetBlocking(false).SetCatching(true)); - queue.Start(2); - + } + }; + + TThreadPool queue(TThreadPool::TParams().SetBlocking(false).SetCatching(true)); + queue.Start(2); + queue.SafeAddFunc([data = TFailOnCopy()]() {}); - - queue.Stop(); - } + + queue.Stop(); + } Y_UNIT_TEST(TestInfoGetters) { TThreadPool queue; @@ -178,80 +178,80 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) { queue.Stop(); } - + void TestFixedThreadName(IThreadPool& pool, const TString& expectedName) { - pool.Start(1); - TString name; - pool.SafeAddFunc([&name]() { - name = TThread::CurrentThreadName(); - }); - pool.Stop(); + pool.Start(1); + TString name; + pool.SafeAddFunc([&name]() { + name = TThread::CurrentThreadName(); + }); + pool.Stop(); if (TThread::CanGetCurrentThreadName()) { UNIT_ASSERT_EQUAL(name, expectedName); UNIT_ASSERT_UNEQUAL(TThread::CurrentThreadName(), expectedName); } - } - - Y_UNIT_TEST(TestFixedThreadName) { - const TString expectedName = "HelloWorld"; - { - TThreadPool pool(TThreadPool::TParams().SetBlocking(true).SetCatching(false).SetThreadName(expectedName)); - TestFixedThreadName(pool, expectedName); - } - { - TAdaptiveThreadPool pool(TThreadPool::TParams().SetThreadName(expectedName)); - TestFixedThreadName(pool, expectedName); - } - } - + } + + Y_UNIT_TEST(TestFixedThreadName) { + const TString expectedName = "HelloWorld"; + { + TThreadPool pool(TThreadPool::TParams().SetBlocking(true).SetCatching(false).SetThreadName(expectedName)); + TestFixedThreadName(pool, expectedName); + } + { + TAdaptiveThreadPool pool(TThreadPool::TParams().SetThreadName(expectedName)); + TestFixedThreadName(pool, expectedName); + } + } + void TestEnumeratedThreadName(IThreadPool& pool, const THashSet<TString>& expectedNames) { - pool.Start(expectedNames.size()); - TMutex lock; - TCondVar allReady; - size_t readyCount = 0; - THashSet<TString> names; - for (size_t i = 0; i < expectedNames.size(); ++i) { - pool.SafeAddFunc([&]() { + pool.Start(expectedNames.size()); + TMutex lock; + TCondVar allReady; + size_t readyCount = 0; + THashSet<TString> names; + for (size_t i = 0; i < expectedNames.size(); ++i) { + pool.SafeAddFunc([&]() { with_lock (lock) { - if (++readyCount == expectedNames.size()) { - allReady.BroadCast(); - } else { - while (readyCount != expectedNames.size()) { - allReady.WaitI(lock); - } - } - names.insert(TThread::CurrentThreadName()); - } - }); - } - pool.Stop(); + if (++readyCount == expectedNames.size()) { + allReady.BroadCast(); + } else { + while (readyCount != expectedNames.size()) { + allReady.WaitI(lock); + } + } + names.insert(TThread::CurrentThreadName()); + } + }); + } + pool.Stop(); if (TThread::CanGetCurrentThreadName()) { UNIT_ASSERT_EQUAL(names, expectedNames); } - } - - Y_UNIT_TEST(TestEnumeratedThreadName) { - const TString namePrefix = "HelloWorld"; - const THashSet<TString> expectedNames = { - "HelloWorld0", - "HelloWorld1", - "HelloWorld2", - "HelloWorld3", - "HelloWorld4", - "HelloWorld5", - "HelloWorld6", - "HelloWorld7", - "HelloWorld8", - "HelloWorld9", - "HelloWorld10", - }; - { - TThreadPool pool(TThreadPool::TParams().SetBlocking(true).SetCatching(false).SetThreadNamePrefix(namePrefix)); - TestEnumeratedThreadName(pool, expectedNames); - } - { - TAdaptiveThreadPool pool(TThreadPool::TParams().SetThreadNamePrefix(namePrefix)); - TestEnumeratedThreadName(pool, expectedNames); - } - } + } + + Y_UNIT_TEST(TestEnumeratedThreadName) { + const TString namePrefix = "HelloWorld"; + const THashSet<TString> expectedNames = { + "HelloWorld0", + "HelloWorld1", + "HelloWorld2", + "HelloWorld3", + "HelloWorld4", + "HelloWorld5", + "HelloWorld6", + "HelloWorld7", + "HelloWorld8", + "HelloWorld9", + "HelloWorld10", + }; + { + TThreadPool pool(TThreadPool::TParams().SetBlocking(true).SetCatching(false).SetThreadNamePrefix(namePrefix)); + TestEnumeratedThreadName(pool, expectedNames); + } + { + TAdaptiveThreadPool pool(TThreadPool::TParams().SetThreadNamePrefix(namePrefix)); + TestEnumeratedThreadName(pool, expectedNames); + } + } } |