diff options
author | somov <somov@yandex-team.ru> | 2022-02-10 16:45:47 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:47 +0300 |
commit | a5950576e397b1909261050b8c7da16db58f10b1 (patch) | |
tree | 7ba7677f6a4c3e19e2cefab34d16df2c8963b4d4 /library/cpp/messagebus/scheduler | |
parent | 81eddc8c0b55990194e112b02d127b87d54164a9 (diff) | |
download | ydb-a5950576e397b1909261050b8c7da16db58f10b1.tar.gz |
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/scheduler')
-rw-r--r-- | library/cpp/messagebus/scheduler/scheduler.cpp | 72 | ||||
-rw-r--r-- | library/cpp/messagebus/scheduler/scheduler.h | 32 |
2 files changed, 52 insertions, 52 deletions
diff --git a/library/cpp/messagebus/scheduler/scheduler.cpp b/library/cpp/messagebus/scheduler/scheduler.cpp index 5a5fe52894..5c0686d32a 100644 --- a/library/cpp/messagebus/scheduler/scheduler.cpp +++ b/library/cpp/messagebus/scheduler/scheduler.cpp @@ -1,37 +1,37 @@ -#include "scheduler.h" - -#include <util/datetime/base.h> -#include <util/generic/algorithm.h> -#include <util/generic/yexception.h> - +#include "scheduler.h" + +#include <util/datetime/base.h> +#include <util/generic/algorithm.h> +#include <util/generic/yexception.h> + //#include "dummy_debugger.h" using namespace NBus; using namespace NBus::NPrivate; - -class TScheduleDeadlineCompare { -public: + +class TScheduleDeadlineCompare { +public: bool operator()(const IScheduleItemAutoPtr& i1, const IScheduleItemAutoPtr& i2) const noexcept { return i1->GetScheduleTime() > i2->GetScheduleTime(); - } -}; - -TScheduler::TScheduler() + } +}; + +TScheduler::TScheduler() : StopThread(false) , Thread([&] { this->SchedulerThread(); }) -{ -} - -TScheduler::~TScheduler() { +{ +} + +TScheduler::~TScheduler() { Y_VERIFY(StopThread, "state check"); -} - +} + size_t TScheduler::Size() const { TGuard<TLock> guard(Lock); return Items.size() + (!!NextItem ? 1 : 0); } -void TScheduler::Stop() { +void TScheduler::Stop() { { TGuard<TLock> guard(Lock); Y_VERIFY(!StopThread, "Scheduler already stopped"); @@ -46,28 +46,28 @@ void TScheduler::Stop() { for (auto& item : Items) { item.Destroy(); - } -} - -void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) { + } +} + +void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) { TGuard<TLock> lock(Lock); if (StopThread) - return; + return; if (!!NextItem) { if (i->GetScheduleTime() < NextItem->GetScheduleTime()) { DoSwap(i, NextItem); } - } + } - Items.push_back(i); + Items.push_back(i); PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); FillNextItem(); CondVar.Signal(); -} - +} + void TScheduler::FillNextItem() { if (!NextItem && !Items.empty()) { PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); @@ -76,22 +76,22 @@ void TScheduler::FillNextItem() { } } -void TScheduler::SchedulerThread() { +void TScheduler::SchedulerThread() { for (;;) { IScheduleItemAutoPtr current; - { + { TGuard<TLock> guard(Lock); if (StopThread) { break; - } + } if (!!NextItem) { CondVar.WaitD(Lock, NextItem->GetScheduleTime()); } else { CondVar.WaitI(Lock); - } + } if (StopThread) { break; @@ -106,7 +106,7 @@ void TScheduler::SchedulerThread() { } current = NextItem.Release(); - } + } current->Do(); current.Destroy(); @@ -115,5 +115,5 @@ void TScheduler::SchedulerThread() { TGuard<TLock> guard(Lock); FillNextItem(); } - } -} + } +} diff --git a/library/cpp/messagebus/scheduler/scheduler.h b/library/cpp/messagebus/scheduler/scheduler.h index afcc0de55d..996bf30f8c 100644 --- a/library/cpp/messagebus/scheduler/scheduler.h +++ b/library/cpp/messagebus/scheduler/scheduler.h @@ -1,16 +1,16 @@ -#pragma once - +#pragma once + #include <library/cpp/threading/future/legacy_future.h> #include <util/datetime/base.h> #include <util/generic/object_counter.h> -#include <util/generic/ptr.h> -#include <util/generic/vector.h> -#include <util/system/atomic.h> +#include <util/generic/ptr.h> +#include <util/generic/vector.h> +#include <util/system/atomic.h> #include <util/system/condvar.h> #include <util/system/mutex.h> -#include <util/system/thread.h> - +#include <util/system/thread.h> + namespace NBus { namespace NPrivate { class IScheduleItem { @@ -25,30 +25,30 @@ namespace NBus { private: TInstant ScheduleTime; }; - + using IScheduleItemAutoPtr = TAutoPtr<IScheduleItem>; - + class TScheduler { public: TScheduler(); ~TScheduler(); void Stop(); void Schedule(TAutoPtr<IScheduleItem> i); - + size_t Size() const; - + private: void SchedulerThread(); - + void FillNextItem(); - + private: TVector<IScheduleItemAutoPtr> Items; IScheduleItemAutoPtr NextItem; typedef TMutex TLock; TLock Lock; TCondVar CondVar; - + TObjectCounter<TScheduler> ObjectCounter; bool StopThread; @@ -63,6 +63,6 @@ namespace NBus { inline TInstant IScheduleItem::GetScheduleTime() const noexcept { return ScheduleTime; } - + } -} +} |