aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
blob: 98e81d7781f2f0ea06f44205db1583acb80939a8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include <library/cpp/actors/interconnect/channel_scheduler.h>
#include <library/cpp/actors/interconnect/events_local.h>
#include <library/cpp/testing/unittest/registar.h>

using namespace NActors;

Y_UNIT_TEST_SUITE(ChannelScheduler) {

    // Queues events on several output channels of a TChannelScheduler, then drains them
    // packet by packet. After every packet the amount of data each channel contributed over
    // a sliding window of recent packets is compared: the test fails as soon as the standard
    // deviation of the per-channel totals reaches their mean.
    Y_UNIT_TEST(PriorityTraffic) {
        auto proxyCommon = MakeIntrusive<TInterconnectProxyCommon>();
        proxyCommon->MonCounters = MakeIntrusive<NMonitoring::TDynamicCounters>();
        std::shared_ptr<IInterconnectMetrics> metrics = CreateInterconnectCounters(proxyCommon);
        metrics->SetPeerInfo("peer", "1");
        auto dropEvent = [](THolder<IEventBase>) {};
        TEventHolderPool holderPool(proxyCommon, dropEvent);
        TSessionParams params;
        TChannelScheduler scheduler(1, {}, metrics, holderPool, 64 << 20, params);

        // Number of pushed events that FeedBuf() has not yet reported as consumed.
        ui32 pendingEvents = 0;

        // Builds an event whose serialized payload is `size` copies of 'X' and queues it on
        // the given channel. A channel that was not working before the push is (re)registered
        // in the scheduler heap.
        auto pushEvent = [&](size_t size, int channel) {
            TString payload(size, 'X');
            auto handle = MakeHolder<IEventHandle>(
                1,
                0,
                TActorId(),
                TActorId(),
                MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}),
                0);
            auto& output = scheduler.GetOutputChannel(channel);
            const bool alreadyWorking = output.IsWorking();
            output.Push(*handle);
            if (!alreadyWorking) {
                scheduler.AddToHeap(output, 0);
            }
            ++pendingEvents;
        };

        // Queues `count` identical events on one channel.
        auto pushBatch = [&](ui32 count, size_t size, int channel) {
            for (ui32 n = 0; n != count; ++n) {
                pushEvent(size, channel);
            }
        };

        // Initial backlog: few large events on channel 1, many small ones on channel 2.
        pushBatch(100, 10000, 1);
        pushBatch(1000, 1000, 2);

        // Channel id -> amount of data added to packets.
        using TChannelTotals = std::map<ui16, ui32>;

        TChannelTotals windowTotals; // sums over the packets currently held in recentPackets
        ui32 step = 0;
        std::deque<TChannelTotals> recentPackets;

        NInterconnect::TOutgoingStream stream;

        while (pendingEvents) {
            TTcpPacketOutTask task(params, stream, stream);

            // A third channel joins while the first two are still being drained.
            if (step == 100) {
                pushBatch(200, 1000, 3);
            }

            // Fill one packet: keep asking the scheduler for a channel until either nothing
            // is pending or the picked channel could not add any data to the task.
            TChannelTotals packetTotals;
            while (pendingEvents) {
                TEventOutputChannel* const picked = scheduler.PickChannelWithLeastConsumedWeight();
                const ui32 before = task.GetDataSize();
                ui64 weightConsumed = 0;
                pendingEvents -= picked->FeedBuf(task, 0, &weightConsumed);
                const ui32 after = task.GetDataSize();
                Y_ABORT_UNLESS(after >= before);
                scheduler.FinishPick(weightConsumed, 0);
                if (after == before) {
                    break;
                }
                packetTotals[picked->ChannelId] += after - before;
            }

            scheduler.Equalize();

            // Account for this packet in the sliding window.
            for (const auto& [channelId, bytes] : packetTotals) {
                windowTotals[channelId] += bytes;
            }
            recentPackets.push_back(packetTotals);

            // Once 32 packets are stored the oldest one is retired, so the totals examined
            // below cover at most the 31 most recent packets. Channels whose total drops to
            // zero are removed so that they do not take part in the statistics.
            if (recentPackets.size() == 32) {
                for (const auto& [channelId, bytes] : recentPackets.front()) {
                    auto& total = windowTotals[channelId];
                    total -= bytes;
                    if (total == 0) {
                        windowTotals.erase(channelId);
                    }
                }
                recentPackets.pop_front();
            }

            // Mean of the per-channel totals.
            double sum = 0.0;
            for (const auto& entry : windowTotals) {
                sum += entry.second;
            }
            const double mean = sum / windowTotals.size();

            // Population standard deviation of the per-channel totals.
            double squares = 0.0;
            for (const auto& entry : windowTotals) {
                const double delta = entry.second - mean;
                squares += delta * delta;
            }
            const double dev = sqrt(squares / windowTotals.size());

            const double devToMean = dev / mean;

            Cerr << step << ": ";
            for (const auto& [channelId, total] : windowTotals) {
                Cerr << "ch" << channelId << "=" << total << " ";
            }
            Cerr << "mean# " << mean << " dev# " << dev << " part# " << devToMean << Endl;

            UNIT_ASSERT(devToMean < 1);

            ++step;
        }
    }

}