aboutsummaryrefslogtreecommitdiffstats
path: root/util/thread/pool.h
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /util/thread/pool.h
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'util/thread/pool.h')
-rw-r--r--util/thread/pool.h260
1 files changed, 130 insertions, 130 deletions
diff --git a/util/thread/pool.h b/util/thread/pool.h
index e2a2a03968..d1ea3a67cb 100644
--- a/util/thread/pool.h
+++ b/util/thread/pool.h
@@ -2,19 +2,19 @@
#include "fwd.h"
#include "factory.h"
-
-#include <util/system/yassert.h>
+
+#include <util/system/yassert.h>
#include <util/system/defaults.h>
#include <util/generic/yexception.h>
#include <util/generic/ptr.h>
-#include <util/generic/noncopyable.h>
+#include <util/generic/noncopyable.h>
#include <functional>
class TDuration;
-struct IObjectInQueue {
+struct IObjectInQueue {
virtual ~IObjectInQueue() = default;
-
+
/**
* Supposed to be implemented by user, to define jobs processed
* in multiple threads.
@@ -32,38 +32,38 @@ struct IObjectInQueue {
* Useful only for creators of new queue classes.
*/
class TThreadFactoryHolder {
-public:
+public:
TThreadFactoryHolder() noexcept;
-
+
inline TThreadFactoryHolder(IThreadFactory* pool) noexcept
- : Pool_(pool)
- {
- }
-
+ : Pool_(pool)
+ {
+ }
+
inline ~TThreadFactoryHolder() = default;
-
+
inline IThreadFactory* Pool() const noexcept {
- return Pool_;
- }
-
-private:
+ return Pool_;
+ }
+
+private:
IThreadFactory* Pool_;
-};
-
+};
+
class TThreadPoolException: public yexception {
};
-template <class T>
-class TThrFuncObj: public IObjectInQueue {
+template <class T>
+class TThrFuncObj: public IObjectInQueue {
public:
TThrFuncObj(const T& func)
- : Func(func)
- {
+ : Func(func)
+ {
}
TThrFuncObj(T&& func)
- : Func(std::move(func))
- {
+ : Func(std::move(func))
+ {
}
void Process(void*) override {
@@ -75,7 +75,7 @@ private:
T Func;
};
-template <class T>
+template <class T>
IObjectInQueue* MakeThrFuncObj(T&& func) {
return new TThrFuncObj<std::remove_cv_t<std::remove_reference_t<T>>>(std::forward<T>(func));
}
@@ -134,10 +134,10 @@ struct TThreadPoolParams {
};
/**
- * A queue processed simultaneously by several threads
+ * A queue processed simultaneously by several threads
*/
class IThreadPool: public IThreadFactory, public TNonCopyable {
-public:
+public:
using TParams = TThreadPoolParams;
~IThreadPool() override = default;
@@ -148,7 +148,7 @@ public:
*/
void SafeAdd(IObjectInQueue* obj);
- template <class T>
+ template <class T>
void SafeAddFunc(T&& func) {
Y_ENSURE_EX(AddFunc(std::forward<T>(func)), TThreadPoolException() << TStringBuf("can not add function to queue"));
}
@@ -161,9 +161,9 @@ public:
* @return true of obj is successfully added to queue
* @return false if queue is full or shutting down
*/
- virtual bool Add(IObjectInQueue* obj) Y_WARN_UNUSED_RESULT = 0;
+ virtual bool Add(IObjectInQueue* obj) Y_WARN_UNUSED_RESULT = 0;
- template <class T>
+ template <class T>
Y_WARN_UNUSED_RESULT bool AddFunc(T&& func) {
THolder<IObjectInQueue> wrapper(MakeThrFuncObj(std::forward<T>(func)));
bool added = Add(wrapper.Get());
@@ -185,76 +185,76 @@ public:
* RAII wrapper for Create/DestroyThreadSpecificResource.
* Useful only for implementers of new IThreadPool queues.
*/
- class TTsr {
- public:
+ class TTsr {
+ public:
inline TTsr(IThreadPool* q)
- : Q_(q)
- , Data_(Q_->CreateThreadSpecificResource())
- {
- }
-
+ : Q_(q)
+ , Data_(Q_->CreateThreadSpecificResource())
+ {
+ }
+
inline ~TTsr() {
- try {
- Q_->DestroyThreadSpecificResource(Data_);
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
- }
-
+ try {
+ Q_->DestroyThreadSpecificResource(Data_);
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+ }
+
inline operator void*() noexcept {
- return Data_;
- }
-
- private:
+ return Data_;
+ }
+
+ private:
IThreadPool* Q_;
- void* Data_;
- };
-
+ void* Data_;
+ };
+
/**
* CreateThreadSpecificResource and DestroyThreadSpecificResource
* called from internals of (TAdaptiveThreadPool, TThreadPool, ...) implementation,
* not by user of IThreadPool interface.
* Created resource is passed to IObjectInQueue::Proccess function.
*/
- virtual void* CreateThreadSpecificResource() {
+ virtual void* CreateThreadSpecificResource() {
return nullptr;
- }
-
- virtual void DestroyThreadSpecificResource(void* resource) {
+ }
+
+ virtual void DestroyThreadSpecificResource(void* resource) {
if (resource != nullptr) {
Y_ASSERT(resource == nullptr);
- }
- }
-
-private:
+ }
+ }
+
+private:
IThread* DoCreate() override;
-};
-
+};
+
/**
* Single-threaded implementation of IThreadPool, process tasks in same thread when
* added.
* Can be used to remove multithreading.
*/
class TFakeThreadPool: public IThreadPool {
-public:
+public:
bool Add(IObjectInQueue* pObj) override Y_WARN_UNUSED_RESULT {
- TTsr tsr(this);
- pObj->Process(tsr);
-
- return true;
- }
-
+ TTsr tsr(this);
+ pObj->Process(tsr);
+
+ return true;
+ }
+
void Start(size_t, size_t = 0) override {
- }
-
+ }
+
void Stop() noexcept override {
- }
-
+ }
+
size_t Size() const noexcept override {
- return 0;
- }
-};
-
+ return 0;
+ }
+};
+
class TThreadPoolBase: public IThreadPool, public TThreadFactoryHolder {
public:
TThreadPoolBase(const TParams& params);
@@ -265,10 +265,10 @@ protected:
/** queue processed by fixed size thread pool */
class TThreadPool: public TThreadPoolBase {
-public:
+public:
TThreadPool(const TParams& params = {});
~TThreadPool() override;
-
+
bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT;
/**
* @param queueSizeLimit means "unlimited" when = 0
@@ -280,45 +280,45 @@ public:
size_t GetThreadCountExpected() const noexcept;
size_t GetThreadCountReal() const noexcept;
size_t GetMaxQueueSize() const noexcept;
-
-private:
- class TImpl;
- THolder<TImpl> Impl_;
-};
-
+
+private:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
+
/**
* Always create new thread for new task, when all existing threads are busy.
* Maybe dangerous, number of threads is not limited.
*/
class TAdaptiveThreadPool: public TThreadPoolBase {
-public:
+public:
TAdaptiveThreadPool(const TParams& params = {});
~TAdaptiveThreadPool() override;
-
+
/**
* If working thread waits task too long (more then interval parameter),
* then the thread would be killed. Default value - infinity, all created threads
* waits for new task forever, before Stop.
*/
- void SetMaxIdleTime(TDuration interval);
-
+ void SetMaxIdleTime(TDuration interval);
+
bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT;
/** @param thrnum, @param maxque are ignored */
void Start(size_t thrnum = 0, size_t maxque = 0) override;
void Stop() noexcept override;
size_t Size() const noexcept override;
-
+
private:
- class TImpl;
- THolder<TImpl> Impl_;
+ class TImpl;
+ THolder<TImpl> Impl_;
};
/** Behave like TThreadPool or TAdaptiveThreadPool, choosen by thrnum parameter of Start() */
class TSimpleThreadPool: public TThreadPoolBase {
-public:
+public:
TSimpleThreadPool(const TParams& params = {});
~TSimpleThreadPool() override;
-
+
bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT;
/**
* @parameter thrnum. If thrnum is 0, use TAdaptiveThreadPool with small
@@ -327,11 +327,11 @@ public:
void Start(size_t thrnum, size_t maxque = 0) override;
void Stop() noexcept override;
size_t Size() const noexcept override;
-
-private:
+
+private:
THolder<IThreadPool> Slave_;
-};
-
+};
+
/**
* Helper to override virtual functions Create/DestroyThreadSpecificResource
* from IThreadPool and implement them using functions with same name from
@@ -339,46 +339,46 @@ private:
*/
template <class TQueueType, class TSlave>
class TThreadPoolBinder: public TQueueType {
-public:
+public:
inline TThreadPoolBinder(TSlave* slave)
- : Slave_(slave)
- {
- }
-
- template <class... Args>
- inline TThreadPoolBinder(TSlave* slave, Args&&... args)
+ : Slave_(slave)
+ {
+ }
+
+ template <class... Args>
+ inline TThreadPoolBinder(TSlave* slave, Args&&... args)
: TQueueType(std::forward<Args>(args)...)
- , Slave_(slave)
- {
- }
-
+ , Slave_(slave)
+ {
+ }
+
inline TThreadPoolBinder(TSlave& slave)
- : Slave_(&slave)
- {
- }
-
+ : Slave_(&slave)
+ {
+ }
+
~TThreadPoolBinder() override {
- try {
- this->Stop();
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
- }
-
+ try {
+ this->Stop();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+ }
+
void* CreateThreadSpecificResource() override {
- return Slave_->CreateThreadSpecificResource();
- }
-
+ return Slave_->CreateThreadSpecificResource();
+ }
+
void DestroyThreadSpecificResource(void* resource) override {
- Slave_->DestroyThreadSpecificResource(resource);
- }
-
-private:
- TSlave* Slave_;
-};
-
+ Slave_->DestroyThreadSpecificResource(resource);
+ }
+
+private:
+ TSlave* Slave_;
+};
+
inline void Delete(THolder<IThreadPool> q) {
- if (q.Get()) {
+ if (q.Get()) {
q->Stop();
}
}