aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/scheduler
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/scheduler
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/scheduler')
-rw-r--r--library/cpp/messagebus/scheduler/scheduler.cpp119
-rw-r--r--library/cpp/messagebus/scheduler/scheduler.h68
-rw-r--r--library/cpp/messagebus/scheduler/scheduler_ut.cpp36
-rw-r--r--library/cpp/messagebus/scheduler/ya.make13
4 files changed, 236 insertions, 0 deletions
diff --git a/library/cpp/messagebus/scheduler/scheduler.cpp b/library/cpp/messagebus/scheduler/scheduler.cpp
new file mode 100644
index 00000000000..5a5fe528943
--- /dev/null
+++ b/library/cpp/messagebus/scheduler/scheduler.cpp
@@ -0,0 +1,119 @@
+#include "scheduler.h"
+
+#include <util/datetime/base.h>
+#include <util/generic/algorithm.h>
+#include <util/generic/yexception.h>
+
+//#include "dummy_debugger.h"
+
+using namespace NBus;
+using namespace NBus::NPrivate;
+
+class TScheduleDeadlineCompare {
+public:
+ bool operator()(const IScheduleItemAutoPtr& i1, const IScheduleItemAutoPtr& i2) const noexcept {
+ return i1->GetScheduleTime() > i2->GetScheduleTime();
+ }
+};
+
+TScheduler::TScheduler()
+ : StopThread(false)
+ , Thread([&] { this->SchedulerThread(); })
+{
+}
+
+TScheduler::~TScheduler() {
+ Y_VERIFY(StopThread, "state check");
+}
+
+size_t TScheduler::Size() const {
+ TGuard<TLock> guard(Lock);
+ return Items.size() + (!!NextItem ? 1 : 0);
+}
+
+void TScheduler::Stop() {
+ {
+ TGuard<TLock> guard(Lock);
+ Y_VERIFY(!StopThread, "Scheduler already stopped");
+ StopThread = true;
+ CondVar.Signal();
+ }
+ Thread.Get();
+
+ if (!!NextItem) {
+ NextItem.Destroy();
+ }
+
+ for (auto& item : Items) {
+ item.Destroy();
+ }
+}
+
+void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) {
+ TGuard<TLock> lock(Lock);
+ if (StopThread)
+ return;
+
+ if (!!NextItem) {
+ if (i->GetScheduleTime() < NextItem->GetScheduleTime()) {
+ DoSwap(i, NextItem);
+ }
+ }
+
+ Items.push_back(i);
+ PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare());
+
+ FillNextItem();
+
+ CondVar.Signal();
+}
+
+void TScheduler::FillNextItem() {
+ if (!NextItem && !Items.empty()) {
+ PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare());
+ NextItem = Items.back();
+ Items.erase(Items.end() - 1);
+ }
+}
+
+void TScheduler::SchedulerThread() {
+ for (;;) {
+ IScheduleItemAutoPtr current;
+
+ {
+ TGuard<TLock> guard(Lock);
+
+ if (StopThread) {
+ break;
+ }
+
+ if (!!NextItem) {
+ CondVar.WaitD(Lock, NextItem->GetScheduleTime());
+ } else {
+ CondVar.WaitI(Lock);
+ }
+
+ if (StopThread) {
+ break;
+ }
+
+ // signal comes if either scheduler is to be stopped of there's work to do
+ Y_VERIFY(!!NextItem, "state check");
+
+ if (TInstant::Now() < NextItem->GetScheduleTime()) {
+ // NextItem is updated since WaitD
+ continue;
+ }
+
+ current = NextItem.Release();
+ }
+
+ current->Do();
+ current.Destroy();
+
+ {
+ TGuard<TLock> guard(Lock);
+ FillNextItem();
+ }
+ }
+}
diff --git a/library/cpp/messagebus/scheduler/scheduler.h b/library/cpp/messagebus/scheduler/scheduler.h
new file mode 100644
index 00000000000..afcc0de55d5
--- /dev/null
+++ b/library/cpp/messagebus/scheduler/scheduler.h
@@ -0,0 +1,68 @@
+#pragma once
+
+#include <library/cpp/threading/future/legacy_future.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/object_counter.h>
+#include <util/generic/ptr.h>
+#include <util/generic/vector.h>
+#include <util/system/atomic.h>
+#include <util/system/condvar.h>
+#include <util/system/mutex.h>
+#include <util/system/thread.h>
+
+namespace NBus {
+ namespace NPrivate {
+ class IScheduleItem {
+ public:
+ inline IScheduleItem(TInstant scheduleTime) noexcept;
+ virtual ~IScheduleItem() {
+ }
+
+ virtual void Do() = 0;
+ inline TInstant GetScheduleTime() const noexcept;
+
+ private:
+ TInstant ScheduleTime;
+ };
+
+ using IScheduleItemAutoPtr = TAutoPtr<IScheduleItem>;
+
+ class TScheduler {
+ public:
+ TScheduler();
+ ~TScheduler();
+ void Stop();
+ void Schedule(TAutoPtr<IScheduleItem> i);
+
+ size_t Size() const;
+
+ private:
+ void SchedulerThread();
+
+ void FillNextItem();
+
+ private:
+ TVector<IScheduleItemAutoPtr> Items;
+ IScheduleItemAutoPtr NextItem;
+ typedef TMutex TLock;
+ TLock Lock;
+ TCondVar CondVar;
+
+ TObjectCounter<TScheduler> ObjectCounter;
+
+ bool StopThread;
+ NThreading::TLegacyFuture<> Thread;
+ };
+
+ inline IScheduleItem::IScheduleItem(TInstant scheduleTime) noexcept
+ : ScheduleTime(scheduleTime)
+ {
+ }
+
+ inline TInstant IScheduleItem::GetScheduleTime() const noexcept {
+ return ScheduleTime;
+ }
+
+ }
+}
diff --git a/library/cpp/messagebus/scheduler/scheduler_ut.cpp b/library/cpp/messagebus/scheduler/scheduler_ut.cpp
new file mode 100644
index 00000000000..a5ea641c108
--- /dev/null
+++ b/library/cpp/messagebus/scheduler/scheduler_ut.cpp
@@ -0,0 +1,36 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include "scheduler.h"
+
+#include <library/cpp/messagebus/misc/test_sync.h>
+
+using namespace NBus;
+using namespace NBus::NPrivate;
+
+Y_UNIT_TEST_SUITE(TSchedulerTests) {
+ struct TSimpleScheduleItem: public IScheduleItem {
+ TTestSync* const TestSync;
+
+ TSimpleScheduleItem(TTestSync* testSync)
+ : IScheduleItem((TInstant::Now() + TDuration::MilliSeconds(1)))
+ , TestSync(testSync)
+ {
+ }
+
+ void Do() override {
+ TestSync->WaitForAndIncrement(0);
+ }
+ };
+
+ Y_UNIT_TEST(Simple) {
+ TTestSync testSync;
+
+ TScheduler scheduler;
+
+ scheduler.Schedule(new TSimpleScheduleItem(&testSync));
+
+ testSync.WaitForAndIncrement(1);
+
+ scheduler.Stop();
+ }
+}
diff --git a/library/cpp/messagebus/scheduler/ya.make b/library/cpp/messagebus/scheduler/ya.make
new file mode 100644
index 00000000000..dcb7408a203
--- /dev/null
+++ b/library/cpp/messagebus/scheduler/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+OWNER(g:messagebus)
+
+PEERDIR(
+ library/cpp/threading/future
+)
+
+SRCS(
+ scheduler.cpp
+)
+
+END()