aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/actor/actor.h
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commitc2a1af049e9deca890e9923abe64fe6c59060348 (patch)
treeb222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/actor/actor.h
parent1f553f46fb4f3c5eec631352cdd900a0709016af (diff)
downloadydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/actor/actor.h')
-rw-r--r--library/cpp/messagebus/actor/actor.h100
1 files changed, 50 insertions, 50 deletions
diff --git a/library/cpp/messagebus/actor/actor.h b/library/cpp/messagebus/actor/actor.h
index 6c05d474ae..9b8f20298a 100644
--- a/library/cpp/messagebus/actor/actor.h
+++ b/library/cpp/messagebus/actor/actor.h
@@ -1,100 +1,100 @@
-#pragma once
-
+#pragma once
+
#include "executor.h"
-#include "tasks.h"
-#include "what_thread_does.h"
-
+#include "tasks.h"
+#include "what_thread_does.h"
+
#include <util/system/yassert.h>
-namespace NActor {
+namespace NActor {
class IActor: protected IWorkItem {
public:
// TODO: make private
TTasks Tasks;
-
+
public:
virtual void ScheduleHereV() = 0;
virtual void ScheduleV() = 0;
virtual void ScheduleHereAtMostOnceV() = 0;
-
+
// TODO: make private
virtual void RefV() = 0;
virtual void UnRefV() = 0;
-
+
// mute warnings
~IActor() override {
}
};
-
+
struct TDefaultTag {};
-
+
template <typename TThis, typename TTag = TDefaultTag>
class TActor: public IActor {
private:
TExecutor* const Executor;
-
+
public:
TActor(TExecutor* executor)
: Executor(executor)
{
}
-
+
void AddTaskFromActorLoop() {
bool schedule = Tasks.AddTask();
// TODO: check thread id
Y_ASSERT(!schedule);
}
-
+
/**
- * Schedule actor.
- *
- * If actor is sleeping, then actor will be executed right now.
- * If actor is executing right now, it will be executed one more time.
- * If this method is called multiple time, actor will be re-executed no more than one more time.
- */
+ * Schedule actor.
+ *
+ * If actor is sleeping, then actor will be executed right now.
+ * If actor is executing right now, it will be executed one more time.
+ * If this method is called multiple time, actor will be re-executed no more than one more time.
+ */
void Schedule() {
if (Tasks.AddTask()) {
EnqueueWork();
}
- }
-
+ }
+
/**
- * Schedule actor, execute it in current thread.
- *
- * If actor is running, continue executing where it is executing.
- * If actor is sleeping, execute it in current thread.
- *
- * Operation is useful for tasks that are likely to complete quickly.
- */
+ * Schedule actor, execute it in current thread.
+ *
+ * If actor is running, continue executing where it is executing.
+ * If actor is sleeping, execute it in current thread.
+ *
+ * Operation is useful for tasks that are likely to complete quickly.
+ */
void ScheduleHere() {
if (Tasks.AddTask()) {
Loop();
}
- }
-
+ }
+
/**
- * Schedule actor, execute in current thread no more than once.
- *
- * If actor is running, continue executing where it is executing.
- * If actor is sleeping, execute one iteration here, and if actor got new tasks,
- * reschedule it in worker pool.
- */
+ * Schedule actor, execute in current thread no more than once.
+ *
+ * If actor is running, continue executing where it is executing.
+ * If actor is sleeping, execute one iteration here, and if actor got new tasks,
+ * reschedule it in worker pool.
+ */
void ScheduleHereAtMostOnce() {
if (Tasks.AddTask()) {
bool fetched = Tasks.FetchTask();
Y_VERIFY(fetched, "happens");
-
+
DoAct();
-
+
// if someone added more tasks, schedule them
if (Tasks.FetchTask()) {
bool added = Tasks.AddTask();
Y_VERIFY(!added, "happens");
EnqueueWork();
}
- }
- }
-
+ }
+ }
+
void ScheduleHereV() override {
ScheduleHere();
}
@@ -110,35 +110,35 @@ namespace NActor {
void UnRefV() override {
GetThis()->UnRef();
}
-
+
private:
TThis* GetThis() {
return static_cast<TThis*>(this);
}
-
+
void EnqueueWork() {
GetThis()->Ref();
Executor->EnqueueWork({this});
}
-
+
void DoAct() {
WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
-
+
GetThis()->Act(TTag());
}
-
+
void Loop() {
// TODO: limit number of iterations
while (Tasks.FetchTask()) {
DoAct();
}
- }
-
+ }
+
void DoWork() override {
Y_ASSERT(GetThis()->RefCount() >= 1);
Loop();
GetThis()->UnRef();
}
};
-
+
}