1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/executor_pool_basic.h>
#include <library/cpp/actors/core/scheduler_basic.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/util/should_continue.h>
#include <util/system/sigset.h>
#include <util/generic/xrange.h>
using namespace NActors;
static TProgramShouldContinue ShouldContinue;
void OnTerminate(int) {
ShouldContinue.ShouldStop();
}
class TPingActor : public TActorBootstrapped<TPingActor> {
const TActorId Target;
ui64 HandledEvents;
TInstant PeriodStart;
void Handle(TEvents::TEvPing::TPtr &ev) {
Send(ev->Sender, new TEvents::TEvPong());
Send(ev->Sender, new TEvents::TEvPing());
Become(&TThis::StatePing);
}
void Handle(TEvents::TEvPong::TPtr &ev) {
Y_UNUSED(ev);
Become(&TThis::StateWait);
}
void PrintStats() {
const i64 ms = (TInstant::Now() - PeriodStart).MilliSeconds();
Cout << "Handled " << 2 * HandledEvents << " over " << ms << "ms" << Endl;
ScheduleStats();
}
void ScheduleStats() {
HandledEvents = 0;
PeriodStart = TInstant::Now();
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
}
public:
TPingActor(TActorId target)
: Target(target)
, HandledEvents(0)
, PeriodStart(TInstant::Now())
{}
STFUNC(StateWait) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvPing, Handle);
sFunc(TEvents::TEvWakeup, PrintStats);
}
++HandledEvents;
}
STFUNC(StatePing) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvPong, Handle);
sFunc(TEvents::TEvWakeup, PrintStats);
}
++HandledEvents;
}
void Bootstrap() {
if (Target) {
Become(&TThis::StatePing);
Send(Target, new TEvents::TEvPing());
ScheduleStats();
}
else {
Become(&TThis::StateWait);
};
}
};
THolder<TActorSystemSetup> BuildActorSystemSetup(ui32 threads, ui32 pools) {
Y_ABORT_UNLESS(threads > 0 && threads < 100);
Y_ABORT_UNLESS(pools > 0 && pools < 10);
auto setup = MakeHolder<TActorSystemSetup>();
setup->NodeId = 1;
setup->ExecutorsCount = pools;
setup->Executors.Reset(new TAutoPtr<IExecutorPool>[pools]);
for (ui32 idx : xrange(pools)) {
setup->Executors[idx] = new TBasicExecutorPool(idx, threads, 50);
}
setup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 0));
return setup;
}
int main(int argc, char **argv) {
Y_UNUSED(argc);
Y_UNUSED(argv);
#ifdef _unix_
signal(SIGPIPE, SIG_IGN);
#endif
signal(SIGINT, &OnTerminate);
signal(SIGTERM, &OnTerminate);
THolder<TActorSystemSetup> actorSystemSetup = BuildActorSystemSetup(2, 1);
TActorSystem actorSystem(actorSystemSetup);
actorSystem.Start();
const TActorId a = actorSystem.Register(new TPingActor(TActorId()));
const TActorId b = actorSystem.Register(new TPingActor(a));
Y_UNUSED(b);
while (ShouldContinue.PollState() == TProgramShouldContinue::Continue) {
Sleep(TDuration::MilliSeconds(200));
}
actorSystem.Stop();
actorSystem.Cleanup();
return ShouldContinue.GetReturnCode();
}
|