diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/scheduler | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/scheduler')
-rw-r--r-- | library/cpp/messagebus/scheduler/scheduler.cpp | 119 | ||||
-rw-r--r-- | library/cpp/messagebus/scheduler/scheduler.h | 68 | ||||
-rw-r--r-- | library/cpp/messagebus/scheduler/scheduler_ut.cpp | 36 | ||||
-rw-r--r-- | library/cpp/messagebus/scheduler/ya.make | 13 |
4 files changed, 236 insertions, 0 deletions
diff --git a/library/cpp/messagebus/scheduler/scheduler.cpp b/library/cpp/messagebus/scheduler/scheduler.cpp new file mode 100644 index 00000000000..5a5fe528943 --- /dev/null +++ b/library/cpp/messagebus/scheduler/scheduler.cpp @@ -0,0 +1,119 @@ +#include "scheduler.h" + +#include <util/datetime/base.h> +#include <util/generic/algorithm.h> +#include <util/generic/yexception.h> + +//#include "dummy_debugger.h" + +using namespace NBus; +using namespace NBus::NPrivate; + +class TScheduleDeadlineCompare { +public: + bool operator()(const IScheduleItemAutoPtr& i1, const IScheduleItemAutoPtr& i2) const noexcept { + return i1->GetScheduleTime() > i2->GetScheduleTime(); + } +}; + +TScheduler::TScheduler() + : StopThread(false) + , Thread([&] { this->SchedulerThread(); }) +{ +} + +TScheduler::~TScheduler() { + Y_VERIFY(StopThread, "state check"); +} + +size_t TScheduler::Size() const { + TGuard<TLock> guard(Lock); + return Items.size() + (!!NextItem ? 1 : 0); +} + +void TScheduler::Stop() { + { + TGuard<TLock> guard(Lock); + Y_VERIFY(!StopThread, "Scheduler already stopped"); + StopThread = true; + CondVar.Signal(); + } + Thread.Get(); + + if (!!NextItem) { + NextItem.Destroy(); + } + + for (auto& item : Items) { + item.Destroy(); + } +} + +void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) { + TGuard<TLock> lock(Lock); + if (StopThread) + return; + + if (!!NextItem) { + if (i->GetScheduleTime() < NextItem->GetScheduleTime()) { + DoSwap(i, NextItem); + } + } + + Items.push_back(i); + PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); + + FillNextItem(); + + CondVar.Signal(); +} + +void TScheduler::FillNextItem() { + if (!NextItem && !Items.empty()) { + PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); + NextItem = Items.back(); + Items.erase(Items.end() - 1); + } +} + +void TScheduler::SchedulerThread() { + for (;;) { + IScheduleItemAutoPtr current; + + { + TGuard<TLock> guard(Lock); + + if (StopThread) { + break; + } + + if (!!NextItem) { + CondVar.WaitD(Lock, NextItem->GetScheduleTime()); + } else { + CondVar.WaitI(Lock); + } + + if (StopThread) { + break; + } + + // signal comes if either scheduler is to be stopped of there's work to do + Y_VERIFY(!!NextItem, "state check"); + + if (TInstant::Now() < NextItem->GetScheduleTime()) { + // NextItem is updated since WaitD + continue; + } + + current = NextItem.Release(); + } + + current->Do(); + current.Destroy(); + + { + TGuard<TLock> guard(Lock); + FillNextItem(); + } + } +} diff --git a/library/cpp/messagebus/scheduler/scheduler.h b/library/cpp/messagebus/scheduler/scheduler.h new file mode 100644 index 00000000000..afcc0de55d5 --- /dev/null +++ b/library/cpp/messagebus/scheduler/scheduler.h @@ -0,0 +1,68 @@ +#pragma once + +#include <library/cpp/threading/future/legacy_future.h> + +#include <util/datetime/base.h> +#include <util/generic/object_counter.h> +#include <util/generic/ptr.h> +#include <util/generic/vector.h> +#include <util/system/atomic.h> +#include <util/system/condvar.h> +#include <util/system/mutex.h> +#include <util/system/thread.h> + +namespace NBus { + namespace NPrivate { + class IScheduleItem { + public: + inline IScheduleItem(TInstant scheduleTime) noexcept; + virtual ~IScheduleItem() { + } + + virtual void Do() = 0; + inline TInstant GetScheduleTime() const noexcept; + + private: + TInstant ScheduleTime; + }; + + using IScheduleItemAutoPtr = TAutoPtr<IScheduleItem>; + + class TScheduler { + public: + TScheduler(); + ~TScheduler(); + void Stop(); + void Schedule(TAutoPtr<IScheduleItem> i); + + size_t Size() const; + + private: + void SchedulerThread(); + + void FillNextItem(); + + private: + TVector<IScheduleItemAutoPtr> Items; + IScheduleItemAutoPtr NextItem; + typedef TMutex TLock; + TLock Lock; + TCondVar CondVar; + + TObjectCounter<TScheduler> ObjectCounter; + + bool StopThread; + NThreading::TLegacyFuture<> Thread; + }; + + inline IScheduleItem::IScheduleItem(TInstant scheduleTime) noexcept + : ScheduleTime(scheduleTime) + { + } + + inline TInstant IScheduleItem::GetScheduleTime() const noexcept { + return ScheduleTime; + } + + } +} diff --git a/library/cpp/messagebus/scheduler/scheduler_ut.cpp b/library/cpp/messagebus/scheduler/scheduler_ut.cpp new file mode 100644 index 00000000000..a5ea641c108 --- /dev/null +++ b/library/cpp/messagebus/scheduler/scheduler_ut.cpp @@ -0,0 +1,36 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include "scheduler.h" + +#include <library/cpp/messagebus/misc/test_sync.h> + +using namespace NBus; +using namespace NBus::NPrivate; + +Y_UNIT_TEST_SUITE(TSchedulerTests) { + struct TSimpleScheduleItem: public IScheduleItem { + TTestSync* const TestSync; + + TSimpleScheduleItem(TTestSync* testSync) + : IScheduleItem((TInstant::Now() + TDuration::MilliSeconds(1))) + , TestSync(testSync) + { + } + + void Do() override { + TestSync->WaitForAndIncrement(0); + } + }; + + Y_UNIT_TEST(Simple) { + TTestSync testSync; + + TScheduler scheduler; + + scheduler.Schedule(new TSimpleScheduleItem(&testSync)); + + testSync.WaitForAndIncrement(1); + + scheduler.Stop(); + } +} diff --git a/library/cpp/messagebus/scheduler/ya.make b/library/cpp/messagebus/scheduler/ya.make new file mode 100644 index 00000000000..dcb7408a203 --- /dev/null +++ b/library/cpp/messagebus/scheduler/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +OWNER(g:messagebus) + +PEERDIR( + library/cpp/threading/future +) + +SRCS( + scheduler.cpp +) + +END() |