diff options
author | kulikov <kulikov@yandex-team.ru> | 2022-02-10 16:49:34 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:34 +0300 |
commit | 65e5266709e7ff94b14ae128309e229de714b0df (patch) | |
tree | d4901f06e56d95f5e5d36bd1806bcc144d03bf41 /library/cpp/threading/task_scheduler/task_scheduler.h | |
parent | 0041d99876ae3dccc3f0fa8787131d85ddfd486b (diff) | |
download | ydb-65e5266709e7ff94b14ae128309e229de714b0df.tar.gz |
Restoring authorship annotation for <kulikov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/task_scheduler/task_scheduler.h')
-rw-r--r-- | library/cpp/threading/task_scheduler/task_scheduler.h | 162 |
1 files changed, 81 insertions, 81 deletions
diff --git a/library/cpp/threading/task_scheduler/task_scheduler.h b/library/cpp/threading/task_scheduler/task_scheduler.h index df4da941a8..166946a2aa 100644 --- a/library/cpp/threading/task_scheduler/task_scheduler.h +++ b/library/cpp/threading/task_scheduler/task_scheduler.h @@ -1,86 +1,86 @@ -#pragma once - -#include <util/generic/vector.h> -#include <util/generic/ptr.h> -#include <util/generic/map.h> - -#include <util/datetime/base.h> - -#include <util/system/condvar.h> -#include <util/system/mutex.h> - -class TTaskScheduler { -public: - class ITask; - using ITaskRef = TIntrusivePtr<ITask>; - - class IRepeatedTask; - using IRepeatedTaskRef = TIntrusivePtr<IRepeatedTask>; -public: - explicit TTaskScheduler(size_t threadCount = 1, size_t maxTaskCount = Max<size_t>()); - ~TTaskScheduler(); - - void Start(); - void Stop(); - - bool Add(ITaskRef task, TInstant expire); - bool Add(IRepeatedTaskRef task, TDuration period); - - size_t GetTaskCount() const; -private: - class TWorkerThread; - - struct TTaskHolder { - explicit TTaskHolder(ITaskRef& task) - : Task(task) - { - } - public: - ITaskRef Task; - TWorkerThread* WaitingWorker = nullptr; - }; - +#pragma once + +#include <util/generic/vector.h> +#include <util/generic/ptr.h> +#include <util/generic/map.h> + +#include <util/datetime/base.h> + +#include <util/system/condvar.h> +#include <util/system/mutex.h> + +class TTaskScheduler { +public: + class ITask; + using ITaskRef = TIntrusivePtr<ITask>; + + class IRepeatedTask; + using IRepeatedTaskRef = TIntrusivePtr<IRepeatedTask>; +public: + explicit TTaskScheduler(size_t threadCount = 1, size_t maxTaskCount = Max<size_t>()); + ~TTaskScheduler(); + + void Start(); + void Stop(); + + bool Add(ITaskRef task, TInstant expire); + bool Add(IRepeatedTaskRef task, TDuration period); + + size_t GetTaskCount() const; +private: + class TWorkerThread; + + struct TTaskHolder { + explicit TTaskHolder(ITaskRef& task) + : Task(task) + { + } + public: + ITaskRef Task; + TWorkerThread* WaitingWorker = nullptr; + }; + using TQueueType = TMultiMap<TInstant, TTaskHolder>; using TQueueIterator = TQueueType::iterator; -private: +private: void ChangeDebugState(TWorkerThread* thread, const TString& state); - void ChooseFromQueue(TQueueIterator& toWait); - bool Wait(TWorkerThread* thread, TQueueIterator& toWait); - - void WorkerFunc(TWorkerThread* thread); -private: - bool IsStopped_ = false; - - TAtomic TaskCounter_ = 0; + void ChooseFromQueue(TQueueIterator& toWait); + bool Wait(TWorkerThread* thread, TQueueIterator& toWait); + + void WorkerFunc(TWorkerThread* thread); +private: + bool IsStopped_ = false; + + TAtomic TaskCounter_ = 0; TQueueType Queue_; - - TCondVar CondVar_; - TMutex Lock_; - + + TCondVar CondVar_; + TMutex Lock_; + TVector<TAutoPtr<TWorkerThread>> Workers_; - - const size_t MaxTaskCount_; -}; - -class TTaskScheduler::ITask - : public TAtomicRefCount<ITask> -{ -public: - virtual ~ITask(); - - virtual TInstant Process() {//returns time to repeat this task - return TInstant::Max(); - } -}; - -class TTaskScheduler::IRepeatedTask - : public TAtomicRefCount<IRepeatedTask> -{ -public: - virtual ~IRepeatedTask(); - - virtual bool Process() {//returns if to repeat task again - return false; - } -}; - + + const size_t MaxTaskCount_; +}; + +class TTaskScheduler::ITask + : public TAtomicRefCount<ITask> +{ +public: + virtual ~ITask(); + + virtual TInstant Process() {//returns time to repeat this task + return TInstant::Max(); + } +}; + +class TTaskScheduler::IRepeatedTask + : public TAtomicRefCount<IRepeatedTask> +{ +public: + virtual ~IRepeatedTask(); + + virtual bool Process() {//returns if to repeat task again + return false; + } +}; + |