aboutsummaryrefslogtreecommitdiffstats
path: root/util/thread/pool.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /util/thread/pool.cpp
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'util/thread/pool.cpp')
-rw-r--r--util/thread/pool.cpp990
1 files changed, 495 insertions, 495 deletions
diff --git a/util/thread/pool.cpp b/util/thread/pool.cpp
index 05fad02e9b..0275429304 100644
--- a/util/thread/pool.cpp
+++ b/util/thread/pool.cpp
@@ -2,31 +2,31 @@
#include <util/system/defaults.h>
-#if defined(_unix_)
- #include <pthread.h>
-#endif
-
+#if defined(_unix_)
+ #include <pthread.h>
+#endif
+
#include <util/generic/vector.h>
-#include <util/generic/intrlist.h>
+#include <util/generic/intrlist.h>
#include <util/generic/yexception.h>
#include <util/generic/ylimits.h>
-#include <util/generic/singleton.h>
+#include <util/generic/singleton.h>
#include <util/generic/fastqueue.h>
-
+
#include <util/stream/output.h>
#include <util/string/builder.h>
-
-#include <util/system/event.h>
-#include <util/system/mutex.h>
-#include <util/system/atomic.h>
-#include <util/system/condvar.h>
+
+#include <util/system/event.h>
+#include <util/system/mutex.h>
+#include <util/system/atomic.h>
+#include <util/system/condvar.h>
#include <util/system/thread.h>
#include <util/datetime/base.h>
#include "factory.h"
#include "pool.h"
-
+
namespace {
class TThreadNamer {
public:
@@ -36,7 +36,7 @@ namespace {
{
}
- explicit operator bool() const {
+ explicit operator bool() const {
return !ThreadName.empty();
}
@@ -62,269 +62,269 @@ namespace {
TThreadFactoryHolder::TThreadFactoryHolder() noexcept
: Pool_(SystemThreadFactory())
-{
-}
-
+{
+}
+
class TThreadPool::TImpl: public TIntrusiveListItem<TImpl>, public IThreadFactory::IThreadAble {
using TTsr = IThreadPool::TTsr;
using TJobQueue = TFastQueue<IObjectInQueue*>;
using TThreadRef = THolder<IThreadFactory::IThread>;
-
-public:
+
+public:
inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params)
- : Parent_(parent)
+ : Parent_(parent)
, Blocking(params.Blocking_)
, Catching(params.Catching_)
, Namer(params)
, ShouldTerminate(1)
- , MaxQueueSize(0)
- , ThreadCountExpected(0)
- , ThreadCountReal(0)
- , Forked(false)
- {
- TAtforkQueueRestarter::Get().RegisterObject(this);
- Start(thrnum, maxqueue);
- }
-
+ , MaxQueueSize(0)
+ , ThreadCountExpected(0)
+ , ThreadCountReal(0)
+ , Forked(false)
+ {
+ TAtforkQueueRestarter::Get().RegisterObject(this);
+ Start(thrnum, maxqueue);
+ }
+
inline ~TImpl() override {
- try {
- Stop();
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
-
- TAtforkQueueRestarter::Get().UnregisterObject(this);
+ try {
+ Stop();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+
+ TAtforkQueueRestarter::Get().UnregisterObject(this);
Y_ASSERT(Tharr.empty());
- }
-
- inline bool Add(IObjectInQueue* obj) {
+ }
+
+ inline bool Add(IObjectInQueue* obj) {
if (AtomicGet(ShouldTerminate)) {
- return false;
- }
-
- if (Tharr.empty()) {
- TTsr tsr(Parent_);
- obj->Process(tsr);
-
- return true;
- }
-
- with_lock (QueueMutex) {
+ return false;
+ }
+
+ if (Tharr.empty()) {
+ TTsr tsr(Parent_);
+ obj->Process(tsr);
+
+ return true;
+ }
+
+ with_lock (QueueMutex) {
while (MaxQueueSize > 0 && Queue.Size() >= MaxQueueSize && !AtomicGet(ShouldTerminate)) {
- if (!Blocking) {
- return false;
- }
- QueuePopCond.Wait(QueueMutex);
- }
-
+ if (!Blocking) {
+ return false;
+ }
+ QueuePopCond.Wait(QueueMutex);
+ }
+
if (AtomicGet(ShouldTerminate)) {
- return false;
- }
-
- Queue.Push(obj);
+ return false;
+ }
+
+ Queue.Push(obj);
}
-
+
QueuePushCond.Signal();
- return true;
- }
-
+ return true;
+ }
+
inline size_t Size() const noexcept {
- auto guard = Guard(QueueMutex);
+ auto guard = Guard(QueueMutex);
- return Queue.Size();
- }
+ return Queue.Size();
+ }
inline size_t GetMaxQueueSize() const noexcept {
- return MaxQueueSize;
- }
+ return MaxQueueSize;
+ }
inline size_t GetThreadCountExpected() const noexcept {
- return ThreadCountExpected;
- }
+ return ThreadCountExpected;
+ }
inline size_t GetThreadCountReal() const noexcept {
return ThreadCountReal;
}
inline void AtforkAction() noexcept Y_NO_SANITIZE("thread") {
- Forked = true;
- }
-
+ Forked = true;
+ }
+
inline bool NeedRestart() const noexcept {
- return Forked;
- }
-
-private:
- inline void Start(size_t num, size_t maxque) {
+ return Forked;
+ }
+
+private:
+ inline void Start(size_t num, size_t maxque) {
AtomicSet(ShouldTerminate, 0);
- MaxQueueSize = maxque;
- ThreadCountExpected = num;
-
- try {
- for (size_t i = 0; i < num; ++i) {
- Tharr.push_back(Parent_->Pool()->Run(this));
- ++ThreadCountReal;
- }
- } catch (...) {
- Stop();
-
- throw;
- }
- }
-
- inline void Stop() {
+ MaxQueueSize = maxque;
+ ThreadCountExpected = num;
+
+ try {
+ for (size_t i = 0; i < num; ++i) {
+ Tharr.push_back(Parent_->Pool()->Run(this));
+ ++ThreadCountReal;
+ }
+ } catch (...) {
+ Stop();
+
+ throw;
+ }
+ }
+
+ inline void Stop() {
AtomicSet(ShouldTerminate, 1);
- with_lock (QueueMutex) {
- QueuePopCond.BroadCast();
- }
-
- if (!NeedRestart()) {
- WaitForComplete();
- }
-
- Tharr.clear();
- ThreadCountExpected = 0;
- MaxQueueSize = 0;
- }
-
+ with_lock (QueueMutex) {
+ QueuePopCond.BroadCast();
+ }
+
+ if (!NeedRestart()) {
+ WaitForComplete();
+ }
+
+ Tharr.clear();
+ ThreadCountExpected = 0;
+ MaxQueueSize = 0;
+ }
+
inline void WaitForComplete() noexcept {
- with_lock (StopMutex) {
- while (ThreadCountReal) {
- with_lock (QueueMutex) {
+ with_lock (StopMutex) {
+ while (ThreadCountReal) {
+ with_lock (QueueMutex) {
QueuePushCond.Signal();
- }
-
- StopCond.Wait(StopMutex);
- }
- }
- }
-
+ }
+
+ StopCond.Wait(StopMutex);
+ }
+ }
+ }
+
void DoExecute() override {
- THolder<TTsr> tsr(new TTsr(Parent_));
-
+ THolder<TTsr> tsr(new TTsr(Parent_));
+
if (Namer) {
Namer.SetCurrentThreadName();
}
- while (true) {
+ while (true) {
IObjectInQueue* job = nullptr;
-
- with_lock (QueueMutex) {
+
+ with_lock (QueueMutex) {
while (Queue.Empty() && !AtomicGet(ShouldTerminate)) {
QueuePushCond.Wait(QueueMutex);
- }
-
+ }
+
if (AtomicGet(ShouldTerminate) && Queue.Empty()) {
- tsr.Destroy();
-
- break;
- }
-
- job = Queue.Pop();
- }
-
- QueuePopCond.Signal();
-
+ tsr.Destroy();
+
+ break;
+ }
+
+ job = Queue.Pop();
+ }
+
+ QueuePopCond.Signal();
+
if (Catching) {
- try {
+ try {
try {
job->Process(*tsr);
} catch (...) {
Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
}
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
} else {
job->Process(*tsr);
- }
- }
-
- FinishOneThread();
- }
-
+ }
+ }
+
+ FinishOneThread();
+ }
+
inline void FinishOneThread() noexcept {
- auto guard = Guard(StopMutex);
-
- --ThreadCountReal;
- StopCond.Signal();
- }
+ auto guard = Guard(StopMutex);
+
+ --ThreadCountReal;
+ StopCond.Signal();
+ }
-private:
+private:
TThreadPool* Parent_;
const bool Blocking;
const bool Catching;
TThreadNamer Namer;
- mutable TMutex QueueMutex;
- mutable TMutex StopMutex;
+ mutable TMutex QueueMutex;
+ mutable TMutex StopMutex;
TCondVar QueuePushCond;
- TCondVar QueuePopCond;
- TCondVar StopCond;
- TJobQueue Queue;
+ TCondVar QueuePopCond;
+ TCondVar StopCond;
+ TJobQueue Queue;
TVector<TThreadRef> Tharr;
TAtomic ShouldTerminate;
- size_t MaxQueueSize;
- size_t ThreadCountExpected;
- size_t ThreadCountReal;
- bool Forked;
-
- class TAtforkQueueRestarter {
- public:
- static TAtforkQueueRestarter& Get() {
- return *SingletonWithPriority<TAtforkQueueRestarter, 256>();
- }
-
- inline void RegisterObject(TImpl* obj) {
- auto guard = Guard(ActionMutex);
-
- RegisteredObjects.PushBack(obj);
- }
-
- inline void UnregisterObject(TImpl* obj) {
- auto guard = Guard(ActionMutex);
-
- obj->Unlink();
- }
-
- private:
- void ChildAction() {
- with_lock (ActionMutex) {
- for (auto it = RegisteredObjects.Begin(); it != RegisteredObjects.End(); ++it) {
- it->AtforkAction();
- }
- }
- }
-
- static void ProcessChildAction() {
- Get().ChildAction();
- }
-
- TIntrusiveList<TImpl> RegisteredObjects;
- TMutex ActionMutex;
-
- public:
- inline TAtforkQueueRestarter() {
-#if defined(_bionic_)
-//no pthread_atfork on android libc
-#elif defined(_unix_)
+ size_t MaxQueueSize;
+ size_t ThreadCountExpected;
+ size_t ThreadCountReal;
+ bool Forked;
+
+ class TAtforkQueueRestarter {
+ public:
+ static TAtforkQueueRestarter& Get() {
+ return *SingletonWithPriority<TAtforkQueueRestarter, 256>();
+ }
+
+ inline void RegisterObject(TImpl* obj) {
+ auto guard = Guard(ActionMutex);
+
+ RegisteredObjects.PushBack(obj);
+ }
+
+ inline void UnregisterObject(TImpl* obj) {
+ auto guard = Guard(ActionMutex);
+
+ obj->Unlink();
+ }
+
+ private:
+ void ChildAction() {
+ with_lock (ActionMutex) {
+ for (auto it = RegisteredObjects.Begin(); it != RegisteredObjects.End(); ++it) {
+ it->AtforkAction();
+ }
+ }
+ }
+
+ static void ProcessChildAction() {
+ Get().ChildAction();
+ }
+
+ TIntrusiveList<TImpl> RegisteredObjects;
+ TMutex ActionMutex;
+
+ public:
+ inline TAtforkQueueRestarter() {
+#if defined(_bionic_)
+//no pthread_atfork on android libc
+#elif defined(_unix_)
pthread_atfork(nullptr, nullptr, ProcessChildAction);
#endif
- }
- };
-};
-
+ }
+ };
+};
+
TThreadPool::~TThreadPool() = default;
-
+
size_t TThreadPool::Size() const noexcept {
- if (!Impl_.Get()) {
- return 0;
- }
-
- return Impl_->Size();
-}
-
+ if (!Impl_.Get()) {
+ return 0;
+ }
+
+ return Impl_->Size();
+}
+
size_t TThreadPool::GetThreadCountExpected() const noexcept {
if (!Impl_.Get()) {
return 0;
@@ -351,298 +351,298 @@ size_t TThreadPool::GetMaxQueueSize() const noexcept {
bool TThreadPool::Add(IObjectInQueue* obj) {
Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
-
- if (Impl_->NeedRestart()) {
- Start(Impl_->GetThreadCountExpected(), Impl_->GetMaxQueueSize());
+
+ if (Impl_->NeedRestart()) {
+ Start(Impl_->GetThreadCountExpected(), Impl_->GetMaxQueueSize());
}
- return Impl_->Add(obj);
+ return Impl_->Add(obj);
}
void TThreadPool::Start(size_t thrnum, size_t maxque) {
Impl_.Reset(new TImpl(this, thrnum, maxque, Params));
-}
-
+}
+
void TThreadPool::Stop() noexcept {
- Impl_.Destroy();
-}
-
-static TAtomic mtp_queue_counter = 0;
-
+ Impl_.Destroy();
+}
+
+static TAtomic mtp_queue_counter = 0;
+
class TAdaptiveThreadPool::TImpl {
-public:
+public:
class TThread: public IThreadFactory::IThreadAble {
public:
- inline TThread(TImpl* parent)
- : Impl_(parent)
- , Thread_(Impl_->Parent_->Pool()->Run(this))
- {
- }
-
+ inline TThread(TImpl* parent)
+ : Impl_(parent)
+ , Thread_(Impl_->Parent_->Pool()->Run(this))
+ {
+ }
+
inline ~TThread() override {
- Impl_->DecThreadCount();
- }
-
- private:
+ Impl_->DecThreadCount();
+ }
+
+ private:
void DoExecute() noexcept override {
- THolder<TThread> This(this);
-
+ THolder<TThread> This(this);
+
if (Impl_->Namer) {
Impl_->Namer.SetCurrentThreadName();
}
- {
- TTsr tsr(Impl_->Parent_);
- IObjectInQueue* obj;
-
+ {
+ TTsr tsr(Impl_->Parent_);
+ IObjectInQueue* obj;
+
while ((obj = Impl_->WaitForJob()) != nullptr) {
if (Impl_->Catching) {
- try {
+ try {
try {
obj->Process(tsr);
} catch (...) {
Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl;
}
- } catch (...) {
+ } catch (...) {
// ¯\_(ツ)_/¯
- }
+ }
} else {
obj->Process(tsr);
- }
- }
- }
- }
-
- private:
- TImpl* Impl_;
+ }
+ }
+ }
+ }
+
+ private:
+ TImpl* Impl_;
THolder<IThreadFactory::IThread> Thread_;
- };
-
+ };
+
inline TImpl(TAdaptiveThreadPool* parent, const TParams& params)
- : Parent_(parent)
+ : Parent_(parent)
, Catching(params.Catching_)
, Namer(params)
- , ThrCount_(0)
- , AllDone_(false)
+ , ThrCount_(0)
+ , AllDone_(false)
, Obj_(nullptr)
- , Free_(0)
- , IdleTime_(TDuration::Max())
- {
- sprintf(Name_, "[mtp queue %ld]", (long)AtomicAdd(mtp_queue_counter, 1));
- }
-
+ , Free_(0)
+ , IdleTime_(TDuration::Max())
+ {
+ sprintf(Name_, "[mtp queue %ld]", (long)AtomicAdd(mtp_queue_counter, 1));
+ }
+
inline ~TImpl() {
- Stop();
- }
-
- inline void SetMaxIdleTime(TDuration idleTime) {
- IdleTime_ = idleTime;
- }
-
+ Stop();
+ }
+
+ inline void SetMaxIdleTime(TDuration idleTime) {
+ IdleTime_ = idleTime;
+ }
+
inline const char* Name() const noexcept {
- return Name_;
- }
-
- inline void Add(IObjectInQueue* obj) {
- with_lock (Mutex_) {
+ return Name_;
+ }
+
+ inline void Add(IObjectInQueue* obj) {
+ with_lock (Mutex_) {
while (Obj_ != nullptr) {
- CondFree_.Wait(Mutex_);
- }
-
- if (Free_ == 0) {
- AddThreadNoLock();
- }
-
- Obj_ = obj;
-
+ CondFree_.Wait(Mutex_);
+ }
+
+ if (Free_ == 0) {
+ AddThreadNoLock();
+ }
+
+ Obj_ = obj;
+
Y_ENSURE_EX(!AllDone_, TThreadPoolException() << TStringBuf("adding to a stopped queue"));
- }
-
- CondReady_.Signal();
- }
-
- inline void AddThreads(size_t n) {
- with_lock (Mutex_) {
- while (n) {
- AddThreadNoLock();
-
- --n;
- }
- }
- }
-
+ }
+
+ CondReady_.Signal();
+ }
+
+ inline void AddThreads(size_t n) {
+ with_lock (Mutex_) {
+ while (n) {
+ AddThreadNoLock();
+
+ --n;
+ }
+ }
+ }
+
inline size_t Size() const noexcept {
- return (size_t)ThrCount_;
- }
-
-private:
+ return (size_t)ThrCount_;
+ }
+
+private:
inline void IncThreadCount() noexcept {
- AtomicAdd(ThrCount_, 1);
- }
-
+ AtomicAdd(ThrCount_, 1);
+ }
+
inline void DecThreadCount() noexcept {
- AtomicAdd(ThrCount_, -1);
- }
-
- inline void AddThreadNoLock() {
- IncThreadCount();
-
- try {
- new TThread(this);
- } catch (...) {
- DecThreadCount();
-
- throw;
- }
- }
-
+ AtomicAdd(ThrCount_, -1);
+ }
+
+ inline void AddThreadNoLock() {
+ IncThreadCount();
+
+ try {
+ new TThread(this);
+ } catch (...) {
+ DecThreadCount();
+
+ throw;
+ }
+ }
+
inline void Stop() noexcept {
- Mutex_.Acquire();
-
- AllDone_ = true;
-
+ Mutex_.Acquire();
+
+ AllDone_ = true;
+
while (AtomicGet(ThrCount_)) {
- Mutex_.Release();
- CondReady_.Signal();
- Mutex_.Acquire();
- }
-
- Mutex_.Release();
- }
-
+ Mutex_.Release();
+ CondReady_.Signal();
+ Mutex_.Acquire();
+ }
+
+ Mutex_.Release();
+ }
+
inline IObjectInQueue* WaitForJob() noexcept {
- Mutex_.Acquire();
-
- ++Free_;
-
- while (!Obj_ && !AllDone_) {
+ Mutex_.Acquire();
+
+ ++Free_;
+
+ while (!Obj_ && !AllDone_) {
if (!CondReady_.WaitT(Mutex_, IdleTime_)) {
- break;
- }
- }
-
- IObjectInQueue* ret = Obj_;
+ break;
+ }
+ }
+
+ IObjectInQueue* ret = Obj_;
Obj_ = nullptr;
-
- --Free_;
-
- Mutex_.Release();
- CondFree_.Signal();
-
- return ret;
- }
-
-private:
+
+ --Free_;
+
+ Mutex_.Release();
+ CondFree_.Signal();
+
+ return ret;
+ }
+
+private:
TAdaptiveThreadPool* Parent_;
const bool Catching;
TThreadNamer Namer;
- TAtomic ThrCount_;
- TMutex Mutex_;
- TCondVar CondReady_;
- TCondVar CondFree_;
- bool AllDone_;
- IObjectInQueue* Obj_;
- size_t Free_;
- char Name_[64];
- TDuration IdleTime_;
-};
-
+ TAtomic ThrCount_;
+ TMutex Mutex_;
+ TCondVar CondReady_;
+ TCondVar CondFree_;
+ bool AllDone_;
+ IObjectInQueue* Obj_;
+ size_t Free_;
+ char Name_[64];
+ TDuration IdleTime_;
+};
+
TThreadPoolBase::TThreadPoolBase(const TParams& params)
: TThreadFactoryHolder(params.Factory_)
, Params(params)
-{
-}
-
+{
+}
+
#define DEFINE_THREAD_POOL_CTORS(type) \
- type::type(const TParams& params) \
- : TThreadPoolBase(params) \
- { \
- }
+ type::type(const TParams& params) \
+ : TThreadPoolBase(params) \
+ { \
+ }
DEFINE_THREAD_POOL_CTORS(TThreadPool)
DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool)
DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool)
TAdaptiveThreadPool::~TAdaptiveThreadPool() = default;
-
+
bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) {
Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
-
- Impl_->Add(obj);
-
- return true;
-}
-
+
+ Impl_->Add(obj);
+
+ return true;
+}
+
void TAdaptiveThreadPool::Start(size_t, size_t) {
Impl_.Reset(new TImpl(this, Params));
-}
-
+}
+
void TAdaptiveThreadPool::Stop() noexcept {
- Impl_.Destroy();
-}
-
+ Impl_.Destroy();
+}
+
size_t TAdaptiveThreadPool::Size() const noexcept {
- if (Impl_.Get()) {
- return Impl_->Size();
- }
-
- return 0;
-}
-
+ if (Impl_.Get()) {
+ return Impl_->Size();
+ }
+
+ return 0;
+}
+
void TAdaptiveThreadPool::SetMaxIdleTime(TDuration interval) {
Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
-
+
Impl_->SetMaxIdleTime(interval);
-}
-
+}
+
TSimpleThreadPool::~TSimpleThreadPool() {
- try {
- Stop();
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
-}
-
+ try {
+ Stop();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+}
+
bool TSimpleThreadPool::Add(IObjectInQueue* obj) {
Y_ENSURE_EX(Slave_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
-
- return Slave_->Add(obj);
-}
-
+
+ return Slave_->Add(obj);
+}
+
void TSimpleThreadPool::Start(size_t thrnum, size_t maxque) {
THolder<IThreadPool> tmp;
TAdaptiveThreadPool* adaptive(nullptr);
-
- if (thrnum) {
+
+ if (thrnum) {
tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params));
- } else {
+ } else {
adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params);
- tmp.Reset(adaptive);
- }
-
- tmp->Start(thrnum, maxque);
-
- if (adaptive) {
+ tmp.Reset(adaptive);
+ }
+
+ tmp->Start(thrnum, maxque);
+
+ if (adaptive) {
adaptive->SetMaxIdleTime(TDuration::Seconds(100));
- }
-
- Slave_.Swap(tmp);
-}
-
+ }
+
+ Slave_.Swap(tmp);
+}
+
void TSimpleThreadPool::Stop() noexcept {
- Slave_.Destroy();
-}
-
+ Slave_.Destroy();
+}
+
size_t TSimpleThreadPool::Size() const noexcept {
- if (Slave_.Get()) {
- return Slave_->Size();
- }
-
- return 0;
-}
-
+ if (Slave_.Get()) {
+ return Slave_->Size();
+ }
+
+ return 0;
+}
+
namespace {
- class TOwnedObjectInQueue: public IObjectInQueue {
+ class TOwnedObjectInQueue: public IObjectInQueue {
private:
THolder<IObjectInQueue> Owned;
@@ -661,7 +661,7 @@ namespace {
void IThreadPool::SafeAdd(IObjectInQueue* obj) {
Y_ENSURE_EX(Add(obj), TThreadPoolException() << TStringBuf("can not add object to queue"));
-}
+}
void IThreadPool::SafeAddAndOwn(THolder<IObjectInQueue> obj) {
Y_ENSURE_EX(AddAndOwn(std::move(obj)), TThreadPoolException() << TStringBuf("can not add to queue and own"));
@@ -678,87 +678,87 @@ bool IThreadPool::AddAndOwn(THolder<IObjectInQueue> obj) {
using IThread = IThreadFactory::IThread;
using IThreadAble = IThreadFactory::IThreadAble;
-
-namespace {
- class TPoolThread: public IThread {
- class TThreadImpl: public IObjectInQueue, public TAtomicRefCount<TThreadImpl> {
- public:
- inline TThreadImpl(IThreadAble* func)
- : Func_(func)
- {
- }
-
+
+namespace {
+ class TPoolThread: public IThread {
+ class TThreadImpl: public IObjectInQueue, public TAtomicRefCount<TThreadImpl> {
+ public:
+ inline TThreadImpl(IThreadAble* func)
+ : Func_(func)
+ {
+ }
+
~TThreadImpl() override = default;
-
+
inline void WaitForStart() noexcept {
- StartEvent_.Wait();
- }
-
+ StartEvent_.Wait();
+ }
+
inline void WaitForComplete() noexcept {
- CompleteEvent_.Wait();
- }
-
- private:
+ CompleteEvent_.Wait();
+ }
+
+ private:
void Process(void* /*tsr*/) override {
- TThreadImplRef This(this);
-
- {
- StartEvent_.Signal();
-
- try {
- Func_->Execute();
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
-
- CompleteEvent_.Signal();
- }
- }
-
- private:
- IThreadAble* Func_;
+ TThreadImplRef This(this);
+
+ {
+ StartEvent_.Signal();
+
+ try {
+ Func_->Execute();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+
+ CompleteEvent_.Signal();
+ }
+ }
+
+ private:
+ IThreadAble* Func_;
TSystemEvent CompleteEvent_;
TSystemEvent StartEvent_;
- };
-
+ };
+
using TThreadImplRef = TIntrusivePtr<TThreadImpl>;
-
- public:
+
+ public:
inline TPoolThread(IThreadPool* parent)
- : Parent_(parent)
- {
- }
-
+ : Parent_(parent)
+ {
+ }
+
~TPoolThread() override {
- if (Impl_) {
- Impl_->WaitForStart();
- }
- }
-
- private:
+ if (Impl_) {
+ Impl_->WaitForStart();
+ }
+ }
+
+ private:
void DoRun(IThreadAble* func) override {
- TThreadImplRef impl(new TThreadImpl(func));
-
- Parent_->SafeAdd(impl.Get());
- Impl_.Swap(impl);
- }
-
+ TThreadImplRef impl(new TThreadImpl(func));
+
+ Parent_->SafeAdd(impl.Get());
+ Impl_.Swap(impl);
+ }
+
void DoJoin() noexcept override {
- if (Impl_) {
- Impl_->WaitForComplete();
+ if (Impl_) {
+ Impl_->WaitForComplete();
Impl_ = nullptr;
- }
- }
-
- private:
+ }
+ }
+
+ private:
IThreadPool* Parent_;
- TThreadImplRef Impl_;
- };
-}
-
+ TThreadImplRef Impl_;
+ };
+}
+
IThread* IThreadPool::DoCreate() {
- return new TPoolThread(this);
-}
+ return new TPoolThread(this);
+}
THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) {
THolder<IThreadPool> queue;