aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/scheduler_actor.h
blob: 4176ea45a9fc738bd780475e39b8daefe77e65e4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#pragma once

#include "local_tasks.h"

#include <library/cpp/messagebus/actor/actor.h>
#include <library/cpp/messagebus/actor/what_thread_does_guard.h>
#include <library/cpp/messagebus/scheduler/scheduler.h>

#include <util/system/mutex.h>

namespace NBus { 
    namespace NPrivate { 
        template <typename TThis, typename TTag = NActor::TDefaultTag> 
        class TScheduleActor { 
            typedef NActor::TActor<TThis, TTag> TActorForMe; 

        private: 
            TScheduler* const Scheduler; 

            TMutex Mutex; 

            TInstant ScheduleTime; 

        public: 
            TLocalTasks Alarm; 

        private: 
            struct TScheduleItemImpl: public IScheduleItem { 
                TIntrusivePtr<TThis> Thiz; 

                TScheduleItemImpl(TIntrusivePtr<TThis> thiz, TInstant when) 
                    : IScheduleItem(when) 
                    , Thiz(thiz) 
                { 
                } 

                void Do() override { 
                    { 
                        TWhatThreadDoesAcquireGuard<TMutex> guard(Thiz->Mutex, "scheduler actor: acquiring lock for Do"); 

                        if (Thiz->ScheduleTime == TInstant::Max()) { 
                            // was already fired 
                            return; 
                        } 

                        Thiz->ScheduleTime = TInstant::Max(); 
                    } 

                    Thiz->Alarm.AddTask(); 
                    Thiz->GetActorForMe()->Schedule(); 
                } 
            }; 
 
        public: 
            TScheduleActor(TScheduler* scheduler) 
                : Scheduler(scheduler) 
                , ScheduleTime(TInstant::Max()) 
            {
            } 

            /// call Act(TTag) at specified time, unless it is already scheduled at earlier time. 
            void ScheduleAt(TInstant when) { 
                TWhatThreadDoesAcquireGuard<TMutex> guard(Mutex, "scheduler: acquiring lock for ScheduleAt"); 
 
                if (when > ScheduleTime) { 
                    // already scheduled 
                    return;
                }

                ScheduleTime = when; 
                Scheduler->Schedule(new TScheduleItemImpl(GetThis(), when)); 
            }

        private: 
            TThis* GetThis() { 
                return static_cast<TThis*>(this); 
            } 

            TActorForMe* GetActorForMe() { 
                return static_cast<TActorForMe*>(GetThis()); 
            } 
        }; 

    }
}