blob: d0c23c94c4edb8132af20de193875d5be78c712b (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
#pragma once
#include "local_tasks.h"
#include <library/cpp/messagebus/actor/actor.h>
#include <library/cpp/messagebus/actor/what_thread_does_guard.h>
#include <library/cpp/messagebus/scheduler/scheduler.h>
#include <util/system/mutex.h>
namespace NBus {
namespace NPrivate {
template <typename TThis, typename TTag = NActor::TDefaultTag>
class TScheduleActor {
typedef NActor::TActor<TThis, TTag> TActorForMe;
private:
TScheduler* const Scheduler;
TMutex Mutex;
TInstant ScheduleTime;
public:
TLocalTasks Alarm;
private:
struct TScheduleItemImpl: public IScheduleItem {
TIntrusivePtr<TThis> Thiz;
TScheduleItemImpl(TIntrusivePtr<TThis> thiz, TInstant when)
: IScheduleItem(when)
, Thiz(thiz)
{
}
void Do() override {
{
TWhatThreadDoesAcquireGuard<TMutex> guard(Thiz->Mutex, "scheduler actor: acquiring lock for Do");
if (Thiz->ScheduleTime == TInstant::Max()) {
// was already fired
return;
}
Thiz->ScheduleTime = TInstant::Max();
}
Thiz->Alarm.AddTask();
Thiz->GetActorForMe()->Schedule();
}
};
public:
TScheduleActor(TScheduler* scheduler)
: Scheduler(scheduler)
, ScheduleTime(TInstant::Max())
{
}
/// call Act(TTag) at specified time, unless it is already scheduled at earlier time.
void ScheduleAt(TInstant when) {
TWhatThreadDoesAcquireGuard<TMutex> guard(Mutex, "scheduler: acquiring lock for ScheduleAt");
if (when > ScheduleTime) {
// already scheduled
return;
}
ScheduleTime = when;
Scheduler->Schedule(new TScheduleItemImpl(GetThis(), when));
}
private:
TThis* GetThis() {
return static_cast<TThis*>(this);
}
TActorForMe* GetActorForMe() {
return static_cast<TActorForMe*>(GetThis());
}
};
}
}
|