diff options
author | Alexander Rutkovsky <alexvru@mail.ru> | 2022-02-10 16:47:39 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:39 +0300 |
commit | f3646f91e0de459836a7800b9ce3e8dc57a2ab3a (patch) | |
tree | 25c1423200152570c1f8307e5b8304b9bc3840c5 /library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp | |
parent | fccc62e9bfdce9be2fe7e0f23479da3a5512211a (diff) | |
download | ydb-f3646f91e0de459836a7800b9ce3e8dc57a2ab3a.tar.gz |
Restoring authorship annotation for Alexander Rutkovsky <alexvru@mail.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp | 216 |
1 files changed, 108 insertions, 108 deletions
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 565a511859..ae996830ae 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -1,115 +1,115 @@ #include <library/cpp/actors/interconnect/channel_scheduler.h> #include <library/cpp/actors/interconnect/events_local.h> #include <library/cpp/testing/unittest/registar.h> - -using namespace NActors; - -Y_UNIT_TEST_SUITE(ChannelScheduler) { - - Y_UNIT_TEST(PriorityTraffic) { - auto common = MakeIntrusive<TInterconnectProxyCommon>(); - common->MonCounters = MakeIntrusive<NMonitoring::TDynamicCounters>(); + +using namespace NActors; + +Y_UNIT_TEST_SUITE(ChannelScheduler) { + + Y_UNIT_TEST(PriorityTraffic) { + auto common = MakeIntrusive<TInterconnectProxyCommon>(); + common->MonCounters = MakeIntrusive<NMonitoring::TDynamicCounters>(); std::shared_ptr<IInterconnectMetrics> ctr = CreateInterconnectCounters(common); ctr->SetPeerInfo("peer", "1"); - auto callback = [](THolder<IEventBase>) {}; - TEventHolderPool pool(common, callback); - TSessionParams p; + auto callback = [](THolder<IEventBase>) {}; + TEventHolderPool pool(common, callback); + TSessionParams p; TChannelScheduler scheduler(1, {}, ctr, pool, 64 << 20, p); - - ui32 numEvents = 0; - - auto pushEvent = [&](size_t size, int channel) { - TString payload(size, 'X'); + + ui32 numEvents = 0; + + auto pushEvent = [&](size_t size, int channel) { + TString payload(size, 'X'); auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); - auto& ch = scheduler.GetOutputChannel(channel); - const bool wasWorking = ch.IsWorking(); - ch.Push(*ev); - if (!wasWorking) { - scheduler.AddToHeap(ch, 0); - } - ++numEvents; - }; - - for (ui32 i = 0; i < 100; ++i) { - pushEvent(10000, 1); - } - - for (ui32 i = 0; i < 1000; ++i) { - pushEvent(1000, 2); - } - - std::map<ui16, ui32> run; - ui32 step = 0; - - std::deque<std::map<ui16, ui32>> window; - - for (; numEvents; ++step) { - TTcpPacketOutTask task(p); - - if (step == 100) { - for (ui32 i = 0; i < 200; ++i) { - pushEvent(1000, 3); - } - } - - std::map<ui16, ui32> ch; - - while (numEvents) { - TEventOutputChannel *channel = scheduler.PickChannelWithLeastConsumedWeight(); - ui32 before = task.GetDataSize(); - ui64 weightConsumed = 0; - numEvents -= channel->FeedBuf(task, 0, &weightConsumed); - ui32 after = task.GetDataSize(); - Y_VERIFY(after >= before); - scheduler.FinishPick(weightConsumed, 0); - const ui32 bytesAdded = after - before; - if (!bytesAdded) { - break; - } - ch[channel->ChannelId] += bytesAdded; - } - - scheduler.Equalize(); - - for (const auto& [key, value] : ch) { - run[key] += value; - } - window.push_back(ch); - - if (window.size() == 32) { - for (const auto& [key, value] : window.front()) { - run[key] -= value; - if (!run[key]) { - run.erase(key); - } - } - window.pop_front(); - } - - double mean = 0.0; - for (const auto& [key, value] : run) { - mean += value; - } - mean /= run.size(); - - double dev = 0.0; - for (const auto& [key, value] : run) { - dev += (value - mean) * (value - mean); - } - dev = sqrt(dev / run.size()); - - double devToMean = dev / mean; - - Cerr << step << ": "; - for (const auto& [key, value] : run) { - Cerr << "ch" << key << "=" << value << " "; - } - Cerr << "mean# " << mean << " dev# " << dev << " part# " << devToMean; - - Cerr << Endl; - - UNIT_ASSERT(devToMean < 1); - } - } - -} + auto& ch = scheduler.GetOutputChannel(channel); + const bool wasWorking = ch.IsWorking(); + ch.Push(*ev); + if (!wasWorking) { + scheduler.AddToHeap(ch, 0); + } + ++numEvents; + }; + + for (ui32 i = 0; i < 100; ++i) { + pushEvent(10000, 1); + } + + for (ui32 i = 0; i < 1000; ++i) { + pushEvent(1000, 2); + } + + std::map<ui16, ui32> run; + ui32 step = 0; + + std::deque<std::map<ui16, ui32>> window; + + for (; numEvents; ++step) { + TTcpPacketOutTask task(p); + + if (step == 100) { + for (ui32 i = 0; i < 200; ++i) { + pushEvent(1000, 3); + } + } + + std::map<ui16, ui32> ch; + + while (numEvents) { + TEventOutputChannel *channel = scheduler.PickChannelWithLeastConsumedWeight(); + ui32 before = task.GetDataSize(); + ui64 weightConsumed = 0; + numEvents -= channel->FeedBuf(task, 0, &weightConsumed); + ui32 after = task.GetDataSize(); + Y_VERIFY(after >= before); + scheduler.FinishPick(weightConsumed, 0); + const ui32 bytesAdded = after - before; + if (!bytesAdded) { + break; + } + ch[channel->ChannelId] += bytesAdded; + } + + scheduler.Equalize(); + + for (const auto& [key, value] : ch) { + run[key] += value; + } + window.push_back(ch); + + if (window.size() == 32) { + for (const auto& [key, value] : window.front()) { + run[key] -= value; + if (!run[key]) { + run.erase(key); + } + } + window.pop_front(); + } + + double mean = 0.0; + for (const auto& [key, value] : run) { + mean += value; + } + mean /= run.size(); + + double dev = 0.0; + for (const auto& [key, value] : run) { + dev += (value - mean) * (value - mean); + } + dev = sqrt(dev / run.size()); + + double devToMean = dev / mean; + + Cerr << step << ": "; + for (const auto& [key, value] : run) { + Cerr << "ch" << key << "=" << value << " "; + } + Cerr << "mean# " << mean << " dev# " << dev << " part# " << devToMean; + + Cerr << Endl; + + UNIT_ASSERT(devToMean < 1); + } + } + +} |