aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/examples/01_ping_pong/main.cpp
blob: 437f06eadd6ced301b72191e1018a75626bf67ee (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/executor_pool_basic.h>
#include <library/cpp/actors/core/scheduler_basic.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/util/should_continue.h>
#include <util/system/sigset.h>
#include <util/generic/xrange.h>

using namespace NActors;

static TProgramShouldContinue ShouldContinue;

void OnTerminate(int) {
    ShouldContinue.ShouldStop();
}

class TPingActor : public TActorBootstrapped<TPingActor> {
    const TActorId Target;
    ui64 HandledEvents;
    TInstant PeriodStart;

    void Handle(TEvents::TEvPing::TPtr &ev) {
        Send(ev->Sender, new TEvents::TEvPong());
        Send(ev->Sender, new TEvents::TEvPing());
        Become(&TThis::StatePing);
    }

    void Handle(TEvents::TEvPong::TPtr &ev) {
        Y_UNUSED(ev);
        Become(&TThis::StateWait);
    }

    void PrintStats() {
        const i64 ms = (TInstant::Now() - PeriodStart).MilliSeconds();
        Cout << "Handled " << 2 * HandledEvents << " over " << ms << "ms" << Endl;
        ScheduleStats();
    }

    void ScheduleStats() {
        HandledEvents = 0;
        PeriodStart = TInstant::Now();
        Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
    }

public:
    TPingActor(TActorId target)
        : Target(target)
        , HandledEvents(0)
        , PeriodStart(TInstant::Now())
    {}

    STFUNC(StateWait) {
        switch (ev->GetTypeRewrite()) {
            hFunc(TEvents::TEvPing, Handle);
            sFunc(TEvents::TEvWakeup, PrintStats);
        }

        ++HandledEvents;
    }

    STFUNC(StatePing) {
        switch (ev->GetTypeRewrite()) {
            hFunc(TEvents::TEvPong, Handle);
            sFunc(TEvents::TEvWakeup, PrintStats);
        }

        ++HandledEvents;
    }

    void Bootstrap() {
        if (Target) {
            Become(&TThis::StatePing);
            Send(Target, new TEvents::TEvPing());
            ScheduleStats();
        }
        else {
            Become(&TThis::StateWait);
        };
    }
};

THolder<TActorSystemSetup> BuildActorSystemSetup(ui32 threads, ui32 pools) {
    Y_ABORT_UNLESS(threads > 0 && threads < 100);
    Y_ABORT_UNLESS(pools > 0 && pools < 10);

    auto setup = MakeHolder<TActorSystemSetup>();

    setup->NodeId = 1;

    setup->ExecutorsCount = pools;
    setup->Executors.Reset(new TAutoPtr<IExecutorPool>[pools]);
    for (ui32 idx : xrange(pools)) {
        setup->Executors[idx] = new TBasicExecutorPool(idx, threads, 50);
    }

    setup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 0));

    return setup;
}

int main(int argc, char **argv) {
    Y_UNUSED(argc);
    Y_UNUSED(argv);

#ifdef _unix_
    signal(SIGPIPE, SIG_IGN);
#endif
    signal(SIGINT, &OnTerminate);
    signal(SIGTERM, &OnTerminate);

    THolder<TActorSystemSetup> actorSystemSetup = BuildActorSystemSetup(2, 1);
    TActorSystem actorSystem(actorSystemSetup);

    actorSystem.Start();

    const TActorId a = actorSystem.Register(new TPingActor(TActorId()));
    const TActorId b = actorSystem.Register(new TPingActor(a));
    Y_UNUSED(b);

    while (ShouldContinue.PollState() == TProgramShouldContinue::Continue) {
        Sleep(TDuration::MilliSeconds(200));
    }

    actorSystem.Stop();
    actorSystem.Cleanup();

    return ShouldContinue.GetReturnCode();
}