diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/scheduler/scheduler.cpp | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/scheduler/scheduler.cpp')
-rw-r--r-- | library/cpp/messagebus/scheduler/scheduler.cpp | 128 |
1 files changed, 64 insertions, 64 deletions
diff --git a/library/cpp/messagebus/scheduler/scheduler.cpp b/library/cpp/messagebus/scheduler/scheduler.cpp index 5a5fe52894..8c966da86d 100644 --- a/library/cpp/messagebus/scheduler/scheduler.cpp +++ b/library/cpp/messagebus/scheduler/scheduler.cpp @@ -5,19 +5,19 @@ #include <util/generic/yexception.h> //#include "dummy_debugger.h" - -using namespace NBus; -using namespace NBus::NPrivate; + +using namespace NBus; +using namespace NBus::NPrivate; class TScheduleDeadlineCompare { public: bool operator()(const IScheduleItemAutoPtr& i1, const IScheduleItemAutoPtr& i2) const noexcept { - return i1->GetScheduleTime() > i2->GetScheduleTime(); + return i1->GetScheduleTime() > i2->GetScheduleTime(); } }; TScheduler::TScheduler() - : StopThread(false) + : StopThread(false) , Thread([&] { this->SchedulerThread(); }) { } @@ -32,13 +32,13 @@ size_t TScheduler::Size() const { } void TScheduler::Stop() { - { - TGuard<TLock> guard(Lock); + { + TGuard<TLock> guard(Lock); Y_VERIFY(!StopThread, "Scheduler already stopped"); - StopThread = true; - CondVar.Signal(); - } - Thread.Get(); + StopThread = true; + CondVar.Signal(); + } + Thread.Get(); if (!!NextItem) { NextItem.Destroy(); @@ -50,70 +50,70 @@ void TScheduler::Stop() { } void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) { - TGuard<TLock> lock(Lock); - if (StopThread) + TGuard<TLock> lock(Lock); + if (StopThread) return; - - if (!!NextItem) { - if (i->GetScheduleTime() < NextItem->GetScheduleTime()) { - DoSwap(i, NextItem); - } + + if (!!NextItem) { + if (i->GetScheduleTime() < NextItem->GetScheduleTime()) { + DoSwap(i, NextItem); + } } - + Items.push_back(i); - PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); - - FillNextItem(); - - CondVar.Signal(); -} - -void TScheduler::FillNextItem() { - if (!NextItem && !Items.empty()) { - PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); - NextItem = Items.back(); - Items.erase(Items.end() - 1); - } + PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); + + FillNextItem(); + + CondVar.Signal(); } +void TScheduler::FillNextItem() { + if (!NextItem && !Items.empty()) { + PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); + NextItem = Items.back(); + Items.erase(Items.end() - 1); + } +} + void TScheduler::SchedulerThread() { - for (;;) { - IScheduleItemAutoPtr current; - + for (;;) { + IScheduleItemAutoPtr current; + { - TGuard<TLock> guard(Lock); - - if (StopThread) { - break; - } - - if (!!NextItem) { - CondVar.WaitD(Lock, NextItem->GetScheduleTime()); - } else { - CondVar.WaitI(Lock); + TGuard<TLock> guard(Lock); + + if (StopThread) { + break; } - - if (StopThread) { - break; + + if (!!NextItem) { + CondVar.WaitD(Lock, NextItem->GetScheduleTime()); + } else { + CondVar.WaitI(Lock); } - - // signal comes if either scheduler is to be stopped of there's work to do + + if (StopThread) { + break; + } + + // signal comes if either scheduler is to be stopped of there's work to do Y_VERIFY(!!NextItem, "state check"); - - if (TInstant::Now() < NextItem->GetScheduleTime()) { - // NextItem is updated since WaitD - continue; - } - + + if (TInstant::Now() < NextItem->GetScheduleTime()) { + // NextItem is updated since WaitD + continue; + } + current = NextItem.Release(); } - - current->Do(); - current.Destroy(); - - { - TGuard<TLock> guard(Lock); - FillNextItem(); - } + + current->Do(); + current.Destroy(); + + { + TGuard<TLock> guard(Lock); + FillNextItem(); + } } } |