aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/scheduler_actor.h
blob: d0c23c94c4edb8132af20de193875d5be78c712b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#pragma once

#include "local_tasks.h"

#include <library/cpp/messagebus/actor/actor.h>
#include <library/cpp/messagebus/actor/what_thread_does_guard.h>
#include <library/cpp/messagebus/scheduler/scheduler.h>

#include <util/system/mutex.h>

namespace NBus {
    namespace NPrivate {
        template <typename TThis, typename TTag = NActor::TDefaultTag>
        class TScheduleActor {
            typedef NActor::TActor<TThis, TTag> TActorForMe;

        private:
            TScheduler* const Scheduler;

            TMutex Mutex;

            TInstant ScheduleTime;

        public:
            TLocalTasks Alarm;

        private:
            struct TScheduleItemImpl: public IScheduleItem {
                TIntrusivePtr<TThis> Thiz;

                TScheduleItemImpl(TIntrusivePtr<TThis> thiz, TInstant when)
                    : IScheduleItem(when)
                    , Thiz(thiz)
                {
                }

                void Do() override {
                    {
                        TWhatThreadDoesAcquireGuard<TMutex> guard(Thiz->Mutex, "scheduler actor: acquiring lock for Do");

                        if (Thiz->ScheduleTime == TInstant::Max()) {
                            // was already fired
                            return;
                        }

                        Thiz->ScheduleTime = TInstant::Max();
                    }

                    Thiz->Alarm.AddTask();
                    Thiz->GetActorForMe()->Schedule();
                }
            };

        public:
            TScheduleActor(TScheduler* scheduler)
                : Scheduler(scheduler)
                , ScheduleTime(TInstant::Max())
            {
            }

            /// call Act(TTag) at specified time, unless it is already scheduled at earlier time.
            void ScheduleAt(TInstant when) {
                TWhatThreadDoesAcquireGuard<TMutex> guard(Mutex, "scheduler: acquiring lock for ScheduleAt");

                if (when > ScheduleTime) {
                    // already scheduled
                    return;
                }

                ScheduleTime = when;
                Scheduler->Schedule(new TScheduleItemImpl(GetThis(), when));
            }

        private:
            TThis* GetThis() {
                return static_cast<TThis*>(this);
            }

            TActorForMe* GetActorForMe() {
                return static_cast<TActorForMe*>(GetThis());
            }
        };

    }
}