aboutsummaryrefslogtreecommitdiffstats
path: root/util/thread/pool.cpp
diff options
context:
space:
mode:
authortrofimenkov <trofimenkov@yandex-team.ru>2022-02-10 16:49:30 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:30 +0300
commit30cebc2cfa79af3b577760a113e203a79450e6b6 (patch)
tree49327bf3c28fab534b04b312a39179e70f7c2763 /util/thread/pool.cpp
parenta2d2743094c8d255cda4011b317235874db4d01c (diff)
downloadydb-30cebc2cfa79af3b577760a113e203a79450e6b6.tar.gz
Restoring authorship annotation for <trofimenkov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'util/thread/pool.cpp')
-rw-r--r--util/thread/pool.cpp172
1 files changed, 86 insertions, 86 deletions
diff --git a/util/thread/pool.cpp b/util/thread/pool.cpp
index 05fad02e9b..2e2edf9488 100644
--- a/util/thread/pool.cpp
+++ b/util/thread/pool.cpp
@@ -14,52 +14,52 @@
#include <util/generic/fastqueue.h>
#include <util/stream/output.h>
-#include <util/string/builder.h>
+#include <util/string/builder.h>
#include <util/system/event.h>
#include <util/system/mutex.h>
#include <util/system/atomic.h>
#include <util/system/condvar.h>
-#include <util/system/thread.h>
+#include <util/system/thread.h>
#include <util/datetime/base.h>
#include "factory.h"
#include "pool.h"
-namespace {
- class TThreadNamer {
- public:
- TThreadNamer(const IThreadPool::TParams& params)
- : ThreadName(params.ThreadName_)
- , EnumerateThreads(params.EnumerateThreads_)
- {
- }
-
+namespace {
+ class TThreadNamer {
+ public:
+ TThreadNamer(const IThreadPool::TParams& params)
+ : ThreadName(params.ThreadName_)
+ , EnumerateThreads(params.EnumerateThreads_)
+ {
+ }
+
explicit operator bool() const {
- return !ThreadName.empty();
- }
-
- void SetCurrentThreadName() {
- if (EnumerateThreads) {
- Set(TStringBuilder() << ThreadName << (Index++));
- } else {
- Set(ThreadName);
- }
- }
-
- private:
- void Set(const TString& name) {
- TThread::SetCurrentThreadName(name.c_str());
- }
-
- private:
- TString ThreadName;
- bool EnumerateThreads = false;
- std::atomic<ui64> Index{0};
- };
-}
-
+ return !ThreadName.empty();
+ }
+
+ void SetCurrentThreadName() {
+ if (EnumerateThreads) {
+ Set(TStringBuilder() << ThreadName << (Index++));
+ } else {
+ Set(ThreadName);
+ }
+ }
+
+ private:
+ void Set(const TString& name) {
+ TThread::SetCurrentThreadName(name.c_str());
+ }
+
+ private:
+ TString ThreadName;
+ bool EnumerateThreads = false;
+ std::atomic<ui64> Index{0};
+ };
+}
+
TThreadFactoryHolder::TThreadFactoryHolder() noexcept
: Pool_(SystemThreadFactory())
{
@@ -71,11 +71,11 @@ class TThreadPool::TImpl: public TIntrusiveListItem<TImpl>, public IThreadFactor
using TThreadRef = THolder<IThreadFactory::IThread>;
public:
- inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params)
+ inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params)
: Parent_(parent)
- , Blocking(params.Blocking_)
- , Catching(params.Catching_)
- , Namer(params)
+ , Blocking(params.Blocking_)
+ , Catching(params.Catching_)
+ , Namer(params)
, ShouldTerminate(1)
, MaxQueueSize(0)
, ThreadCountExpected(0)
@@ -204,10 +204,10 @@ private:
void DoExecute() override {
THolder<TTsr> tsr(new TTsr(Parent_));
- if (Namer) {
- Namer.SetCurrentThreadName();
- }
-
+ if (Namer) {
+ Namer.SetCurrentThreadName();
+ }
+
while (true) {
IObjectInQueue* job = nullptr;
@@ -227,18 +227,18 @@ private:
QueuePopCond.Signal();
- if (Catching) {
+ if (Catching) {
try {
- try {
- job->Process(*tsr);
- } catch (...) {
- Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
- }
+ try {
+ job->Process(*tsr);
+ } catch (...) {
+ Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
+ }
} catch (...) {
// ¯\_(ツ)_/¯
}
- } else {
- job->Process(*tsr);
+ } else {
+ job->Process(*tsr);
}
}
@@ -254,9 +254,9 @@ private:
private:
TThreadPool* Parent_;
- const bool Blocking;
- const bool Catching;
- TThreadNamer Namer;
+ const bool Blocking;
+ const bool Catching;
+ TThreadNamer Namer;
mutable TMutex QueueMutex;
mutable TMutex StopMutex;
TCondVar QueuePushCond;
@@ -360,7 +360,7 @@ bool TThreadPool::Add(IObjectInQueue* obj) {
}
void TThreadPool::Start(size_t thrnum, size_t maxque) {
- Impl_.Reset(new TImpl(this, thrnum, maxque, Params));
+ Impl_.Reset(new TImpl(this, thrnum, maxque, Params));
}
void TThreadPool::Stop() noexcept {
@@ -387,27 +387,27 @@ public:
void DoExecute() noexcept override {
THolder<TThread> This(this);
- if (Impl_->Namer) {
- Impl_->Namer.SetCurrentThreadName();
- }
-
+ if (Impl_->Namer) {
+ Impl_->Namer.SetCurrentThreadName();
+ }
+
{
TTsr tsr(Impl_->Parent_);
IObjectInQueue* obj;
while ((obj = Impl_->WaitForJob()) != nullptr) {
- if (Impl_->Catching) {
+ if (Impl_->Catching) {
try {
- try {
- obj->Process(tsr);
- } catch (...) {
- Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl;
- }
+ try {
+ obj->Process(tsr);
+ } catch (...) {
+ Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl;
+ }
} catch (...) {
- // ¯\_(ツ)_/¯
+ // ¯\_(ツ)_/¯
}
- } else {
- obj->Process(tsr);
+ } else {
+ obj->Process(tsr);
}
}
}
@@ -418,10 +418,10 @@ public:
THolder<IThreadFactory::IThread> Thread_;
};
- inline TImpl(TAdaptiveThreadPool* parent, const TParams& params)
+ inline TImpl(TAdaptiveThreadPool* parent, const TParams& params)
: Parent_(parent)
- , Catching(params.Catching_)
- , Namer(params)
+ , Catching(params.Catching_)
+ , Namer(params)
, ThrCount_(0)
, AllDone_(false)
, Obj_(nullptr)
@@ -534,8 +534,8 @@ private:
private:
TAdaptiveThreadPool* Parent_;
- const bool Catching;
- TThreadNamer Namer;
+ const bool Catching;
+ TThreadNamer Namer;
TAtomic ThrCount_;
TMutex Mutex_;
TCondVar CondReady_;
@@ -547,22 +547,22 @@ private:
TDuration IdleTime_;
};
-TThreadPoolBase::TThreadPoolBase(const TParams& params)
- : TThreadFactoryHolder(params.Factory_)
- , Params(params)
+TThreadPoolBase::TThreadPoolBase(const TParams& params)
+ : TThreadFactoryHolder(params.Factory_)
+ , Params(params)
{
}
-#define DEFINE_THREAD_POOL_CTORS(type) \
+#define DEFINE_THREAD_POOL_CTORS(type) \
type::type(const TParams& params) \
: TThreadPoolBase(params) \
{ \
}
-
-DEFINE_THREAD_POOL_CTORS(TThreadPool)
-DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool)
-DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool)
-
+
+DEFINE_THREAD_POOL_CTORS(TThreadPool)
+DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool)
+DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool)
+
TAdaptiveThreadPool::~TAdaptiveThreadPool() = default;
bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) {
@@ -574,7 +574,7 @@ bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) {
}
void TAdaptiveThreadPool::Start(size_t, size_t) {
- Impl_.Reset(new TImpl(this, Params));
+ Impl_.Reset(new TImpl(this, Params));
}
void TAdaptiveThreadPool::Stop() noexcept {
@@ -614,9 +614,9 @@ void TSimpleThreadPool::Start(size_t thrnum, size_t maxque) {
TAdaptiveThreadPool* adaptive(nullptr);
if (thrnum) {
- tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params));
+ tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params));
} else {
- adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params);
+ adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params);
tmp.Reset(adaptive);
}
@@ -760,10 +760,10 @@ IThread* IThreadPool::DoCreate() {
return new TPoolThread(this);
}
-THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) {
+THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) {
THolder<IThreadPool> queue;
if (threadsCount > 1) {
- queue.Reset(new TThreadPool(params));
+ queue.Reset(new TThreadPool(params));
} else {
queue.Reset(new TFakeThreadPool());
}