aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@mail.ru>2022-02-10 16:47:39 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:39 +0300
commitf3646f91e0de459836a7800b9ce3e8dc57a2ab3a (patch)
tree25c1423200152570c1f8307e5b8304b9bc3840c5 /library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
parentfccc62e9bfdce9be2fe7e0f23479da3a5512211a (diff)
downloadydb-f3646f91e0de459836a7800b9ce3e8dc57a2ab3a.tar.gz
Restoring authorship annotation for Alexander Rutkovsky <alexvru@mail.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp')
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp216
1 files changed, 108 insertions, 108 deletions
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 565a511859..ae996830ae 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -1,115 +1,115 @@
#include <library/cpp/actors/interconnect/channel_scheduler.h>
#include <library/cpp/actors/interconnect/events_local.h>
#include <library/cpp/testing/unittest/registar.h>
-
-using namespace NActors;
-
-Y_UNIT_TEST_SUITE(ChannelScheduler) {
-
- Y_UNIT_TEST(PriorityTraffic) {
- auto common = MakeIntrusive<TInterconnectProxyCommon>();
- common->MonCounters = MakeIntrusive<NMonitoring::TDynamicCounters>();
+
+using namespace NActors;
+
+Y_UNIT_TEST_SUITE(ChannelScheduler) {
+
+ Y_UNIT_TEST(PriorityTraffic) {
+ auto common = MakeIntrusive<TInterconnectProxyCommon>();
+ common->MonCounters = MakeIntrusive<NMonitoring::TDynamicCounters>();
std::shared_ptr<IInterconnectMetrics> ctr = CreateInterconnectCounters(common);
ctr->SetPeerInfo("peer", "1");
- auto callback = [](THolder<IEventBase>) {};
- TEventHolderPool pool(common, callback);
- TSessionParams p;
+ auto callback = [](THolder<IEventBase>) {};
+ TEventHolderPool pool(common, callback);
+ TSessionParams p;
TChannelScheduler scheduler(1, {}, ctr, pool, 64 << 20, p);
-
- ui32 numEvents = 0;
-
- auto pushEvent = [&](size_t size, int channel) {
- TString payload(size, 'X');
+
+ ui32 numEvents = 0;
+
+ auto pushEvent = [&](size_t size, int channel) {
+ TString payload(size, 'X');
auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
- auto& ch = scheduler.GetOutputChannel(channel);
- const bool wasWorking = ch.IsWorking();
- ch.Push(*ev);
- if (!wasWorking) {
- scheduler.AddToHeap(ch, 0);
- }
- ++numEvents;
- };
-
- for (ui32 i = 0; i < 100; ++i) {
- pushEvent(10000, 1);
- }
-
- for (ui32 i = 0; i < 1000; ++i) {
- pushEvent(1000, 2);
- }
-
- std::map<ui16, ui32> run;
- ui32 step = 0;
-
- std::deque<std::map<ui16, ui32>> window;
-
- for (; numEvents; ++step) {
- TTcpPacketOutTask task(p);
-
- if (step == 100) {
- for (ui32 i = 0; i < 200; ++i) {
- pushEvent(1000, 3);
- }
- }
-
- std::map<ui16, ui32> ch;
-
- while (numEvents) {
- TEventOutputChannel *channel = scheduler.PickChannelWithLeastConsumedWeight();
- ui32 before = task.GetDataSize();
- ui64 weightConsumed = 0;
- numEvents -= channel->FeedBuf(task, 0, &weightConsumed);
- ui32 after = task.GetDataSize();
- Y_VERIFY(after >= before);
- scheduler.FinishPick(weightConsumed, 0);
- const ui32 bytesAdded = after - before;
- if (!bytesAdded) {
- break;
- }
- ch[channel->ChannelId] += bytesAdded;
- }
-
- scheduler.Equalize();
-
- for (const auto& [key, value] : ch) {
- run[key] += value;
- }
- window.push_back(ch);
-
- if (window.size() == 32) {
- for (const auto& [key, value] : window.front()) {
- run[key] -= value;
- if (!run[key]) {
- run.erase(key);
- }
- }
- window.pop_front();
- }
-
- double mean = 0.0;
- for (const auto& [key, value] : run) {
- mean += value;
- }
- mean /= run.size();
-
- double dev = 0.0;
- for (const auto& [key, value] : run) {
- dev += (value - mean) * (value - mean);
- }
- dev = sqrt(dev / run.size());
-
- double devToMean = dev / mean;
-
- Cerr << step << ": ";
- for (const auto& [key, value] : run) {
- Cerr << "ch" << key << "=" << value << " ";
- }
- Cerr << "mean# " << mean << " dev# " << dev << " part# " << devToMean;
-
- Cerr << Endl;
-
- UNIT_ASSERT(devToMean < 1);
- }
- }
-
-}
+ auto& ch = scheduler.GetOutputChannel(channel);
+ const bool wasWorking = ch.IsWorking();
+ ch.Push(*ev);
+ if (!wasWorking) {
+ scheduler.AddToHeap(ch, 0);
+ }
+ ++numEvents;
+ };
+
+ for (ui32 i = 0; i < 100; ++i) {
+ pushEvent(10000, 1);
+ }
+
+ for (ui32 i = 0; i < 1000; ++i) {
+ pushEvent(1000, 2);
+ }
+
+ std::map<ui16, ui32> run;
+ ui32 step = 0;
+
+ std::deque<std::map<ui16, ui32>> window;
+
+ for (; numEvents; ++step) {
+ TTcpPacketOutTask task(p);
+
+ if (step == 100) {
+ for (ui32 i = 0; i < 200; ++i) {
+ pushEvent(1000, 3);
+ }
+ }
+
+ std::map<ui16, ui32> ch;
+
+ while (numEvents) {
+ TEventOutputChannel *channel = scheduler.PickChannelWithLeastConsumedWeight();
+ ui32 before = task.GetDataSize();
+ ui64 weightConsumed = 0;
+ numEvents -= channel->FeedBuf(task, 0, &weightConsumed);
+ ui32 after = task.GetDataSize();
+ Y_VERIFY(after >= before);
+ scheduler.FinishPick(weightConsumed, 0);
+ const ui32 bytesAdded = after - before;
+ if (!bytesAdded) {
+ break;
+ }
+ ch[channel->ChannelId] += bytesAdded;
+ }
+
+ scheduler.Equalize();
+
+ for (const auto& [key, value] : ch) {
+ run[key] += value;
+ }
+ window.push_back(ch);
+
+ if (window.size() == 32) {
+ for (const auto& [key, value] : window.front()) {
+ run[key] -= value;
+ if (!run[key]) {
+ run.erase(key);
+ }
+ }
+ window.pop_front();
+ }
+
+ double mean = 0.0;
+ for (const auto& [key, value] : run) {
+ mean += value;
+ }
+ mean /= run.size();
+
+ double dev = 0.0;
+ for (const auto& [key, value] : run) {
+ dev += (value - mean) * (value - mean);
+ }
+ dev = sqrt(dev / run.size());
+
+ double devToMean = dev / mean;
+
+ Cerr << step << ": ";
+ for (const auto& [key, value] : run) {
+ Cerr << "ch" << key << "=" << value << " ";
+ }
+ Cerr << "mean# " << mean << " dev# " << dev << " part# " << devToMean;
+
+ Cerr << Endl;
+
+ UNIT_ASSERT(devToMean < 1);
+ }
+ }
+
+}