aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/ut/sticking_ut.cpp
blob: 2fa3d0933e87546733d53a482f1e54f9a2b7e63c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#include <library/cpp/actors/interconnect/ut/lib/node.h>
#include <library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h>
#include <library/cpp/testing/unittest/registar.h>

using namespace NActors;

struct TEvPing : TEventBase<TEvPing, TEvents::THelloWorld::Ping> {
    TString Data;

    TEvPing(TString data)
        : Data(data)
    {}

    TEvPing() = default;

    ui32 CalculateSerializedSize() const override { return Data.size(); }
    bool IsSerializable() const override { return true; }
    bool SerializeToArcadiaStream(TChunkSerializer *serializer) const override { serializer->WriteAliasedRaw(Data.data(), Data.size()); return true; }
    TString ToStringHeader() const override { return {}; }
};

class TPonger : public TActor<TPonger> {
public:
    TPonger()
        : TActor(&TThis::StateFunc)
    {}

    void Handle(TEvPing::TPtr ev) {
        Send(ev->Sender, new TEvents::TEvPong(), 0, ev->Cookie);
    }

    STRICT_STFUNC(StateFunc,
        hFunc(TEvPing, Handle);
    )
};

class TPinger : public TActorBootstrapped<TPinger> {
    ui32 PingInFlight = 0;
    TActorId PongerId;
    TDuration MaxRTT;

public:
    TPinger(TActorId pongerId)
        : PongerId(pongerId)
    {}

    void Bootstrap() {
        Become(&TThis::StateFunc);
        Action();
    }

    void Action() {
        if (PingInFlight) {
            return;
        }
        const ui32 max = 1 + RandomNumber(10u);
        while (PingInFlight < max) {
            IssuePing();
        }
    }

    void IssuePing() {
        TString data = TString::Uninitialized(RandomNumber<size_t>(256 * 1024) + 1);
        memset(data.Detach(), 0, data.size());
        Send(PongerId, new TEvPing(data), 0, GetCycleCountFast());
        ++PingInFlight;
    }

    void Handle(TEvents::TEvPong::TPtr ev) {
        const TDuration rtt = CyclesToDuration(GetCycleCountFast() - ev->Cookie);
        if (MaxRTT < rtt) {
            MaxRTT = rtt;
            Cerr << "Updated MaxRTT# " << MaxRTT << Endl;
            Y_ABORT_UNLESS(MaxRTT <= TDuration::MilliSeconds(500));
        }
        --PingInFlight;
        Action();
    }

    STRICT_STFUNC(StateFunc,
        hFunc(TEvents::TEvPong, Handle);
    )
};

Y_UNIT_TEST_SUITE(Sticking) {
    Y_UNIT_TEST(Check) {
        TPortManager portman;
        THashMap<ui32, ui16> nodeToPort;
        nodeToPort.emplace(1, portman.GetPort());
        nodeToPort.emplace(2, portman.GetPort());

        NMonitoring::TDynamicCounterPtr counters = new NMonitoring::TDynamicCounters;
        std::list<TNode> nodes;
        for (auto [nodeId, _] : nodeToPort) {
            nodes.emplace_back(nodeId, nodeToPort.size(), nodeToPort, "127.1.0.0",
                counters->GetSubgroup("nodeId", TStringBuilder() << nodeId), TDuration::Seconds(10),
                TChannelsConfig(), 0, 1, nullptr, 40 << 20);
        }

        auto& node1 = *nodes.begin();
        auto& node2 = *++nodes.begin();

        const TActorId ponger = node2.RegisterActor(new TPonger());
        node1.RegisterActor(new TPinger(ponger));

        Sleep(TDuration::Seconds(10));
    }
}