diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /util/thread/pool.h | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'util/thread/pool.h')
-rw-r--r-- | util/thread/pool.h | 260 |
1 files changed, 130 insertions, 130 deletions
diff --git a/util/thread/pool.h b/util/thread/pool.h index d1ea3a67cb..e2a2a03968 100644 --- a/util/thread/pool.h +++ b/util/thread/pool.h @@ -2,19 +2,19 @@ #include "fwd.h" #include "factory.h" - -#include <util/system/yassert.h> + +#include <util/system/yassert.h> #include <util/system/defaults.h> #include <util/generic/yexception.h> #include <util/generic/ptr.h> -#include <util/generic/noncopyable.h> +#include <util/generic/noncopyable.h> #include <functional> class TDuration; -struct IObjectInQueue { +struct IObjectInQueue { virtual ~IObjectInQueue() = default; - + /** * Supposed to be implemented by user, to define jobs processed * in multiple threads. @@ -32,38 +32,38 @@ struct IObjectInQueue { * Useful only for creators of new queue classes. */ class TThreadFactoryHolder { -public: +public: TThreadFactoryHolder() noexcept; - + inline TThreadFactoryHolder(IThreadFactory* pool) noexcept - : Pool_(pool) - { - } - + : Pool_(pool) + { + } + inline ~TThreadFactoryHolder() = default; - + inline IThreadFactory* Pool() const noexcept { - return Pool_; - } - -private: + return Pool_; + } + +private: IThreadFactory* Pool_; -}; - +}; + class TThreadPoolException: public yexception { }; -template <class T> -class TThrFuncObj: public IObjectInQueue { +template <class T> +class TThrFuncObj: public IObjectInQueue { public: TThrFuncObj(const T& func) - : Func(func) - { + : Func(func) + { } TThrFuncObj(T&& func) - : Func(std::move(func)) - { + : Func(std::move(func)) + { } void Process(void*) override { @@ -75,7 +75,7 @@ private: T Func; }; -template <class T> +template <class T> IObjectInQueue* MakeThrFuncObj(T&& func) { return new TThrFuncObj<std::remove_cv_t<std::remove_reference_t<T>>>(std::forward<T>(func)); } @@ -134,10 +134,10 @@ struct TThreadPoolParams { }; /** - * A queue processed simultaneously by several threads + * A queue processed simultaneously by several threads */ class IThreadPool: public IThreadFactory, public TNonCopyable { -public: +public: using TParams = TThreadPoolParams; ~IThreadPool() override = default; @@ -148,7 +148,7 @@ public: */ void SafeAdd(IObjectInQueue* obj); - template <class T> + template <class T> void SafeAddFunc(T&& func) { Y_ENSURE_EX(AddFunc(std::forward<T>(func)), TThreadPoolException() << TStringBuf("can not add function to queue")); } @@ -161,9 +161,9 @@ public: * @return true of obj is successfully added to queue * @return false if queue is full or shutting down */ - virtual bool Add(IObjectInQueue* obj) Y_WARN_UNUSED_RESULT = 0; + virtual bool Add(IObjectInQueue* obj) Y_WARN_UNUSED_RESULT = 0; - template <class T> + template <class T> Y_WARN_UNUSED_RESULT bool AddFunc(T&& func) { THolder<IObjectInQueue> wrapper(MakeThrFuncObj(std::forward<T>(func))); bool added = Add(wrapper.Get()); @@ -185,76 +185,76 @@ public: * RAII wrapper for Create/DestroyThreadSpecificResource. * Useful only for implementers of new IThreadPool queues. */ - class TTsr { - public: + class TTsr { + public: inline TTsr(IThreadPool* q) - : Q_(q) - , Data_(Q_->CreateThreadSpecificResource()) - { - } - + : Q_(q) + , Data_(Q_->CreateThreadSpecificResource()) + { + } + inline ~TTsr() { - try { - Q_->DestroyThreadSpecificResource(Data_); - } catch (...) { - // ¯\_(ツ)_/¯ - } - } - + try { + Q_->DestroyThreadSpecificResource(Data_); + } catch (...) { + // ¯\_(ツ)_/¯ + } + } + inline operator void*() noexcept { - return Data_; - } - - private: + return Data_; + } + + private: IThreadPool* Q_; - void* Data_; - }; - + void* Data_; + }; + /** * CreateThreadSpecificResource and DestroyThreadSpecificResource * called from internals of (TAdaptiveThreadPool, TThreadPool, ...) implementation, * not by user of IThreadPool interface. * Created resource is passed to IObjectInQueue::Proccess function. */ - virtual void* CreateThreadSpecificResource() { + virtual void* CreateThreadSpecificResource() { return nullptr; - } - - virtual void DestroyThreadSpecificResource(void* resource) { + } + + virtual void DestroyThreadSpecificResource(void* resource) { if (resource != nullptr) { Y_ASSERT(resource == nullptr); - } - } - -private: + } + } + +private: IThread* DoCreate() override; -}; - +}; + /** * Single-threaded implementation of IThreadPool, process tasks in same thread when * added. * Can be used to remove multithreading. */ class TFakeThreadPool: public IThreadPool { -public: +public: bool Add(IObjectInQueue* pObj) override Y_WARN_UNUSED_RESULT { - TTsr tsr(this); - pObj->Process(tsr); - - return true; - } - + TTsr tsr(this); + pObj->Process(tsr); + + return true; + } + void Start(size_t, size_t = 0) override { - } - + } + void Stop() noexcept override { - } - + } + size_t Size() const noexcept override { - return 0; - } -}; - + return 0; + } +}; + class TThreadPoolBase: public IThreadPool, public TThreadFactoryHolder { public: TThreadPoolBase(const TParams& params); @@ -265,10 +265,10 @@ protected: /** queue processed by fixed size thread pool */ class TThreadPool: public TThreadPoolBase { -public: +public: TThreadPool(const TParams& params = {}); ~TThreadPool() override; - + bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT; /** * @param queueSizeLimit means "unlimited" when = 0 @@ -280,45 +280,45 @@ public: size_t GetThreadCountExpected() const noexcept; size_t GetThreadCountReal() const noexcept; size_t GetMaxQueueSize() const noexcept; - -private: - class TImpl; - THolder<TImpl> Impl_; -}; - + +private: + class TImpl; + THolder<TImpl> Impl_; +}; + /** * Always create new thread for new task, when all existing threads are busy. * Maybe dangerous, number of threads is not limited. */ class TAdaptiveThreadPool: public TThreadPoolBase { -public: +public: TAdaptiveThreadPool(const TParams& params = {}); ~TAdaptiveThreadPool() override; - + /** * If working thread waits task too long (more then interval parameter), * then the thread would be killed. Default value - infinity, all created threads * waits for new task forever, before Stop. */ - void SetMaxIdleTime(TDuration interval); - + void SetMaxIdleTime(TDuration interval); + bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT; /** @param thrnum, @param maxque are ignored */ void Start(size_t thrnum = 0, size_t maxque = 0) override; void Stop() noexcept override; size_t Size() const noexcept override; - + private: - class TImpl; - THolder<TImpl> Impl_; + class TImpl; + THolder<TImpl> Impl_; }; /** Behave like TThreadPool or TAdaptiveThreadPool, choosen by thrnum parameter of Start() */ class TSimpleThreadPool: public TThreadPoolBase { -public: +public: TSimpleThreadPool(const TParams& params = {}); ~TSimpleThreadPool() override; - + bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT; /** * @parameter thrnum. If thrnum is 0, use TAdaptiveThreadPool with small @@ -327,11 +327,11 @@ public: void Start(size_t thrnum, size_t maxque = 0) override; void Stop() noexcept override; size_t Size() const noexcept override; - -private: + +private: THolder<IThreadPool> Slave_; -}; - +}; + /** * Helper to override virtual functions Create/DestroyThreadSpecificResource * from IThreadPool and implement them using functions with same name from @@ -339,46 +339,46 @@ private: */ template <class TQueueType, class TSlave> class TThreadPoolBinder: public TQueueType { -public: +public: inline TThreadPoolBinder(TSlave* slave) - : Slave_(slave) - { - } - - template <class... Args> - inline TThreadPoolBinder(TSlave* slave, Args&&... args) + : Slave_(slave) + { + } + + template <class... Args> + inline TThreadPoolBinder(TSlave* slave, Args&&... args) : TQueueType(std::forward<Args>(args)...) - , Slave_(slave) - { - } - + , Slave_(slave) + { + } + inline TThreadPoolBinder(TSlave& slave) - : Slave_(&slave) - { - } - + : Slave_(&slave) + { + } + ~TThreadPoolBinder() override { - try { - this->Stop(); - } catch (...) { - // ¯\_(ツ)_/¯ - } - } - + try { + this->Stop(); + } catch (...) { + // ¯\_(ツ)_/¯ + } + } + void* CreateThreadSpecificResource() override { - return Slave_->CreateThreadSpecificResource(); - } - + return Slave_->CreateThreadSpecificResource(); + } + void DestroyThreadSpecificResource(void* resource) override { - Slave_->DestroyThreadSpecificResource(resource); - } - -private: - TSlave* Slave_; -}; - + Slave_->DestroyThreadSpecificResource(resource); + } + +private: + TSlave* Slave_; +}; + inline void Delete(THolder<IThreadPool> q) { - if (q.Get()) { + if (q.Get()) { q->Stop(); } } |