aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
blob: ae996830aee72a0a0fc79ea6cae9cfe50215d529 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#include <library/cpp/actors/interconnect/channel_scheduler.h>
#include <library/cpp/actors/interconnect/events_local.h>
#include <library/cpp/testing/unittest/registar.h>
 
using namespace NActors; 
 
Y_UNIT_TEST_SUITE(ChannelScheduler) {

    // Checks that TChannelScheduler spreads outgoing bytes reasonably evenly
    // between channels that carry differently sized events.
    //
    // Scenario:
    //   * channel 1 is preloaded with 100 events of 10000 bytes each;
    //   * channel 2 is preloaded with 1000 events of 1000 bytes each;
    //   * channel 3 receives 200 events of 1000 bytes each at step 100.
    //
    // Every step fills one fresh TTcpPacketOutTask from whichever channel the
    // scheduler picks, until a pick adds no bytes or no events remain. Bytes
    // written per channel are accumulated over a sliding window of recent
    // steps, and the test asserts that the standard deviation of the
    // per-channel totals stays below their mean.
    Y_UNIT_TEST(PriorityTraffic) {
        auto proxyCommon = MakeIntrusive<TInterconnectProxyCommon>();
        proxyCommon->MonCounters = MakeIntrusive<NMonitoring::TDynamicCounters>();
        std::shared_ptr<IInterconnectMetrics> metrics = CreateInterconnectCounters(proxyCommon);
        metrics->SetPeerInfo("peer", "1");
        auto discardEvent = [](THolder<IEventBase>) {};
        TEventHolderPool pool(proxyCommon, discardEvent);
        TSessionParams params;
        TChannelScheduler scheduler(1, {}, metrics, pool, 64 << 20, params);

        // Events pushed into the scheduler and not yet reported by FeedBuf.
        ui32 pendingEvents = 0;

        // Pushes one event with a payload of `size` bytes into `channel`.
        // A channel that was not working before the push is (re)added to the
        // scheduler heap.
        auto enqueue = [&](size_t size, int channel) {
            TString payload(size, 'X');
            auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
            auto& output = scheduler.GetOutputChannel(channel);
            const bool wasWorking = output.IsWorking();
            output.Push(*ev);
            if (!wasWorking) {
                scheduler.AddToHeap(output, 0);
            }
            ++pendingEvents;
        };

        // Pushes `count` identical events into `channel`.
        auto enqueueBatch = [&](ui32 count, size_t size, int channel) {
            for (ui32 i = 0; i < count; ++i) {
                enqueue(size, channel);
            }
        };

        enqueueBatch(100, 10000, 1);
        enqueueBatch(1000, 1000, 2);

        // Once the history reaches this many steps, the oldest one is evicted.
        const size_t historyLimit = 32;

        // Per-channel byte totals over the steps currently held in `history`.
        std::map<ui16, ui32> windowTotals;
        // Per-step, per-channel byte counts, oldest step first.
        std::deque<std::map<ui16, ui32>> history;

        for (ui32 step = 0; pendingEvents; ++step) {
            TTcpPacketOutTask task(params);

            // Introduce a third traffic source in the middle of the run.
            if (step == 100) {
                enqueueBatch(200, 1000, 3);
            }

            // Bytes written into this step's packet, keyed by channel id.
            std::map<ui16, ui32> stepBytes;

            while (pendingEvents) {
                TEventOutputChannel* const channel = scheduler.PickChannelWithLeastConsumedWeight();
                // NOTE: `before`/`after` keep their names because Y_VERIFY may
                // embed the checked expression text into its failure message.
                const ui32 before = task.GetDataSize();
                ui64 weightConsumed = 0;
                // FeedBuf's result is subtracted from the pending counter;
                // presumably it is the number of events completed by this
                // call -- confirm against TEventOutputChannel.
                pendingEvents -= channel->FeedBuf(task, 0, &weightConsumed);
                const ui32 after = task.GetDataSize();
                Y_VERIFY(after >= before);
                scheduler.FinishPick(weightConsumed, 0);
                if (after == before) {
                    // Nothing was added by this pick: stop filling this packet.
                    break;
                }
                stepBytes[channel->ChannelId] += after - before;
            }

            scheduler.Equalize();

            // Account for this step in the sliding window.
            for (const auto& [channelId, bytes] : stepBytes) {
                windowTotals[channelId] += bytes;
            }
            history.push_back(stepBytes);

            // Evict the oldest step and drop channels whose total falls to zero,
            // so that idle channels do not skew the statistics below.
            if (history.size() == historyLimit) {
                for (const auto& [channelId, bytes] : history.front()) {
                    ui32& total = windowTotals[channelId];
                    total -= bytes;
                    if (!total) {
                        windowTotals.erase(channelId);
                    }
                }
                history.pop_front();
            }

            // Mean of the per-channel totals.
            double mean = 0.0;
            for (const auto& [channelId, total] : windowTotals) {
                mean += total;
            }
            mean /= windowTotals.size();

            // Population standard deviation of the per-channel totals.
            double dev = 0.0;
            for (const auto& [channelId, total] : windowTotals) {
                const double delta = total - mean;
                dev += delta * delta;
            }
            dev = sqrt(dev / windowTotals.size());

            // NOTE: the name is kept because UNIT_ASSERT may report the
            // asserted expression text on failure.
            const double devToMean = dev / mean;

            Cerr << step << ": ";
            for (const auto& [channelId, total] : windowTotals) {
                Cerr << "ch" << channelId << "=" << total << " ";
            }
            Cerr << "mean# " << mean << " dev# " << dev << " part# " << devToMean << Endl;

            UNIT_ASSERT(devToMean < 1);
        }
    }

}