diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | c2a1af049e9deca890e9923abe64fe6c59060348 (patch) | |
tree | b222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/scheduler | |
parent | 1f553f46fb4f3c5eec631352cdd900a0709016af (diff) | |
download | ydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/scheduler')
-rw-r--r-- | library/cpp/messagebus/scheduler/scheduler.cpp | 128 | ||||
-rw-r--r-- | library/cpp/messagebus/scheduler/scheduler.h | 12 | ||||
-rw-r--r-- | library/cpp/messagebus/scheduler/scheduler_ut.cpp | 58 | ||||
-rw-r--r-- | library/cpp/messagebus/scheduler/ya.make | 16 |
4 files changed, 107 insertions, 107 deletions
diff --git a/library/cpp/messagebus/scheduler/scheduler.cpp b/library/cpp/messagebus/scheduler/scheduler.cpp index 8c966da86d..5a5fe52894 100644 --- a/library/cpp/messagebus/scheduler/scheduler.cpp +++ b/library/cpp/messagebus/scheduler/scheduler.cpp @@ -5,19 +5,19 @@ #include <util/generic/yexception.h> //#include "dummy_debugger.h" - -using namespace NBus; -using namespace NBus::NPrivate; + +using namespace NBus; +using namespace NBus::NPrivate; class TScheduleDeadlineCompare { public: bool operator()(const IScheduleItemAutoPtr& i1, const IScheduleItemAutoPtr& i2) const noexcept { - return i1->GetScheduleTime() > i2->GetScheduleTime(); + return i1->GetScheduleTime() > i2->GetScheduleTime(); } }; TScheduler::TScheduler() - : StopThread(false) + : StopThread(false) , Thread([&] { this->SchedulerThread(); }) { } @@ -32,13 +32,13 @@ size_t TScheduler::Size() const { } void TScheduler::Stop() { - { - TGuard<TLock> guard(Lock); + { + TGuard<TLock> guard(Lock); Y_VERIFY(!StopThread, "Scheduler already stopped"); - StopThread = true; - CondVar.Signal(); - } - Thread.Get(); + StopThread = true; + CondVar.Signal(); + } + Thread.Get(); if (!!NextItem) { NextItem.Destroy(); @@ -50,70 +50,70 @@ void TScheduler::Stop() { } void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) { - TGuard<TLock> lock(Lock); - if (StopThread) + TGuard<TLock> lock(Lock); + if (StopThread) return; - - if (!!NextItem) { - if (i->GetScheduleTime() < NextItem->GetScheduleTime()) { - DoSwap(i, NextItem); - } + + if (!!NextItem) { + if (i->GetScheduleTime() < NextItem->GetScheduleTime()) { + DoSwap(i, NextItem); + } } - + Items.push_back(i); - PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); - - FillNextItem(); - - CondVar.Signal(); + PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); + + FillNextItem(); + + CondVar.Signal(); +} + +void TScheduler::FillNextItem() { + if (!NextItem && !Items.empty()) { + PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); + NextItem = Items.back(); + Items.erase(Items.end() - 1); + } } -void TScheduler::FillNextItem() { - if (!NextItem && !Items.empty()) { - PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); - NextItem = Items.back(); - Items.erase(Items.end() - 1); - } -} - void TScheduler::SchedulerThread() { - for (;;) { - IScheduleItemAutoPtr current; - + for (;;) { + IScheduleItemAutoPtr current; + { - TGuard<TLock> guard(Lock); - - if (StopThread) { - break; + TGuard<TLock> guard(Lock); + + if (StopThread) { + break; + } + + if (!!NextItem) { + CondVar.WaitD(Lock, NextItem->GetScheduleTime()); + } else { + CondVar.WaitI(Lock); } - - if (!!NextItem) { - CondVar.WaitD(Lock, NextItem->GetScheduleTime()); - } else { - CondVar.WaitI(Lock); + + if (StopThread) { + break; } - - if (StopThread) { - break; - } - - // signal comes if either scheduler is to be stopped of there's work to do + + // signal comes if either scheduler is to be stopped of there's work to do Y_VERIFY(!!NextItem, "state check"); - - if (TInstant::Now() < NextItem->GetScheduleTime()) { - // NextItem is updated since WaitD - continue; - } - + + if (TInstant::Now() < NextItem->GetScheduleTime()) { + // NextItem is updated since WaitD + continue; + } + current = NextItem.Release(); } - - current->Do(); - current.Destroy(); - - { - TGuard<TLock> guard(Lock); - FillNextItem(); - } + + current->Do(); + current.Destroy(); + + { + TGuard<TLock> guard(Lock); + FillNextItem(); + } } } diff --git a/library/cpp/messagebus/scheduler/scheduler.h b/library/cpp/messagebus/scheduler/scheduler.h index 6114c3cc88..afcc0de55d 100644 --- a/library/cpp/messagebus/scheduler/scheduler.h +++ b/library/cpp/messagebus/scheduler/scheduler.h @@ -8,7 +8,7 @@ #include <util/generic/vector.h> #include <util/system/atomic.h> #include <util/system/condvar.h> -#include <util/system/mutex.h> +#include <util/system/mutex.h> #include <util/system/thread.h> namespace NBus { @@ -18,10 +18,10 @@ namespace NBus { inline IScheduleItem(TInstant scheduleTime) noexcept; virtual ~IScheduleItem() { } - + virtual void Do() = 0; inline TInstant GetScheduleTime() const noexcept; - + private: TInstant ScheduleTime; }; @@ -50,16 +50,16 @@ namespace NBus { TCondVar CondVar; TObjectCounter<TScheduler> ObjectCounter; - + bool StopThread; NThreading::TLegacyFuture<> Thread; }; - + inline IScheduleItem::IScheduleItem(TInstant scheduleTime) noexcept : ScheduleTime(scheduleTime) { } - + inline TInstant IScheduleItem::GetScheduleTime() const noexcept { return ScheduleTime; } diff --git a/library/cpp/messagebus/scheduler/scheduler_ut.cpp b/library/cpp/messagebus/scheduler/scheduler_ut.cpp index 35fcccdd29..a5ea641c10 100644 --- a/library/cpp/messagebus/scheduler/scheduler_ut.cpp +++ b/library/cpp/messagebus/scheduler/scheduler_ut.cpp @@ -1,36 +1,36 @@ #include <library/cpp/testing/unittest/registar.h> - -#include "scheduler.h" - + +#include "scheduler.h" + #include <library/cpp/messagebus/misc/test_sync.h> - -using namespace NBus; -using namespace NBus::NPrivate; - + +using namespace NBus; +using namespace NBus::NPrivate; + Y_UNIT_TEST_SUITE(TSchedulerTests) { - struct TSimpleScheduleItem: public IScheduleItem { - TTestSync* const TestSync; - - TSimpleScheduleItem(TTestSync* testSync) - : IScheduleItem((TInstant::Now() + TDuration::MilliSeconds(1))) - , TestSync(testSync) + struct TSimpleScheduleItem: public IScheduleItem { + TTestSync* const TestSync; + + TSimpleScheduleItem(TTestSync* testSync) + : IScheduleItem((TInstant::Now() + TDuration::MilliSeconds(1))) + , TestSync(testSync) { } - + void Do() override { - TestSync->WaitForAndIncrement(0); - } - }; - + TestSync->WaitForAndIncrement(0); + } + }; + Y_UNIT_TEST(Simple) { - TTestSync testSync; - - TScheduler scheduler; - - scheduler.Schedule(new TSimpleScheduleItem(&testSync)); - - testSync.WaitForAndIncrement(1); - - scheduler.Stop(); - } -} + TTestSync testSync; + + TScheduler scheduler; + + scheduler.Schedule(new TSimpleScheduleItem(&testSync)); + + testSync.WaitForAndIncrement(1); + + scheduler.Stop(); + } +} diff --git a/library/cpp/messagebus/scheduler/ya.make b/library/cpp/messagebus/scheduler/ya.make index 382804c408..dcb7408a20 100644 --- a/library/cpp/messagebus/scheduler/ya.make +++ b/library/cpp/messagebus/scheduler/ya.make @@ -1,13 +1,13 @@ -LIBRARY() - +LIBRARY() + OWNER(g:messagebus) - + PEERDIR( library/cpp/threading/future ) -SRCS( - scheduler.cpp -) - -END() +SRCS( + scheduler.cpp +) + +END() |