aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/load.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/actors/interconnect/load.cpp
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/load.cpp')
-rw-r--r--library/cpp/actors/interconnect/load.cpp44
1 files changed, 22 insertions, 22 deletions
diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp
index 2a8443da71..f166ca0a99 100644
--- a/library/cpp/actors/interconnect/load.cpp
+++ b/library/cpp/actors/interconnect/load.cpp
@@ -34,9 +34,9 @@ namespace NInterconnect {
CFunc(TEvents::TSystem::PoisonPill, Die);
)
- void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
ui64 bytes = ev->Get()->CalculateSerializedSizeCached();
- auto& record = ev->Get()->Record;
+ auto& record = ev->Get()->Record;
auto *hops = record.MutableHops();
while (!hops->empty() && !hops->begin()->HasNextHop()) {
record.ClearPayload();
@@ -81,7 +81,7 @@ namespace NInterconnect {
CFunc(TEvents::TSystem::PoisonPill, Die);
)
- void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex]));
if (++SlaveIndex == Slaves.size()) {
SlaveIndex = 0;
@@ -107,7 +107,7 @@ namespace NInterconnect {
TLoadResponderMasterActor()
{}
- void Bootstrap(const TActorContext& ctx) {
+ void Bootstrap(const TActorContext& ctx) {
Become(&TLoadResponderMasterActor::StateFunc);
while (Slaves.size() < 10) {
Slaves.push_back(ctx.Register(new TLoadResponderActor(Traffic)));
@@ -118,7 +118,7 @@ namespace NInterconnect {
std::shared_ptr<std::atomic_uint64_t> Traffic = std::make_shared<std::atomic_uint64_t>();
};
- IActor* CreateLoadResponderActor() {
+ IActor* CreateLoadResponderActor() {
return new TLoadResponderMasterActor();
}
@@ -127,17 +127,17 @@ namespace NInterconnect {
return TActorId(nodeId, TStringBuf(x, 12));
}
- class TLoadActor: public TActorBootstrapped<TLoadActor> {
+ class TLoadActor: public TActorBootstrapped<TLoadActor> {
struct TEvGenerateMessages : TEventLocal<TEvGenerateMessages, EvGenerateMessages> {};
struct TEvPublishResults : TEventLocal<TEvPublishResults, EvPublishResults> {};
struct TMessageInfo {
TInstant SendTimestamp;
- TMessageInfo(const TInstant& sendTimestamp)
+ TMessageInfo(const TInstant& sendTimestamp)
: SendTimestamp(sendTimestamp)
- {
- }
+ {
+ }
};
const TLoadParams Params;
@@ -154,14 +154,14 @@ namespace NInterconnect {
return IActor::INTERCONNECT_LOAD_ACTOR;
}
- TLoadActor(const TLoadParams& params)
+ TLoadActor(const TLoadParams& params)
: Params(params)
{}
void Bootstrap(const TActorContext& ctx) {
Become(&TLoadActor::QueryTrafficCounter);
ctx.Send(MakeLoadResponderActorId(SelfId().NodeId()), new TEvQueryTrafficCounter);
- }
+ }
void Handle(TEvTrafficCounter::TPtr ev, const TActorContext& ctx) {
Traffic = std::move(ev->Get()->Traffic);
@@ -185,7 +185,7 @@ namespace NInterconnect {
SchedulePublishResults(ctx);
}
- void GenerateMessages(const TActorContext& ctx) {
+ void GenerateMessages(const TActorContext& ctx) {
while (InFly.size() < Params.InFlyMax && ctx.Now() >= NextMessageTimestamp) {
// generate payload
const ui32 size = Params.SizeMin + RandomNumber(Params.SizeMax - Params.SizeMin + 1);
@@ -232,8 +232,8 @@ namespace NInterconnect {
}
}
- void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
- const auto& record = ev->Get()->Record;
+ void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
+ const auto& record = ev->Get()->Record;
auto it = InFly.find(record.GetId());
if (it != InFly.end()) {
// record message rtt
@@ -294,18 +294,18 @@ namespace NInterconnect {
TQueue<std::pair<TInstant, TString>> TimeoutQueue;
- void PutTimeoutQueueItem(const TActorContext& ctx, TString id) {
+ void PutTimeoutQueueItem(const TActorContext& ctx, TString id) {
TimeoutQueue.emplace(ctx.Now() + TDuration::Minutes(1), std::move(id));
if (TimeoutQueue.size() == 1) {
ScheduleWakeup(ctx);
}
}
- void ScheduleWakeup(const TActorContext& ctx) {
+ void ScheduleWakeup(const TActorContext& ctx) {
ctx.Schedule(TimeoutQueue.front().first - ctx.Now(), new TEvents::TEvWakeup);
}
- void HandleWakeup(const TActorContext& ctx) {
+ void HandleWakeup(const TActorContext& ctx) {
ui32 numDropped = 0;
while (TimeoutQueue && TimeoutQueue.front().first <= ctx.Now()) {
@@ -326,11 +326,11 @@ namespace NInterconnect {
const TDuration ResultPublishPeriod = TDuration::Seconds(15);
- void SchedulePublishResults(const TActorContext& ctx) {
+ void SchedulePublishResults(const TActorContext& ctx) {
ctx.Schedule(ResultPublishPeriod, new TEvPublishResults);
}
- void PublishResults(const TActorContext& ctx, bool schedule = true) {
+ void PublishResults(const TActorContext& ctx, bool schedule = true) {
const TInstant now = ctx.Now();
TStringStream msg;
@@ -354,7 +354,7 @@ namespace NInterconnect {
msg << "{window# " << duration << " samples# " << Histogram.size();
TVector<TDuration> v;
v.reserve(Histogram.size());
- for (const auto& item : Histogram) {
+ for (const auto& item : Histogram) {
v.push_back(item.second);
}
std::sort(v.begin(), v.end());
@@ -398,8 +398,8 @@ namespace NInterconnect {
}
};
- IActor* CreateLoadActor(const TLoadParams& params) {
+ IActor* CreateLoadActor(const TLoadParams& params) {
return new TLoadActor(params);
}
-}
+}