aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/scheduler
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commitc2a1af049e9deca890e9923abe64fe6c59060348 (patch)
treeb222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/scheduler
parent1f553f46fb4f3c5eec631352cdd900a0709016af (diff)
downloadydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/scheduler')
-rw-r--r--library/cpp/messagebus/scheduler/scheduler.cpp128
-rw-r--r--library/cpp/messagebus/scheduler/scheduler.h12
-rw-r--r--library/cpp/messagebus/scheduler/scheduler_ut.cpp58
-rw-r--r--library/cpp/messagebus/scheduler/ya.make16
4 files changed, 107 insertions, 107 deletions
diff --git a/library/cpp/messagebus/scheduler/scheduler.cpp b/library/cpp/messagebus/scheduler/scheduler.cpp
index 8c966da86d..5a5fe52894 100644
--- a/library/cpp/messagebus/scheduler/scheduler.cpp
+++ b/library/cpp/messagebus/scheduler/scheduler.cpp
@@ -5,19 +5,19 @@
#include <util/generic/yexception.h>
//#include "dummy_debugger.h"
-
-using namespace NBus;
-using namespace NBus::NPrivate;
+
+using namespace NBus;
+using namespace NBus::NPrivate;
class TScheduleDeadlineCompare {
public:
bool operator()(const IScheduleItemAutoPtr& i1, const IScheduleItemAutoPtr& i2) const noexcept {
- return i1->GetScheduleTime() > i2->GetScheduleTime();
+ return i1->GetScheduleTime() > i2->GetScheduleTime();
}
};
TScheduler::TScheduler()
- : StopThread(false)
+ : StopThread(false)
, Thread([&] { this->SchedulerThread(); })
{
}
@@ -32,13 +32,13 @@ size_t TScheduler::Size() const {
}
void TScheduler::Stop() {
- {
- TGuard<TLock> guard(Lock);
+ {
+ TGuard<TLock> guard(Lock);
Y_VERIFY(!StopThread, "Scheduler already stopped");
- StopThread = true;
- CondVar.Signal();
- }
- Thread.Get();
+ StopThread = true;
+ CondVar.Signal();
+ }
+ Thread.Get();
if (!!NextItem) {
NextItem.Destroy();
@@ -50,70 +50,70 @@ void TScheduler::Stop() {
}
void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) {
- TGuard<TLock> lock(Lock);
- if (StopThread)
+ TGuard<TLock> lock(Lock);
+ if (StopThread)
return;
-
- if (!!NextItem) {
- if (i->GetScheduleTime() < NextItem->GetScheduleTime()) {
- DoSwap(i, NextItem);
- }
+
+ if (!!NextItem) {
+ if (i->GetScheduleTime() < NextItem->GetScheduleTime()) {
+ DoSwap(i, NextItem);
+ }
}
-
+
Items.push_back(i);
- PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare());
-
- FillNextItem();
-
- CondVar.Signal();
+ PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare());
+
+ FillNextItem();
+
+ CondVar.Signal();
+}
+
+void TScheduler::FillNextItem() {
+ if (!NextItem && !Items.empty()) {
+ PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare());
+ NextItem = Items.back();
+ Items.erase(Items.end() - 1);
+ }
}
-void TScheduler::FillNextItem() {
- if (!NextItem && !Items.empty()) {
- PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare());
- NextItem = Items.back();
- Items.erase(Items.end() - 1);
- }
-}
-
void TScheduler::SchedulerThread() {
- for (;;) {
- IScheduleItemAutoPtr current;
-
+ for (;;) {
+ IScheduleItemAutoPtr current;
+
{
- TGuard<TLock> guard(Lock);
-
- if (StopThread) {
- break;
+ TGuard<TLock> guard(Lock);
+
+ if (StopThread) {
+ break;
+ }
+
+ if (!!NextItem) {
+ CondVar.WaitD(Lock, NextItem->GetScheduleTime());
+ } else {
+ CondVar.WaitI(Lock);
}
-
- if (!!NextItem) {
- CondVar.WaitD(Lock, NextItem->GetScheduleTime());
- } else {
- CondVar.WaitI(Lock);
+
+ if (StopThread) {
+ break;
}
-
- if (StopThread) {
- break;
- }
-
- // signal comes if either scheduler is to be stopped of there's work to do
+
+ // signal comes if either scheduler is to be stopped of there's work to do
Y_VERIFY(!!NextItem, "state check");
-
- if (TInstant::Now() < NextItem->GetScheduleTime()) {
- // NextItem is updated since WaitD
- continue;
- }
-
+
+ if (TInstant::Now() < NextItem->GetScheduleTime()) {
+ // NextItem is updated since WaitD
+ continue;
+ }
+
current = NextItem.Release();
}
-
- current->Do();
- current.Destroy();
-
- {
- TGuard<TLock> guard(Lock);
- FillNextItem();
- }
+
+ current->Do();
+ current.Destroy();
+
+ {
+ TGuard<TLock> guard(Lock);
+ FillNextItem();
+ }
}
}
diff --git a/library/cpp/messagebus/scheduler/scheduler.h b/library/cpp/messagebus/scheduler/scheduler.h
index 6114c3cc88..afcc0de55d 100644
--- a/library/cpp/messagebus/scheduler/scheduler.h
+++ b/library/cpp/messagebus/scheduler/scheduler.h
@@ -8,7 +8,7 @@
#include <util/generic/vector.h>
#include <util/system/atomic.h>
#include <util/system/condvar.h>
-#include <util/system/mutex.h>
+#include <util/system/mutex.h>
#include <util/system/thread.h>
namespace NBus {
@@ -18,10 +18,10 @@ namespace NBus {
inline IScheduleItem(TInstant scheduleTime) noexcept;
virtual ~IScheduleItem() {
}
-
+
virtual void Do() = 0;
inline TInstant GetScheduleTime() const noexcept;
-
+
private:
TInstant ScheduleTime;
};
@@ -50,16 +50,16 @@ namespace NBus {
TCondVar CondVar;
TObjectCounter<TScheduler> ObjectCounter;
-
+
bool StopThread;
NThreading::TLegacyFuture<> Thread;
};
-
+
inline IScheduleItem::IScheduleItem(TInstant scheduleTime) noexcept
: ScheduleTime(scheduleTime)
{
}
-
+
inline TInstant IScheduleItem::GetScheduleTime() const noexcept {
return ScheduleTime;
}
diff --git a/library/cpp/messagebus/scheduler/scheduler_ut.cpp b/library/cpp/messagebus/scheduler/scheduler_ut.cpp
index 35fcccdd29..a5ea641c10 100644
--- a/library/cpp/messagebus/scheduler/scheduler_ut.cpp
+++ b/library/cpp/messagebus/scheduler/scheduler_ut.cpp
@@ -1,36 +1,36 @@
#include <library/cpp/testing/unittest/registar.h>
-
-#include "scheduler.h"
-
+
+#include "scheduler.h"
+
#include <library/cpp/messagebus/misc/test_sync.h>
-
-using namespace NBus;
-using namespace NBus::NPrivate;
-
+
+using namespace NBus;
+using namespace NBus::NPrivate;
+
Y_UNIT_TEST_SUITE(TSchedulerTests) {
- struct TSimpleScheduleItem: public IScheduleItem {
- TTestSync* const TestSync;
-
- TSimpleScheduleItem(TTestSync* testSync)
- : IScheduleItem((TInstant::Now() + TDuration::MilliSeconds(1)))
- , TestSync(testSync)
+ struct TSimpleScheduleItem: public IScheduleItem {
+ TTestSync* const TestSync;
+
+ TSimpleScheduleItem(TTestSync* testSync)
+ : IScheduleItem((TInstant::Now() + TDuration::MilliSeconds(1)))
+ , TestSync(testSync)
{
}
-
+
void Do() override {
- TestSync->WaitForAndIncrement(0);
- }
- };
-
+ TestSync->WaitForAndIncrement(0);
+ }
+ };
+
Y_UNIT_TEST(Simple) {
- TTestSync testSync;
-
- TScheduler scheduler;
-
- scheduler.Schedule(new TSimpleScheduleItem(&testSync));
-
- testSync.WaitForAndIncrement(1);
-
- scheduler.Stop();
- }
-}
+ TTestSync testSync;
+
+ TScheduler scheduler;
+
+ scheduler.Schedule(new TSimpleScheduleItem(&testSync));
+
+ testSync.WaitForAndIncrement(1);
+
+ scheduler.Stop();
+ }
+}
diff --git a/library/cpp/messagebus/scheduler/ya.make b/library/cpp/messagebus/scheduler/ya.make
index 382804c408..dcb7408a20 100644
--- a/library/cpp/messagebus/scheduler/ya.make
+++ b/library/cpp/messagebus/scheduler/ya.make
@@ -1,13 +1,13 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(g:messagebus)
-
+
PEERDIR(
library/cpp/threading/future
)
-SRCS(
- scheduler.cpp
-)
-
-END()
+SRCS(
+ scheduler.cpp
+)
+
+END()