diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/actors/interconnect/load.cpp | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/load.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/load.cpp | 44 |
1 files changed, 22 insertions, 22 deletions
diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp index 2a8443da71..f166ca0a99 100644 --- a/library/cpp/actors/interconnect/load.cpp +++ b/library/cpp/actors/interconnect/load.cpp @@ -34,9 +34,9 @@ namespace NInterconnect { CFunc(TEvents::TSystem::PoisonPill, Die); ) - void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { ui64 bytes = ev->Get()->CalculateSerializedSizeCached(); - auto& record = ev->Get()->Record; + auto& record = ev->Get()->Record; auto *hops = record.MutableHops(); while (!hops->empty() && !hops->begin()->HasNextHop()) { record.ClearPayload(); @@ -81,7 +81,7 @@ namespace NInterconnect { CFunc(TEvents::TSystem::PoisonPill, Die); ) - void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex])); if (++SlaveIndex == Slaves.size()) { SlaveIndex = 0; @@ -107,7 +107,7 @@ namespace NInterconnect { TLoadResponderMasterActor() {} - void Bootstrap(const TActorContext& ctx) { + void Bootstrap(const TActorContext& ctx) { Become(&TLoadResponderMasterActor::StateFunc); while (Slaves.size() < 10) { Slaves.push_back(ctx.Register(new TLoadResponderActor(Traffic))); @@ -118,7 +118,7 @@ namespace NInterconnect { std::shared_ptr<std::atomic_uint64_t> Traffic = std::make_shared<std::atomic_uint64_t>(); }; - IActor* CreateLoadResponderActor() { + IActor* CreateLoadResponderActor() { return new TLoadResponderMasterActor(); } @@ -127,17 +127,17 @@ namespace NInterconnect { return TActorId(nodeId, TStringBuf(x, 12)); } - class TLoadActor: public TActorBootstrapped<TLoadActor> { + class TLoadActor: public TActorBootstrapped<TLoadActor> { struct TEvGenerateMessages : TEventLocal<TEvGenerateMessages, EvGenerateMessages> {}; struct TEvPublishResults : TEventLocal<TEvPublishResults, EvPublishResults> {}; struct TMessageInfo { TInstant SendTimestamp; - TMessageInfo(const TInstant& sendTimestamp) + TMessageInfo(const TInstant& sendTimestamp) : SendTimestamp(sendTimestamp) - { - } + { + } }; const TLoadParams Params; @@ -154,14 +154,14 @@ namespace NInterconnect { return IActor::INTERCONNECT_LOAD_ACTOR; } - TLoadActor(const TLoadParams& params) + TLoadActor(const TLoadParams& params) : Params(params) {} void Bootstrap(const TActorContext& ctx) { Become(&TLoadActor::QueryTrafficCounter); ctx.Send(MakeLoadResponderActorId(SelfId().NodeId()), new TEvQueryTrafficCounter); - } + } void Handle(TEvTrafficCounter::TPtr ev, const TActorContext& ctx) { Traffic = std::move(ev->Get()->Traffic); @@ -185,7 +185,7 @@ namespace NInterconnect { SchedulePublishResults(ctx); } - void GenerateMessages(const TActorContext& ctx) { + void GenerateMessages(const TActorContext& ctx) { while (InFly.size() < Params.InFlyMax && ctx.Now() >= NextMessageTimestamp) { // generate payload const ui32 size = Params.SizeMin + RandomNumber(Params.SizeMax - Params.SizeMin + 1); @@ -232,8 +232,8 @@ namespace NInterconnect { } } - void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record; + void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->Record; auto it = InFly.find(record.GetId()); if (it != InFly.end()) { // record message rtt @@ -294,18 +294,18 @@ namespace NInterconnect { TQueue<std::pair<TInstant, TString>> TimeoutQueue; - void PutTimeoutQueueItem(const TActorContext& ctx, TString id) { + void PutTimeoutQueueItem(const TActorContext& ctx, TString id) { TimeoutQueue.emplace(ctx.Now() + TDuration::Minutes(1), std::move(id)); if (TimeoutQueue.size() == 1) { ScheduleWakeup(ctx); } } - void ScheduleWakeup(const TActorContext& ctx) { + void ScheduleWakeup(const TActorContext& ctx) { ctx.Schedule(TimeoutQueue.front().first - ctx.Now(), new TEvents::TEvWakeup); } - void HandleWakeup(const TActorContext& ctx) { + void HandleWakeup(const TActorContext& ctx) { ui32 numDropped = 0; while (TimeoutQueue && TimeoutQueue.front().first <= ctx.Now()) { @@ -326,11 +326,11 @@ namespace NInterconnect { const TDuration ResultPublishPeriod = TDuration::Seconds(15); - void SchedulePublishResults(const TActorContext& ctx) { + void SchedulePublishResults(const TActorContext& ctx) { ctx.Schedule(ResultPublishPeriod, new TEvPublishResults); } - void PublishResults(const TActorContext& ctx, bool schedule = true) { + void PublishResults(const TActorContext& ctx, bool schedule = true) { const TInstant now = ctx.Now(); TStringStream msg; @@ -354,7 +354,7 @@ namespace NInterconnect { msg << "{window# " << duration << " samples# " << Histogram.size(); TVector<TDuration> v; v.reserve(Histogram.size()); - for (const auto& item : Histogram) { + for (const auto& item : Histogram) { v.push_back(item.second); } std::sort(v.begin(), v.end()); @@ -398,8 +398,8 @@ namespace NInterconnect { } }; - IActor* CreateLoadActor(const TLoadParams& params) { + IActor* CreateLoadActor(const TLoadParams& params) { return new TLoadActor(params); } -} +} |