aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/actor/actor.h
blob: 9b8f20298a259b4870460a0df298b6b8768c3c98 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#pragma once

#include "executor.h"
#include "tasks.h"
#include "what_thread_does.h"

#include <util/system/yassert.h>

namespace NActor {
    class IActor: protected  IWorkItem {
    public:
        // TODO: make private
        TTasks Tasks;

    public:
        virtual void ScheduleHereV() = 0;
        virtual void ScheduleV() = 0;
        virtual void ScheduleHereAtMostOnceV() = 0;

        // TODO: make private
        virtual void RefV() = 0;
        virtual void UnRefV() = 0;

        // mute warnings
        ~IActor() override {
        }
    };

    struct TDefaultTag {};

    template <typename TThis, typename TTag = TDefaultTag>
    class TActor: public IActor {
    private:
        TExecutor* const Executor;

    public:
        TActor(TExecutor* executor)
            : Executor(executor)
        {
        }

        void AddTaskFromActorLoop() {
            bool schedule = Tasks.AddTask();
            // TODO: check thread id
            Y_ASSERT(!schedule);
        }

        /**
     * Schedule actor.
     *
     * If actor is sleeping, then actor will be executed right now.
     * If actor is executing right now, it will be executed one more time.
     * If this method is called multiple time, actor will be re-executed no more than one more time.
     */
        void Schedule() {
            if (Tasks.AddTask()) {
                EnqueueWork();
            }
        }

        /**
     * Schedule actor, execute it in current thread.
     *
     * If actor is running, continue executing where it is executing.
     * If actor is sleeping, execute it in current thread.
     *
     * Operation is useful for tasks that are likely to complete quickly.
     */
        void ScheduleHere() {
            if (Tasks.AddTask()) {
                Loop();
            }
        }

        /**
     * Schedule actor, execute in current thread no more than once.
     *
     * If actor is running, continue executing where it is executing.
     * If actor is sleeping, execute one iteration here, and if actor got new tasks,
     * reschedule it in worker pool.
     */
        void ScheduleHereAtMostOnce() {
            if (Tasks.AddTask()) {
                bool fetched = Tasks.FetchTask();
                Y_VERIFY(fetched, "happens");

                DoAct();

                // if someone added more tasks, schedule them
                if (Tasks.FetchTask()) {
                    bool added = Tasks.AddTask();
                    Y_VERIFY(!added, "happens");
                    EnqueueWork();
                }
            }
        }

        void ScheduleHereV() override {
            ScheduleHere();
        }
        void ScheduleV() override {
            Schedule();
        }
        void ScheduleHereAtMostOnceV() override {
            ScheduleHereAtMostOnce();
        }
        void RefV() override {
            GetThis()->Ref();
        }
        void UnRefV() override {
            GetThis()->UnRef();
        }

    private:
        TThis* GetThis() {
            return static_cast<TThis*>(this);
        }

        void EnqueueWork() {
            GetThis()->Ref();
            Executor->EnqueueWork({this});
        }

        void DoAct() {
            WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();

            GetThis()->Act(TTag());
        }

        void Loop() {
            // TODO: limit number of iterations
            while (Tasks.FetchTask()) {
                DoAct();
            }
        }

        void DoWork() override {
            Y_ASSERT(GetThis()->RefCount() >= 1);
            Loop();
            GetThis()->UnRef();
        }
    };

}