aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/scheduler_actor.h
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/scheduler_actor.h
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/scheduler_actor.h')
-rw-r--r--library/cpp/messagebus/scheduler_actor.h85
1 files changed, 85 insertions, 0 deletions
diff --git a/library/cpp/messagebus/scheduler_actor.h b/library/cpp/messagebus/scheduler_actor.h
new file mode 100644
index 0000000000..d0c23c94c4
--- /dev/null
+++ b/library/cpp/messagebus/scheduler_actor.h
@@ -0,0 +1,85 @@
+#pragma once
+
+#include "local_tasks.h"
+
+#include <library/cpp/messagebus/actor/actor.h>
+#include <library/cpp/messagebus/actor/what_thread_does_guard.h>
+#include <library/cpp/messagebus/scheduler/scheduler.h>
+
+#include <util/system/mutex.h>
+
+namespace NBus {
+ namespace NPrivate {
+ template <typename TThis, typename TTag = NActor::TDefaultTag>
+ class TScheduleActor {
+ typedef NActor::TActor<TThis, TTag> TActorForMe;
+
+ private:
+ TScheduler* const Scheduler;
+
+ TMutex Mutex;
+
+ TInstant ScheduleTime;
+
+ public:
+ TLocalTasks Alarm;
+
+ private:
+ struct TScheduleItemImpl: public IScheduleItem {
+ TIntrusivePtr<TThis> Thiz;
+
+ TScheduleItemImpl(TIntrusivePtr<TThis> thiz, TInstant when)
+ : IScheduleItem(when)
+ , Thiz(thiz)
+ {
+ }
+
+ void Do() override {
+ {
+ TWhatThreadDoesAcquireGuard<TMutex> guard(Thiz->Mutex, "scheduler actor: acquiring lock for Do");
+
+ if (Thiz->ScheduleTime == TInstant::Max()) {
+ // was already fired
+ return;
+ }
+
+ Thiz->ScheduleTime = TInstant::Max();
+ }
+
+ Thiz->Alarm.AddTask();
+ Thiz->GetActorForMe()->Schedule();
+ }
+ };
+
+ public:
+ TScheduleActor(TScheduler* scheduler)
+ : Scheduler(scheduler)
+ , ScheduleTime(TInstant::Max())
+ {
+ }
+
+ /// call Act(TTag) at specified time, unless it is already scheduled at earlier time.
+ void ScheduleAt(TInstant when) {
+ TWhatThreadDoesAcquireGuard<TMutex> guard(Mutex, "scheduler: acquiring lock for ScheduleAt");
+
+ if (when > ScheduleTime) {
+ // already scheduled
+ return;
+ }
+
+ ScheduleTime = when;
+ Scheduler->Schedule(new TScheduleItemImpl(GetThis(), when));
+ }
+
+ private:
+ TThis* GetThis() {
+ return static_cast<TThis*>(this);
+ }
+
+ TActorForMe* GetActorForMe() {
+ return static_cast<TActorForMe*>(GetThis());
+ }
+ };
+
+ }
+}