diff options
author | akchernikov <akchernikov@yandex-team.ru> | 2022-02-10 16:50:44 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:44 +0300 |
commit | 87cccadbd489f00bc6d81b27ad182277cbb25826 (patch) | |
tree | 631e1bdaa6d14f4263934dd0b2d6098cafcfe296 /library | |
parent | 7f150aad14bac3241bf862a8f85bbd69547769ef (diff) | |
download | ydb-87cccadbd489f00bc6d81b27ad182277cbb25826.tar.gz |
Restoring authorship annotation for <akchernikov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library')
15 files changed, 758 insertions, 758 deletions
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp index 5f63b5af58..90beec0407 100644 --- a/library/cpp/actors/core/log.cpp +++ b/library/cpp/actors/core/log.cpp @@ -33,137 +33,137 @@ namespace { } namespace NActors { - - class TLoggerCounters : public ILoggerMetrics { - public: - TLoggerCounters(TIntrusivePtr<NMonitoring::TDynamicCounters> counters) - : DynamicCounters(counters) - { - ActorMsgs_ = DynamicCounters->GetCounter("ActorMsgs", true); - DirectMsgs_ = DynamicCounters->GetCounter("DirectMsgs", true); - LevelRequests_ = DynamicCounters->GetCounter("LevelRequests", true); - IgnoredMsgs_ = DynamicCounters->GetCounter("IgnoredMsgs", true); - DroppedMsgs_ = DynamicCounters->GetCounter("DroppedMsgs", true); - - AlertMsgs_ = DynamicCounters->GetCounter("AlertMsgs", true); - EmergMsgs_ = DynamicCounters->GetCounter("EmergMsgs", true); - } - - ~TLoggerCounters() = default; - - void IncActorMsgs() override { - ++*ActorMsgs_; - } - void IncDirectMsgs() override { - ++*DirectMsgs_; - } - void IncLevelRequests() override { - ++*LevelRequests_; - } - void IncIgnoredMsgs() override { - ++*IgnoredMsgs_; - } - void IncAlertMsgs() override { - ++*AlertMsgs_; - } - void IncEmergMsgs() override { - ++*EmergMsgs_; - } - void IncDroppedMsgs() override { - DroppedMsgs_->Inc(); - }; - - void GetOutputHtml(IOutputStream& str) override { - HTML(str) { - DIV_CLASS("row") { - DIV_CLASS("col-md-12") { - H4() { - str << "Counters" << Endl; - } - DynamicCounters->OutputHtml(str); - } - } - } - } - - private: - NMonitoring::TDynamicCounters::TCounterPtr ActorMsgs_; - NMonitoring::TDynamicCounters::TCounterPtr DirectMsgs_; - NMonitoring::TDynamicCounters::TCounterPtr LevelRequests_; - NMonitoring::TDynamicCounters::TCounterPtr IgnoredMsgs_; - NMonitoring::TDynamicCounters::TCounterPtr AlertMsgs_; - NMonitoring::TDynamicCounters::TCounterPtr EmergMsgs_; - // Dropped while the logger backend was unavailable - NMonitoring::TDynamicCounters::TCounterPtr DroppedMsgs_; - - TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters; - }; - - class TLoggerMetrics : public ILoggerMetrics { - public: - TLoggerMetrics(std::shared_ptr<NMonitoring::TMetricRegistry> metrics) - : Metrics(metrics) - { - ActorMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.actor_msgs"}}); - DirectMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.direct_msgs"}}); - LevelRequests_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.level_requests"}}); - IgnoredMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.ignored_msgs"}}); - DroppedMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.dropped_msgs"}}); - - AlertMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.alert_msgs"}}); - EmergMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.emerg_msgs"}}); - } - - ~TLoggerMetrics() = default; - - void IncActorMsgs() override { - ActorMsgs_->Inc(); - } - void IncDirectMsgs() override { - DirectMsgs_->Inc(); - } - void IncLevelRequests() override { - LevelRequests_->Inc(); - } - void IncIgnoredMsgs() override { - IgnoredMsgs_->Inc(); - } - void IncAlertMsgs() override { - AlertMsgs_->Inc(); - } - void IncEmergMsgs() override { - EmergMsgs_->Inc(); - } - void IncDroppedMsgs() override { - DroppedMsgs_->Inc(); - }; - - void GetOutputHtml(IOutputStream& str) override { - HTML(str) { - DIV_CLASS("row") { - DIV_CLASS("col-md-12") { - H4() { - str << "Metrics" << Endl; - } - // TODO: Now, TMetricRegistry does not have the GetOutputHtml function - } - } - } - } - - private: - NMonitoring::TRate* ActorMsgs_; - NMonitoring::TRate* DirectMsgs_; - NMonitoring::TRate* LevelRequests_; - NMonitoring::TRate* IgnoredMsgs_; - NMonitoring::TRate* AlertMsgs_; - NMonitoring::TRate* EmergMsgs_; - // Dropped while the logger backend was unavailable - NMonitoring::TRate* DroppedMsgs_; - - std::shared_ptr<NMonitoring::TMetricRegistry> Metrics; - }; - + + class TLoggerCounters : public ILoggerMetrics { + public: + TLoggerCounters(TIntrusivePtr<NMonitoring::TDynamicCounters> counters) + : DynamicCounters(counters) + { + ActorMsgs_ = DynamicCounters->GetCounter("ActorMsgs", true); + DirectMsgs_ = DynamicCounters->GetCounter("DirectMsgs", true); + LevelRequests_ = DynamicCounters->GetCounter("LevelRequests", true); + IgnoredMsgs_ = DynamicCounters->GetCounter("IgnoredMsgs", true); + DroppedMsgs_ = DynamicCounters->GetCounter("DroppedMsgs", true); + + AlertMsgs_ = DynamicCounters->GetCounter("AlertMsgs", true); + EmergMsgs_ = DynamicCounters->GetCounter("EmergMsgs", true); + } + + ~TLoggerCounters() = default; + + void IncActorMsgs() override { + ++*ActorMsgs_; + } + void IncDirectMsgs() override { + ++*DirectMsgs_; + } + void IncLevelRequests() override { + ++*LevelRequests_; + } + void IncIgnoredMsgs() override { + ++*IgnoredMsgs_; + } + void IncAlertMsgs() override { + ++*AlertMsgs_; + } + void IncEmergMsgs() override { + ++*EmergMsgs_; + } + void IncDroppedMsgs() override { + DroppedMsgs_->Inc(); + }; + + void GetOutputHtml(IOutputStream& str) override { + HTML(str) { + DIV_CLASS("row") { + DIV_CLASS("col-md-12") { + H4() { + str << "Counters" << Endl; + } + DynamicCounters->OutputHtml(str); + } + } + } + } + + private: + NMonitoring::TDynamicCounters::TCounterPtr ActorMsgs_; + NMonitoring::TDynamicCounters::TCounterPtr DirectMsgs_; + NMonitoring::TDynamicCounters::TCounterPtr LevelRequests_; + NMonitoring::TDynamicCounters::TCounterPtr IgnoredMsgs_; + NMonitoring::TDynamicCounters::TCounterPtr AlertMsgs_; + NMonitoring::TDynamicCounters::TCounterPtr EmergMsgs_; + // Dropped while the logger backend was unavailable + NMonitoring::TDynamicCounters::TCounterPtr DroppedMsgs_; + + TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters; + }; + + class TLoggerMetrics : public ILoggerMetrics { + public: + TLoggerMetrics(std::shared_ptr<NMonitoring::TMetricRegistry> metrics) + : Metrics(metrics) + { + ActorMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.actor_msgs"}}); + DirectMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.direct_msgs"}}); + LevelRequests_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.level_requests"}}); + IgnoredMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.ignored_msgs"}}); + DroppedMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.dropped_msgs"}}); + + AlertMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.alert_msgs"}}); + EmergMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.emerg_msgs"}}); + } + + ~TLoggerMetrics() = default; + + void IncActorMsgs() override { + ActorMsgs_->Inc(); + } + void IncDirectMsgs() override { + DirectMsgs_->Inc(); + } + void IncLevelRequests() override { + LevelRequests_->Inc(); + } + void IncIgnoredMsgs() override { + IgnoredMsgs_->Inc(); + } + void IncAlertMsgs() override { + AlertMsgs_->Inc(); + } + void IncEmergMsgs() override { + EmergMsgs_->Inc(); + } + void IncDroppedMsgs() override { + DroppedMsgs_->Inc(); + }; + + void GetOutputHtml(IOutputStream& str) override { + HTML(str) { + DIV_CLASS("row") { + DIV_CLASS("col-md-12") { + H4() { + str << "Metrics" << Endl; + } + // TODO: Now, TMetricRegistry does not have the GetOutputHtml function + } + } + } + } + + private: + NMonitoring::TRate* ActorMsgs_; + NMonitoring::TRate* DirectMsgs_; + NMonitoring::TRate* LevelRequests_; + NMonitoring::TRate* IgnoredMsgs_; + NMonitoring::TRate* AlertMsgs_; + NMonitoring::TRate* EmergMsgs_; + // Dropped while the logger backend was unavailable + NMonitoring::TRate* DroppedMsgs_; + + std::shared_ptr<NMonitoring::TMetricRegistry> Metrics; + }; + TAtomic TLoggerActor::IsOverflow = 0; TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings, @@ -172,7 +172,7 @@ namespace NActors { : TActor(&TLoggerActor::StateFunc) , Settings(settings) , LogBackend(logBackend.Release()) - , Metrics(std::make_unique<TLoggerCounters>(counters)) + , Metrics(std::make_unique<TLoggerCounters>(counters)) { } @@ -182,35 +182,35 @@ namespace NActors { : TActor(&TLoggerActor::StateFunc) , Settings(settings) , LogBackend(logBackend) - , Metrics(std::make_unique<TLoggerCounters>(counters)) - { - } - - TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings, - TAutoPtr<TLogBackend> logBackend, - std::shared_ptr<NMonitoring::TMetricRegistry> metrics) - : TActor(&TLoggerActor::StateFunc) - , Settings(settings) - , LogBackend(logBackend.Release()) - , Metrics(std::make_unique<TLoggerMetrics>(metrics)) + , Metrics(std::make_unique<TLoggerCounters>(counters)) { } - TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings, - std::shared_ptr<TLogBackend> logBackend, - std::shared_ptr<NMonitoring::TMetricRegistry> metrics) - : TActor(&TLoggerActor::StateFunc) - , Settings(settings) - , LogBackend(logBackend) - , Metrics(std::make_unique<TLoggerMetrics>(metrics)) + TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings, + TAutoPtr<TLogBackend> logBackend, + std::shared_ptr<NMonitoring::TMetricRegistry> metrics) + : TActor(&TLoggerActor::StateFunc) + , Settings(settings) + , LogBackend(logBackend.Release()) + , Metrics(std::make_unique<TLoggerMetrics>(metrics)) { + } + + TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings, + std::shared_ptr<TLogBackend> logBackend, + std::shared_ptr<NMonitoring::TMetricRegistry> metrics) + : TActor(&TLoggerActor::StateFunc) + , Settings(settings) + , LogBackend(logBackend) + , Metrics(std::make_unique<TLoggerMetrics>(metrics)) + { } TLoggerActor::~TLoggerActor() { } void TLoggerActor::Log(TInstant time, NLog::EPriority priority, NLog::EComponent component, const char* c, ...) { - Metrics->IncDirectMsgs(); + Metrics->IncDirectMsgs(); if (Settings && Settings->Satisfies(priority, component, 0ull)) { va_list params; va_start(params, c); @@ -247,16 +247,16 @@ namespace NActors { } void TLoggerActor::WriteMessageStat(const NLog::TEvLog& ev) { - Metrics->IncActorMsgs(); + Metrics->IncActorMsgs(); const auto prio = ev.Level.ToPrio(); switch (prio) { case ::NActors::NLog::EPrio::Alert: - Metrics->IncAlertMsgs(); + Metrics->IncAlertMsgs(); break; case ::NActors::NLog::EPrio::Emerg: - Metrics->IncEmergMsgs(); + Metrics->IncEmergMsgs(); break; default: break; @@ -274,7 +274,7 @@ namespace NActors { // Check if some records have to be dropped if ((PassedCount > 10 && delayMillisec > (i64)Settings->TimeThresholdMs) || IgnoredCount > 0) { - Metrics->IncIgnoredMsgs(); + Metrics->IncIgnoredMsgs(); if (IgnoredCount == 0) { ctx.Send(ctx.SelfID, new TLogIgnored()); } @@ -303,7 +303,7 @@ namespace NActors { } void TLoggerActor::HandleLogComponentLevelRequest(TLogComponentLevelRequest::TPtr& ev, const NActors::TActorContext& ctx) { - Metrics->IncLevelRequests(); + Metrics->IncLevelRequests(); TString explanation; int code = Settings->SetLevel(ev->Get()->Priority, ev->Get()->Component, explanation); ctx.Send(ev->Sender, new TLogComponentLevelResponse(code, explanation)); @@ -565,7 +565,7 @@ namespace NActors { str << "</form>" << Endl; } } - Metrics->GetOutputHtml(str); + Metrics->GetOutputHtml(str); } } @@ -642,7 +642,7 @@ namespace NActors { void TLoggerActor::HandleLogEventDrop(const NLog::TEvLog::TPtr& ev) { WriteMessageStat(*ev->Get()); - Metrics->IncDroppedMsgs(); + Metrics->IncDroppedMsgs(); } void TLoggerActor::HandleWakeup() { diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index c11a7cf3c1..f5f11db614 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -15,7 +15,7 @@ #include <util/string/builder.h> #include <library/cpp/logger/all.h> #include <library/cpp/monlib/dynamic_counters/counters.h> -#include <library/cpp/monlib/metrics/metric_registry.h> +#include <library/cpp/monlib/metrics/metric_registry.h> #include <library/cpp/json/writer/json.h> #include <library/cpp/svnversion/svnversion.h> @@ -175,21 +175,21 @@ namespace NActors { //////////////////////////////////////////////////////////////////////////////// // LOGGER ACTOR //////////////////////////////////////////////////////////////////////////////// - class ILoggerMetrics { - public: - virtual ~ILoggerMetrics() = default; - - virtual void IncActorMsgs() = 0; - virtual void IncDirectMsgs() = 0; - virtual void IncLevelRequests() = 0; - virtual void IncIgnoredMsgs() = 0; - virtual void IncAlertMsgs() = 0; - virtual void IncEmergMsgs() = 0; - virtual void IncDroppedMsgs() = 0; - - virtual void GetOutputHtml(IOutputStream&) = 0; - }; - + class ILoggerMetrics { + public: + virtual ~ILoggerMetrics() = default; + + virtual void IncActorMsgs() = 0; + virtual void IncDirectMsgs() = 0; + virtual void IncLevelRequests() = 0; + virtual void IncIgnoredMsgs() = 0; + virtual void IncAlertMsgs() = 0; + virtual void IncEmergMsgs() = 0; + virtual void IncDroppedMsgs() = 0; + + virtual void GetOutputHtml(IOutputStream&) = 0; + }; + class TLoggerActor: public TActor<TLoggerActor> { public: static constexpr IActor::EActivityType ActorActivityType() { @@ -202,12 +202,12 @@ namespace NActors { TLoggerActor(TIntrusivePtr<NLog::TSettings> settings, std::shared_ptr<TLogBackend> logBackend, TIntrusivePtr<NMonitoring::TDynamicCounters> counters); - TLoggerActor(TIntrusivePtr<NLog::TSettings> settings, - TAutoPtr<TLogBackend> logBackend, - std::shared_ptr<NMonitoring::TMetricRegistry> metrics); - TLoggerActor(TIntrusivePtr<NLog::TSettings> settings, - std::shared_ptr<TLogBackend> logBackend, - std::shared_ptr<NMonitoring::TMetricRegistry> metrics); + TLoggerActor(TIntrusivePtr<NLog::TSettings> settings, + TAutoPtr<TLogBackend> logBackend, + std::shared_ptr<NMonitoring::TMetricRegistry> metrics); + TLoggerActor(TIntrusivePtr<NLog::TSettings> settings, + std::shared_ptr<TLogBackend> logBackend, + std::shared_ptr<NMonitoring::TMetricRegistry> metrics); ~TLoggerActor(); void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { @@ -241,7 +241,7 @@ namespace NActors { ui64 PassedCount = 0; static TAtomic IsOverflow; TDuration WakeupInterval{TDuration::Seconds(5)}; - std::unique_ptr<ILoggerMetrics> Metrics; + std::unique_ptr<ILoggerMetrics> Metrics; void BecomeDefunct(); void HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx); diff --git a/library/cpp/actors/interconnect/channel_scheduler.h b/library/cpp/actors/interconnect/channel_scheduler.h index 551a4cb61a..803135e839 100644 --- a/library/cpp/actors/interconnect/channel_scheduler.h +++ b/library/cpp/actors/interconnect/channel_scheduler.h @@ -3,15 +3,15 @@ #include "interconnect_channel.h" #include "event_holder_pool.h" -#include <memory> - +#include <memory> + namespace NActors { class TChannelScheduler { const ui32 PeerNodeId; std::array<std::optional<TEventOutputChannel>, 16> ChannelArray; THashMap<ui16, TEventOutputChannel> ChannelMap; - std::shared_ptr<IInterconnectMetrics> Metrics; + std::shared_ptr<IInterconnectMetrics> Metrics; TEventHolderPool& Pool; const ui32 MaxSerializedEventSize; const TSessionParams Params; @@ -29,10 +29,10 @@ namespace NActors { public: TChannelScheduler(ui32 peerNodeId, const TChannelsConfig& predefinedChannels, - std::shared_ptr<IInterconnectMetrics> metrics, TEventHolderPool& pool, ui32 maxSerializedEventSize, + std::shared_ptr<IInterconnectMetrics> metrics, TEventHolderPool& pool, ui32 maxSerializedEventSize, TSessionParams params) : PeerNodeId(peerNodeId) - , Metrics(std::move(metrics)) + , Metrics(std::move(metrics)) , Pool(pool) , MaxSerializedEventSize(maxSerializedEventSize) , Params(std::move(params)) @@ -73,7 +73,7 @@ namespace NActors { if (channel < ChannelArray.size()) { auto& res = ChannelArray[channel]; if (Y_UNLIKELY(!res)) { - res.emplace(Pool, channel, PeerNodeId, MaxSerializedEventSize, Metrics, + res.emplace(Pool, channel, PeerNodeId, MaxSerializedEventSize, Metrics, Params); } return *res; @@ -82,7 +82,7 @@ namespace NActors { if (Y_UNLIKELY(it == ChannelMap.end())) { it = ChannelMap.emplace(std::piecewise_construct, std::forward_as_tuple(channel), std::forward_as_tuple(Pool, channel, PeerNodeId, MaxSerializedEventSize, - Metrics, Params)).first; + Metrics, Params)).first; } return it->second; } diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index a66ba2a154..dc366d590c 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -38,7 +38,7 @@ namespace NActors { task.AppendBuf(part, amount); *weightConsumed += amount; OutputQueueSize -= part->Size; - Metrics->UpdateOutputChannelEvents(ChannelId); + Metrics->UpdateOutputChannelEvents(ChannelId); return true; } diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index e4a0ae3cda..5a9cc206d7 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -43,12 +43,12 @@ namespace NActors { class TEventOutputChannel : public TInterconnectLoggingBase { public: TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize, - std::shared_ptr<IInterconnectMetrics> metrics, TSessionParams params) + std::shared_ptr<IInterconnectMetrics> metrics, TSessionParams params) : TInterconnectLoggingBase(Sprintf("OutputChannel %" PRIu16 " [node %" PRIu32 "]", id, peerNodeId)) , Pool(pool) , PeerNodeId(peerNodeId) , ChannelId(id) - , Metrics(std::move(metrics)) + , Metrics(std::move(metrics)) , Params(std::move(params)) , MaxSerializedEventSize(maxSerializedEventSize) {} @@ -88,7 +88,7 @@ namespace NActors { TEventHolderPool& Pool; const ui32 PeerNodeId; const ui16 ChannelId; - std::shared_ptr<IInterconnectMetrics> Metrics; + std::shared_ptr<IInterconnectMetrics> Metrics; const TSessionParams Params; const ui32 MaxSerializedEventSize; ui64 UnaccountedTraffic = 0; @@ -118,7 +118,7 @@ namespace NActors { void AccountTraffic() { if (const ui64 amount = std::exchange(UnaccountedTraffic, 0)) { - Metrics->UpdateOutputChannelTraffic(ChannelId, amount); + Metrics->UpdateOutputChannelTraffic(ChannelId, amount); } } diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h index 285709a00c..a039312fb6 100644 --- a/library/cpp/actors/interconnect/interconnect_common.h +++ b/library/cpp/actors/interconnect/interconnect_common.h @@ -4,7 +4,7 @@ #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/util/datetime.h> #include <library/cpp/monlib/dynamic_counters/counters.h> -#include <library/cpp/monlib/metrics/metric_registry.h> +#include <library/cpp/monlib/metrics/metric_registry.h> #include <util/generic/map.h> #include <util/generic/set.h> #include <util/system/datetime.h> @@ -73,7 +73,7 @@ namespace NActors { struct TInterconnectProxyCommon : TAtomicRefCount<TInterconnectProxyCommon> { TActorId NameserviceId; NMonitoring::TDynamicCounterPtr MonCounters; - std::shared_ptr<NMonitoring::IMetricRegistry> Metrics; + std::shared_ptr<NMonitoring::IMetricRegistry> Metrics; TChannelsConfig ChannelsConfig; TInterconnectSettings Settings; TRegisterMonPageCallback RegisterMonPage; diff --git a/library/cpp/actors/interconnect/interconnect_counters.cpp b/library/cpp/actors/interconnect/interconnect_counters.cpp index 224160d4b4..a206818160 100644 --- a/library/cpp/actors/interconnect/interconnect_counters.cpp +++ b/library/cpp/actors/interconnect/interconnect_counters.cpp @@ -1,15 +1,15 @@ -#include "interconnect_counters.h" +#include "interconnect_counters.h" -#include <library/cpp/monlib/metrics/metric_registry.h> -#include <library/cpp/monlib/metrics/metric_sub_registry.h> +#include <library/cpp/monlib/metrics/metric_registry.h> +#include <library/cpp/monlib/metrics/metric_sub_registry.h> -#include <unordered_map> +#include <unordered_map> namespace NActors { -namespace { - - class TInterconnectCounters: public IInterconnectMetrics { +namespace { + + class TInterconnectCounters: public IInterconnectMetrics { public: struct TOutputChannel { NMonitoring::TDynamicCounters::TCounterPtr Traffic; @@ -87,7 +87,7 @@ namespace { NMonitoring::TDynamicCounterPtr PerSessionCounters; NMonitoring::TDynamicCounterPtr PerDataCenterCounters; NMonitoring::TDynamicCounterPtr& AdaptiveCounters; - + bool Initialized = false; NMonitoring::TDynamicCounters::TCounterPtr Traffic; @@ -100,135 +100,135 @@ namespace { , MergePerDataCenterCounters(common->Settings.MergePerDataCenterCounters) , MergePerPeerCounters(common->Settings.MergePerPeerCounters) , Counters(common->MonCounters) - , AdaptiveCounters(MergePerDataCenterCounters - ? PerDataCenterCounters : - MergePerPeerCounters ? Counters : PerSessionCounters) + , AdaptiveCounters(MergePerDataCenterCounters + ? PerDataCenterCounters : + MergePerPeerCounters ? Counters : PerSessionCounters) {} void AddInflightDataAmount(ui64 value) override { - *InflightDataAmount += value; - } - + *InflightDataAmount += value; + } + void SubInflightDataAmount(ui64 value) override { - *InflightDataAmount -= value; - } - + *InflightDataAmount -= value; + } + void AddTotalBytesWritten(ui64 value) override { - *TotalBytesWritten += value; - } - + *TotalBytesWritten += value; + } + void SetClockSkewMicrosec(i64 value) override { - *ClockSkewMicrosec = value; - } - - void IncSessionDeaths() override { - ++*SessionDeaths; - } - - void IncHandshakeFails() override { - ++*HandshakeFails; - } - - void SetConnected(ui32 value) override { - *Connected = value; - } - - void IncSubscribersCount() override { - ++*SubscribersCount; - } - - void SubSubscribersCount(ui32 value) override { - *SubscribersCount -= value; - } - + *ClockSkewMicrosec = value; + } + + void IncSessionDeaths() override { + ++*SessionDeaths; + } + + void IncHandshakeFails() override { + ++*HandshakeFails; + } + + void SetConnected(ui32 value) override { + *Connected = value; + } + + void IncSubscribersCount() override { + ++*SubscribersCount; + } + + void SubSubscribersCount(ui32 value) override { + *SubscribersCount -= value; + } + void SubOutputBuffersTotalSize(ui64 value) override { - *OutputBuffersTotalSize -= value; - } - + *OutputBuffersTotalSize -= value; + } + void AddOutputBuffersTotalSize(ui64 value) override { - *OutputBuffersTotalSize += value; - } - + *OutputBuffersTotalSize += value; + } + ui64 GetOutputBuffersTotalSize() const override { - return *OutputBuffersTotalSize; - } - - void IncDisconnections() override { - ++*Disconnections; - } - - void IncUsefulWriteWakeups() override { - ++*UsefulWriteWakeups; - } - - void IncSpuriousWriteWakeups() override { - ++*SpuriousWriteWakeups; - } - - void IncSendSyscalls() override { - ++*SendSyscalls; - } - - void IncInflyLimitReach() override { - ++*InflyLimitReach; - } - - void IncUsefulReadWakeups() override { - ++*UsefulReadWakeups; - } - - void IncSpuriousReadWakeups() override { - ++*SpuriousReadWakeups; - } - - void IncDisconnectByReason(const TString& s) override { - if (auto it = DisconnectByReason.find(s); it != DisconnectByReason.end()) { - it->second->Inc(); - } - } - + return *OutputBuffersTotalSize; + } + + void IncDisconnections() override { + ++*Disconnections; + } + + void IncUsefulWriteWakeups() override { + ++*UsefulWriteWakeups; + } + + void IncSpuriousWriteWakeups() override { + ++*SpuriousWriteWakeups; + } + + void IncSendSyscalls() override { + ++*SendSyscalls; + } + + void IncInflyLimitReach() override { + ++*InflyLimitReach; + } + + void IncUsefulReadWakeups() override { + ++*UsefulReadWakeups; + } + + void IncSpuriousReadWakeups() override { + ++*SpuriousReadWakeups; + } + + void IncDisconnectByReason(const TString& s) override { + if (auto it = DisconnectByReason.find(s); it != DisconnectByReason.end()) { + it->second->Inc(); + } + } + void AddInputChannelsIncomingTraffic(ui16 channel, ui64 incomingTraffic) override { - auto& ch = InputChannels.Get(channel); - *ch.IncomingTraffic += incomingTraffic; - } - - void IncInputChannelsIncomingEvents(ui16 channel) override { - auto& ch = InputChannels.Get(channel); - ++*ch.IncomingEvents; - } - - void IncRecvSyscalls() override { - ++*RecvSyscalls; - } - + auto& ch = InputChannels.Get(channel); + *ch.IncomingTraffic += incomingTraffic; + } + + void IncInputChannelsIncomingEvents(ui16 channel) override { + auto& ch = InputChannels.Get(channel); + ++*ch.IncomingEvents; + } + + void IncRecvSyscalls() override { + ++*RecvSyscalls; + } + void AddTotalBytesRead(ui64 value) override { - *TotalBytesRead += value; - } - - void UpdateLegacyPingTimeHist(ui64 value) override { - LegacyPingTimeHist.Add(value); - PingTimeHistogram->Collect(value); - } - + *TotalBytesRead += value; + } + + void UpdateLegacyPingTimeHist(ui64 value) override { + LegacyPingTimeHist.Add(value); + PingTimeHistogram->Collect(value); + } + void UpdateOutputChannelTraffic(ui16 channel, ui64 value) override { - if (GetOutputChannel(channel).OutgoingTraffic) { - *(GetOutputChannel(channel).OutgoingTraffic) += value; - } - if (GetOutputChannel(channel).Traffic) { - *(GetOutputChannel(channel).Traffic) += value; - } - } - - void UpdateOutputChannelEvents(ui16 channel) override { - if (GetOutputChannel(channel).OutgoingEvents) { - ++*(GetOutputChannel(channel).OutgoingEvents); - } - if (GetOutputChannel(channel).Events) { - ++*(GetOutputChannel(channel).Events); - } - } - - void SetPeerInfo(const TString& name, const TString& dataCenterId) override { + if (GetOutputChannel(channel).OutgoingTraffic) { + *(GetOutputChannel(channel).OutgoingTraffic) += value; + } + if (GetOutputChannel(channel).Traffic) { + *(GetOutputChannel(channel).Traffic) += value; + } + } + + void UpdateOutputChannelEvents(ui16 channel) override { + if (GetOutputChannel(channel).OutgoingEvents) { + ++*(GetOutputChannel(channel).OutgoingEvents); + } + if (GetOutputChannel(channel).Events) { + ++*(GetOutputChannel(channel).Events); + } + } + + void SetPeerInfo(const TString& name, const TString& dataCenterId) override { if (name != std::exchange(HumanFriendlyPeerHostName, name)) { PerSessionCounters.Reset(); } @@ -312,7 +312,7 @@ namespace { return it != OutputChannels.end() ? it->second : OtherOutputChannel; } - private: + private: NMonitoring::TDynamicCounters::TCounterPtr SessionDeaths; NMonitoring::TDynamicCounters::TCounterPtr HandshakeFails; NMonitoring::TDynamicCounters::TCounterPtr Connected; @@ -340,353 +340,353 @@ namespace { NMonitoring::TDynamicCounters::TCounterPtr TotalBytesWritten, TotalBytesRead; }; - class TInterconnectMetrics: public IInterconnectMetrics { - public: - struct TOutputChannel { - NMonitoring::IRate* Traffic; - NMonitoring::IRate* Events; - NMonitoring::IRate* OutgoingTraffic; - NMonitoring::IRate* OutgoingEvents; - - TOutputChannel() = default; - - TOutputChannel(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics, - NMonitoring::IRate* traffic, - NMonitoring::IRate* events) - : Traffic(traffic) - , Events(events) - , OutgoingTraffic(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.outgoing_traffic"}}))) - , OutgoingEvents(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.outgoing_events"}}))) - {} - - TOutputChannel(const TOutputChannel&) = default; - }; - - struct TInputChannel { - NMonitoring::IRate* Traffic; - NMonitoring::IRate* Events; - NMonitoring::IRate* ScopeErrors; - NMonitoring::IRate* IncomingTraffic; - NMonitoring::IRate* IncomingEvents; - - TInputChannel() = default; - - TInputChannel(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics, - NMonitoring::IRate* traffic, NMonitoring::IRate* events, - NMonitoring::IRate* scopeErrors) - : Traffic(traffic) - , Events(events) - , ScopeErrors(scopeErrors) - , IncomingTraffic(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.incoming_traffic"}}))) - , IncomingEvents(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.incoming_events"}}))) - {} - - TInputChannel(const TInputChannel&) = default; - }; - - struct TInputChannels : std::unordered_map<ui16, TInputChannel> { - TInputChannel OtherInputChannel; - - TInputChannels() = default; - - TInputChannels(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics, - const std::unordered_map<ui16, TString>& names, - NMonitoring::IRate* traffic, NMonitoring::IRate* events, - NMonitoring::IRate* scopeErrors) - : OtherInputChannel(std::make_shared<NMonitoring::TMetricSubRegistry>( - NMonitoring::TLabels{{"channel", "other"}}, metrics), traffic, events, scopeErrors) - { - for (const auto& [id, name] : names) { - try_emplace(id, std::make_shared<NMonitoring::TMetricSubRegistry>(NMonitoring::TLabels{{"channel", name}}, metrics), - traffic, events, scopeErrors); - } - } - - TInputChannels(const TInputChannels&) = default; - - const TInputChannel& Get(ui16 id) const { - const auto it = find(id); - return it != end() ? it->second : OtherInputChannel; - } - }; - - TInterconnectMetrics(const TInterconnectProxyCommon::TPtr& common) - : Common(common) - , MergePerDataCenterMetrics_(common->Settings.MergePerDataCenterCounters) - , MergePerPeerMetrics_(common->Settings.MergePerPeerCounters) - , Metrics_(common->Metrics) - , AdaptiveMetrics_(MergePerDataCenterMetrics_ - ? PerDataCenterMetrics_ : - MergePerPeerMetrics_ ? Metrics_ : PerSessionMetrics_) - {} - + class TInterconnectMetrics: public IInterconnectMetrics { + public: + struct TOutputChannel { + NMonitoring::IRate* Traffic; + NMonitoring::IRate* Events; + NMonitoring::IRate* OutgoingTraffic; + NMonitoring::IRate* OutgoingEvents; + + TOutputChannel() = default; + + TOutputChannel(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics, + NMonitoring::IRate* traffic, + NMonitoring::IRate* events) + : Traffic(traffic) + , Events(events) + , OutgoingTraffic(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.outgoing_traffic"}}))) + , OutgoingEvents(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.outgoing_events"}}))) + {} + + TOutputChannel(const TOutputChannel&) = default; + }; + + struct TInputChannel { + NMonitoring::IRate* Traffic; + NMonitoring::IRate* Events; + NMonitoring::IRate* ScopeErrors; + NMonitoring::IRate* IncomingTraffic; + NMonitoring::IRate* IncomingEvents; + + TInputChannel() = default; + + TInputChannel(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics, + NMonitoring::IRate* traffic, NMonitoring::IRate* events, + NMonitoring::IRate* scopeErrors) + : Traffic(traffic) + , Events(events) + , ScopeErrors(scopeErrors) + , IncomingTraffic(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.incoming_traffic"}}))) + , IncomingEvents(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.incoming_events"}}))) + {} + + TInputChannel(const TInputChannel&) = default; + }; + + struct TInputChannels : std::unordered_map<ui16, TInputChannel> { + TInputChannel OtherInputChannel; + + TInputChannels() = default; + + TInputChannels(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics, + const std::unordered_map<ui16, TString>& names, + NMonitoring::IRate* traffic, NMonitoring::IRate* events, + NMonitoring::IRate* scopeErrors) + : OtherInputChannel(std::make_shared<NMonitoring::TMetricSubRegistry>( + NMonitoring::TLabels{{"channel", "other"}}, metrics), traffic, events, scopeErrors) + { + for (const auto& [id, name] : names) { + try_emplace(id, std::make_shared<NMonitoring::TMetricSubRegistry>(NMonitoring::TLabels{{"channel", name}}, metrics), + traffic, events, scopeErrors); + } + } + + TInputChannels(const TInputChannels&) = default; + + const TInputChannel& Get(ui16 id) const { + const auto it = find(id); + return it != end() ? it->second : OtherInputChannel; + } + }; + + TInterconnectMetrics(const TInterconnectProxyCommon::TPtr& common) + : Common(common) + , MergePerDataCenterMetrics_(common->Settings.MergePerDataCenterCounters) + , MergePerPeerMetrics_(common->Settings.MergePerPeerCounters) + , Metrics_(common->Metrics) + , AdaptiveMetrics_(MergePerDataCenterMetrics_ + ? PerDataCenterMetrics_ : + MergePerPeerMetrics_ ? Metrics_ : PerSessionMetrics_) + {} + void AddInflightDataAmount(ui64 value) override { - InflightDataAmount_->Add(value); - } - + InflightDataAmount_->Add(value); + } + void SubInflightDataAmount(ui64 value) override { - InflightDataAmount_->Add(-value); - } - + InflightDataAmount_->Add(-value); + } + void AddTotalBytesWritten(ui64 value) override { - TotalBytesWritten_->Add(value); - } - + TotalBytesWritten_->Add(value); + } + void SetClockSkewMicrosec(i64 value) override { - ClockSkewMicrosec_->Set(value); - } - - void IncSessionDeaths() override { - SessionDeaths_->Inc(); - } - - void IncHandshakeFails() override { - HandshakeFails_->Inc(); - } - - void SetConnected(ui32 value) override { - Connected_->Set(value); - } - - void IncSubscribersCount() override { - SubscribersCount_->Inc(); - } - - void SubSubscribersCount(ui32 value) override { - SubscribersCount_->Add(-value); - } - + ClockSkewMicrosec_->Set(value); + } + + void IncSessionDeaths() override { + SessionDeaths_->Inc(); + } + + void IncHandshakeFails() override { + HandshakeFails_->Inc(); + } + + void SetConnected(ui32 value) override { + Connected_->Set(value); + } + + void IncSubscribersCount() override { + SubscribersCount_->Inc(); + } + + void SubSubscribersCount(ui32 value) override { + SubscribersCount_->Add(-value); + } + void SubOutputBuffersTotalSize(ui64 value) override { - OutputBuffersTotalSize_->Add(-value); - } - + OutputBuffersTotalSize_->Add(-value); + } + void AddOutputBuffersTotalSize(ui64 value) override { - OutputBuffersTotalSize_->Add(value); - } - + OutputBuffersTotalSize_->Add(value); + } + ui64 GetOutputBuffersTotalSize() const override { - return OutputBuffersTotalSize_->Get(); - } - - void IncDisconnections() override { - Disconnections_->Inc(); - } - - void IncUsefulWriteWakeups() override { - UsefulWriteWakeups_->Inc(); - } - - void IncSpuriousWriteWakeups() override { - SpuriousWriteWakeups_->Inc(); - } - - void IncSendSyscalls() override { - SendSyscalls_->Inc(); - } - - void IncInflyLimitReach() override { - InflyLimitReach_->Inc(); - } - - void IncUsefulReadWakeups() override { - UsefulReadWakeups_->Inc(); - } - - void IncSpuriousReadWakeups() override { - SpuriousReadWakeups_->Inc(); - } - - void IncDisconnectByReason(const TString& s) override { - if (auto it = DisconnectByReason_.find(s); it != DisconnectByReason_.end()) { - it->second->Inc(); - } - } - + return OutputBuffersTotalSize_->Get(); + } + + void IncDisconnections() override { + Disconnections_->Inc(); + } + + void IncUsefulWriteWakeups() override { + UsefulWriteWakeups_->Inc(); + } + + void IncSpuriousWriteWakeups() override { + SpuriousWriteWakeups_->Inc(); + } + + void IncSendSyscalls() override { + SendSyscalls_->Inc(); + } + + void IncInflyLimitReach() override { + InflyLimitReach_->Inc(); + } + + void IncUsefulReadWakeups() override { + UsefulReadWakeups_->Inc(); + } + + void IncSpuriousReadWakeups() override { + SpuriousReadWakeups_->Inc(); + } + + void IncDisconnectByReason(const TString& s) override { + if (auto it = DisconnectByReason_.find(s); it != DisconnectByReason_.end()) { + it->second->Inc(); + } + } + void AddInputChannelsIncomingTraffic(ui16 channel, ui64 incomingTraffic) override { - auto& ch = InputChannels_.Get(channel); - ch.IncomingTraffic->Add(incomingTraffic); - } - - void IncInputChannelsIncomingEvents(ui16 channel) override { - auto& ch = InputChannels_.Get(channel); - ch.IncomingEvents->Inc(); - } - - void IncRecvSyscalls() override { - RecvSyscalls_->Inc(); - } - + auto& ch = InputChannels_.Get(channel); + ch.IncomingTraffic->Add(incomingTraffic); + } + + void IncInputChannelsIncomingEvents(ui16 channel) override { + auto& ch = InputChannels_.Get(channel); + ch.IncomingEvents->Inc(); + } + + void IncRecvSyscalls() override { + RecvSyscalls_->Inc(); + } + void AddTotalBytesRead(ui64 value) override { - TotalBytesRead_->Add(value); - } - - void UpdateLegacyPingTimeHist(ui64 value) override { - PingTimeHistogram_->Record(value); - } - + TotalBytesRead_->Add(value); + } + + void UpdateLegacyPingTimeHist(ui64 value) override { + PingTimeHistogram_->Record(value); + } + void UpdateOutputChannelTraffic(ui16 channel, ui64 value) override { - if (GetOutputChannel(channel).OutgoingTraffic) { - GetOutputChannel(channel).OutgoingTraffic->Add(value); - } - if (GetOutputChannel(channel).Traffic) { - GetOutputChannel(channel).Traffic->Add(value); - } - } - - void UpdateOutputChannelEvents(ui16 channel) override { - if (GetOutputChannel(channel).OutgoingEvents) { - GetOutputChannel(channel).OutgoingEvents->Inc(); - } - if (GetOutputChannel(channel).Events) { - GetOutputChannel(channel).Events->Inc(); - } - } - - void SetPeerInfo(const TString& name, const TString& dataCenterId) override { - if (name != std::exchange(HumanFriendlyPeerHostName, name)) { - PerSessionMetrics_.reset(); - } - VALGRIND_MAKE_READABLE(&DataCenterId, sizeof(DataCenterId)); - if (dataCenterId != std::exchange(DataCenterId, dataCenterId)) { - PerDataCenterMetrics_.reset(); - } - - const bool updatePerDataCenter = !PerDataCenterMetrics_ && MergePerDataCenterMetrics_; - if (updatePerDataCenter) { - PerDataCenterMetrics_ = std::make_shared<NMonitoring::TMetricSubRegistry>( - NMonitoring::TLabels{{"datacenter_id", *DataCenterId}}, Metrics_); - } - - const bool updatePerSession = !PerSessionMetrics_ || updatePerDataCenter; - if (updatePerSession) { - auto base = MergePerDataCenterMetrics_ ? PerDataCenterMetrics_ : Metrics_; - PerSessionMetrics_ = std::make_shared<NMonitoring::TMetricSubRegistry>( - NMonitoring::TLabels{{"peer", *HumanFriendlyPeerHostName}}, base); - } - - const bool updateGlobal = !Initialized_; - - const bool updateAdaptive = - &AdaptiveMetrics_ == &Metrics_ ? updateGlobal : - &AdaptiveMetrics_ == &PerSessionMetrics_ ? updatePerSession : - &AdaptiveMetrics_ == &PerDataCenterMetrics_ ? updatePerDataCenter : - false; - - auto createRate = [](std::shared_ptr<NMonitoring::IMetricRegistry> metrics, TStringBuf name) mutable { - return metrics->Rate(NMonitoring::MakeLabels(NMonitoring::TLabels{{"sensor", name}})); - }; - auto createIntGauge = [](std::shared_ptr<NMonitoring::IMetricRegistry> metrics, TStringBuf name) mutable { - return metrics->IntGauge(NMonitoring::MakeLabels(NMonitoring::TLabels{{"sensor", name}})); - }; - - if (updatePerSession) { - Connected_ = createIntGauge(PerSessionMetrics_, "interconnect.connected"); - Disconnections_ = createRate(PerSessionMetrics_, "interconnect.disconnections"); - ClockSkewMicrosec_ = createIntGauge(PerSessionMetrics_, "interconnect.clock_skew_microsec"); - Traffic_ = createRate(PerSessionMetrics_, "interconnect.traffic"); - Events_ = createRate(PerSessionMetrics_, "interconnect.events"); - ScopeErrors_ = createRate(PerSessionMetrics_, "interconnect.scope_errors"); - - for (const auto& [id, name] : Common->ChannelName) { - OutputChannels_.try_emplace(id, std::make_shared<NMonitoring::TMetricSubRegistry>( - NMonitoring::TLabels{{"channel", name}}, Metrics_), Traffic_, Events_); - } - OtherOutputChannel_ = TOutputChannel(std::make_shared<NMonitoring::TMetricSubRegistry>( - NMonitoring::TLabels{{"channel", "other"}}, Metrics_), Traffic_, Events_); - - InputChannels_ = TInputChannels(Metrics_, Common->ChannelName, Traffic_, Events_, ScopeErrors_); - } - - if (updateAdaptive) { - SessionDeaths_ = createRate(AdaptiveMetrics_, "interconnect.session_deaths"); - HandshakeFails_ = createRate(AdaptiveMetrics_, "interconnect.handshake_fails"); - InflyLimitReach_ = createRate(AdaptiveMetrics_, "interconnect.infly_limit_reach"); - InflightDataAmount_ = createRate(AdaptiveMetrics_, "interconnect.inflight_data"); - PingTimeHistogram_ = AdaptiveMetrics_->HistogramRate( - NMonitoring::MakeLabels({{"sensor", "interconnect.ping_time_us"}}), NMonitoring::ExponentialHistogram(18, 2, 125)); - } - - if (updateGlobal) { - OutputBuffersTotalSize_ = createRate(Metrics_, "interconnect.output_buffers_total_size"); - SendSyscalls_ = createRate(Metrics_, "interconnect.send_syscalls"); - RecvSyscalls_ = createRate(Metrics_, "interconnect.recv_syscalls"); - SpuriousReadWakeups_ = createRate(Metrics_, "interconnect.spurious_read_wakeups"); - UsefulReadWakeups_ = createRate(Metrics_, "interconnect.useful_read_wakeups"); - SpuriousWriteWakeups_ = createRate(Metrics_, "interconnect.spurious_write_wakeups"); - UsefulWriteWakeups_ = createRate(Metrics_, "interconnect.useful_write_wakeups"); - SubscribersCount_ = createIntGauge(AdaptiveMetrics_, "interconnect.subscribers_count"); - TotalBytesWritten_ = createRate(Metrics_, "interconnect.total_bytes_written"); - TotalBytesRead_ = createRate(Metrics_, "interconnect.total_bytes_read"); - - for (const char *reason : TDisconnectReason::Reasons) { + if (GetOutputChannel(channel).OutgoingTraffic) { + GetOutputChannel(channel).OutgoingTraffic->Add(value); + } + if (GetOutputChannel(channel).Traffic) { + GetOutputChannel(channel).Traffic->Add(value); + } + } + + void UpdateOutputChannelEvents(ui16 channel) override { + if (GetOutputChannel(channel).OutgoingEvents) { + GetOutputChannel(channel).OutgoingEvents->Inc(); + } + if (GetOutputChannel(channel).Events) { + GetOutputChannel(channel).Events->Inc(); + } + } + + void SetPeerInfo(const TString& name, const TString& dataCenterId) override { + if (name != std::exchange(HumanFriendlyPeerHostName, name)) { + PerSessionMetrics_.reset(); + } + VALGRIND_MAKE_READABLE(&DataCenterId, sizeof(DataCenterId)); + if (dataCenterId != std::exchange(DataCenterId, dataCenterId)) { + PerDataCenterMetrics_.reset(); + } + + const bool updatePerDataCenter = !PerDataCenterMetrics_ && MergePerDataCenterMetrics_; + if (updatePerDataCenter) { + PerDataCenterMetrics_ = std::make_shared<NMonitoring::TMetricSubRegistry>( + NMonitoring::TLabels{{"datacenter_id", *DataCenterId}}, Metrics_); + } + + const bool updatePerSession = !PerSessionMetrics_ || updatePerDataCenter; + if (updatePerSession) { + auto base = MergePerDataCenterMetrics_ ? PerDataCenterMetrics_ : Metrics_; + PerSessionMetrics_ = std::make_shared<NMonitoring::TMetricSubRegistry>( + NMonitoring::TLabels{{"peer", *HumanFriendlyPeerHostName}}, base); + } + + const bool updateGlobal = !Initialized_; + + const bool updateAdaptive = + &AdaptiveMetrics_ == &Metrics_ ? updateGlobal : + &AdaptiveMetrics_ == &PerSessionMetrics_ ? updatePerSession : + &AdaptiveMetrics_ == &PerDataCenterMetrics_ ? updatePerDataCenter : + false; + + auto createRate = [](std::shared_ptr<NMonitoring::IMetricRegistry> metrics, TStringBuf name) mutable { + return metrics->Rate(NMonitoring::MakeLabels(NMonitoring::TLabels{{"sensor", name}})); + }; + auto createIntGauge = [](std::shared_ptr<NMonitoring::IMetricRegistry> metrics, TStringBuf name) mutable { + return metrics->IntGauge(NMonitoring::MakeLabels(NMonitoring::TLabels{{"sensor", name}})); + }; + + if (updatePerSession) { + Connected_ = createIntGauge(PerSessionMetrics_, "interconnect.connected"); + Disconnections_ = createRate(PerSessionMetrics_, "interconnect.disconnections"); + ClockSkewMicrosec_ = createIntGauge(PerSessionMetrics_, "interconnect.clock_skew_microsec"); + Traffic_ = createRate(PerSessionMetrics_, "interconnect.traffic"); + Events_ = createRate(PerSessionMetrics_, "interconnect.events"); + ScopeErrors_ = createRate(PerSessionMetrics_, "interconnect.scope_errors"); + + for (const auto& [id, name] : Common->ChannelName) { + OutputChannels_.try_emplace(id, std::make_shared<NMonitoring::TMetricSubRegistry>( + NMonitoring::TLabels{{"channel", name}}, Metrics_), Traffic_, Events_); + } + OtherOutputChannel_ = TOutputChannel(std::make_shared<NMonitoring::TMetricSubRegistry>( + NMonitoring::TLabels{{"channel", "other"}}, Metrics_), Traffic_, Events_); + + InputChannels_ = TInputChannels(Metrics_, Common->ChannelName, Traffic_, Events_, ScopeErrors_); + } + + if (updateAdaptive) { + SessionDeaths_ = createRate(AdaptiveMetrics_, "interconnect.session_deaths"); + HandshakeFails_ = createRate(AdaptiveMetrics_, "interconnect.handshake_fails"); + InflyLimitReach_ = createRate(AdaptiveMetrics_, "interconnect.infly_limit_reach"); + InflightDataAmount_ = createRate(AdaptiveMetrics_, "interconnect.inflight_data"); + PingTimeHistogram_ = AdaptiveMetrics_->HistogramRate( + NMonitoring::MakeLabels({{"sensor", "interconnect.ping_time_us"}}), NMonitoring::ExponentialHistogram(18, 2, 125)); + } + + if (updateGlobal) { + OutputBuffersTotalSize_ = createRate(Metrics_, "interconnect.output_buffers_total_size"); + SendSyscalls_ = createRate(Metrics_, "interconnect.send_syscalls"); + RecvSyscalls_ = createRate(Metrics_, "interconnect.recv_syscalls"); + SpuriousReadWakeups_ = createRate(Metrics_, "interconnect.spurious_read_wakeups"); + UsefulReadWakeups_ = createRate(Metrics_, "interconnect.useful_read_wakeups"); + SpuriousWriteWakeups_ = createRate(Metrics_, "interconnect.spurious_write_wakeups"); + UsefulWriteWakeups_ = createRate(Metrics_, "interconnect.useful_write_wakeups"); + SubscribersCount_ = createIntGauge(AdaptiveMetrics_, "interconnect.subscribers_count"); + TotalBytesWritten_ = createRate(Metrics_, "interconnect.total_bytes_written"); + TotalBytesRead_ = createRate(Metrics_, "interconnect.total_bytes_read"); + + for (const char *reason : TDisconnectReason::Reasons) { DisconnectByReason_[reason] = Metrics_->Rate( NMonitoring::MakeLabels({ {"sensor", "interconnect.disconnect_reason"}, {"reason", reason}, })); - } - } - - Initialized_ = true; - } - - TOutputChannel GetOutputChannel(ui16 index) const { - Y_VERIFY(Initialized_); - const auto it = OutputChannels_.find(index); - return it != OutputChannels_.end() ? it->second : OtherOutputChannel_; - } - - private: - const TInterconnectProxyCommon::TPtr Common; - const bool MergePerDataCenterMetrics_; - const bool MergePerPeerMetrics_; - std::shared_ptr<NMonitoring::IMetricRegistry> Metrics_; - std::shared_ptr<NMonitoring::IMetricRegistry> PerSessionMetrics_; - std::shared_ptr<NMonitoring::IMetricRegistry> PerDataCenterMetrics_; - std::shared_ptr<NMonitoring::IMetricRegistry>& AdaptiveMetrics_; - bool Initialized_ = false; - - NMonitoring::IRate* Traffic_; - - NMonitoring::IRate* Events_; - NMonitoring::IRate* ScopeErrors_; - NMonitoring::IRate* Disconnections_; - NMonitoring::IIntGauge* Connected_; - - NMonitoring::IRate* SessionDeaths_; - NMonitoring::IRate* HandshakeFails_; - NMonitoring::IRate* InflyLimitReach_; - NMonitoring::IRate* InflightDataAmount_; - NMonitoring::IRate* OutputBuffersTotalSize_; - NMonitoring::IIntGauge* SubscribersCount_; - NMonitoring::IRate* SendSyscalls_; - NMonitoring::IRate* RecvSyscalls_; - NMonitoring::IRate* SpuriousWriteWakeups_; - NMonitoring::IRate* UsefulWriteWakeups_; - NMonitoring::IRate* SpuriousReadWakeups_; - NMonitoring::IRate* UsefulReadWakeups_; - NMonitoring::IIntGauge* ClockSkewMicrosec_; - - NMonitoring::IHistogram* PingTimeHistogram_; - - std::unordered_map<ui16, TOutputChannel> OutputChannels_; - TOutputChannel OtherOutputChannel_; - TInputChannels InputChannels_; - - THashMap<TString, NMonitoring::IRate*> DisconnectByReason_; - - NMonitoring::IRate* TotalBytesWritten_; - NMonitoring::IRate* TotalBytesRead_; - }; - -} // namespace - -std::unique_ptr<IInterconnectMetrics> CreateInterconnectCounters(const TInterconnectProxyCommon::TPtr& common) { - return std::make_unique<TInterconnectCounters>(common); -} - -std::unique_ptr<IInterconnectMetrics> CreateInterconnectMetrics(const TInterconnectProxyCommon::TPtr& common) { - return std::make_unique<TInterconnectMetrics>(common); -} - + } + } + + Initialized_ = true; + } + + TOutputChannel GetOutputChannel(ui16 index) const { + Y_VERIFY(Initialized_); + const auto it = OutputChannels_.find(index); + return it != OutputChannels_.end() ? it->second : OtherOutputChannel_; + } + + private: + const TInterconnectProxyCommon::TPtr Common; + const bool MergePerDataCenterMetrics_; + const bool MergePerPeerMetrics_; + std::shared_ptr<NMonitoring::IMetricRegistry> Metrics_; + std::shared_ptr<NMonitoring::IMetricRegistry> PerSessionMetrics_; + std::shared_ptr<NMonitoring::IMetricRegistry> PerDataCenterMetrics_; + std::shared_ptr<NMonitoring::IMetricRegistry>& AdaptiveMetrics_; + bool Initialized_ = false; + + NMonitoring::IRate* Traffic_; + + NMonitoring::IRate* Events_; + NMonitoring::IRate* ScopeErrors_; + NMonitoring::IRate* Disconnections_; + NMonitoring::IIntGauge* Connected_; + + NMonitoring::IRate* SessionDeaths_; + NMonitoring::IRate* HandshakeFails_; + NMonitoring::IRate* InflyLimitReach_; + NMonitoring::IRate* InflightDataAmount_; + NMonitoring::IRate* OutputBuffersTotalSize_; + NMonitoring::IIntGauge* SubscribersCount_; + NMonitoring::IRate* SendSyscalls_; + NMonitoring::IRate* RecvSyscalls_; + NMonitoring::IRate* SpuriousWriteWakeups_; + NMonitoring::IRate* UsefulWriteWakeups_; + NMonitoring::IRate* SpuriousReadWakeups_; + NMonitoring::IRate* UsefulReadWakeups_; + NMonitoring::IIntGauge* ClockSkewMicrosec_; + + NMonitoring::IHistogram* PingTimeHistogram_; + + std::unordered_map<ui16, TOutputChannel> OutputChannels_; + TOutputChannel OtherOutputChannel_; + TInputChannels InputChannels_; + + THashMap<TString, NMonitoring::IRate*> DisconnectByReason_; + + NMonitoring::IRate* TotalBytesWritten_; + NMonitoring::IRate* TotalBytesRead_; + }; + +} // namespace + +std::unique_ptr<IInterconnectMetrics> CreateInterconnectCounters(const TInterconnectProxyCommon::TPtr& common) { + return std::make_unique<TInterconnectCounters>(common); +} + +std::unique_ptr<IInterconnectMetrics> CreateInterconnectMetrics(const TInterconnectProxyCommon::TPtr& common) { + return std::make_unique<TInterconnectMetrics>(common); +} + } // NActors diff --git a/library/cpp/actors/interconnect/interconnect_counters.h b/library/cpp/actors/interconnect/interconnect_counters.h index e30f03a0bc..6f19f343e3 100644 --- a/library/cpp/actors/interconnect/interconnect_counters.h +++ b/library/cpp/actors/interconnect/interconnect_counters.h @@ -1,59 +1,59 @@ -#pragma once - -#include <library/cpp/actors/helpers/mon_histogram_helper.h> - -#include <util/system/valgrind.h> - -#include "types.h" - -#include "interconnect_common.h" - -#include <memory> -#include <optional> - -namespace NActors { - -class IInterconnectMetrics { -public: - virtual ~IInterconnectMetrics() = default; - +#pragma once + +#include <library/cpp/actors/helpers/mon_histogram_helper.h> + +#include <util/system/valgrind.h> + +#include "types.h" + +#include "interconnect_common.h" + +#include <memory> +#include <optional> + +namespace NActors { + +class IInterconnectMetrics { +public: + virtual ~IInterconnectMetrics() = default; + virtual void AddInflightDataAmount(ui64 value) = 0; virtual void SubInflightDataAmount(ui64 value) = 0; virtual void AddTotalBytesWritten(ui64 value) = 0; virtual void SetClockSkewMicrosec(i64 value) = 0; - virtual void IncSessionDeaths() = 0; - virtual void IncHandshakeFails() = 0; - virtual void SetConnected(ui32 value) = 0; - virtual void IncSubscribersCount() = 0; - virtual void SubSubscribersCount(ui32 value) = 0; + virtual void IncSessionDeaths() = 0; + virtual void IncHandshakeFails() = 0; + virtual void SetConnected(ui32 value) = 0; + virtual void IncSubscribersCount() = 0; + virtual void SubSubscribersCount(ui32 value) = 0; virtual void SubOutputBuffersTotalSize(ui64 value) = 0; virtual void AddOutputBuffersTotalSize(ui64 value) = 0; virtual ui64 GetOutputBuffersTotalSize() const = 0; - virtual void IncDisconnections() = 0; - virtual void IncUsefulWriteWakeups() = 0; - virtual void IncSpuriousWriteWakeups() = 0; - virtual void IncSendSyscalls() = 0; - virtual void IncInflyLimitReach() = 0; - virtual void IncDisconnectByReason(const TString& s) = 0; - virtual void IncUsefulReadWakeups() = 0; - virtual void IncSpuriousReadWakeups() = 0; - virtual void SetPeerInfo(const TString& name, const TString& dataCenterId) = 0; + virtual void IncDisconnections() = 0; + virtual void IncUsefulWriteWakeups() = 0; + virtual void IncSpuriousWriteWakeups() = 0; + virtual void IncSendSyscalls() = 0; + virtual void IncInflyLimitReach() = 0; + virtual void IncDisconnectByReason(const TString& s) = 0; + virtual void IncUsefulReadWakeups() = 0; + virtual void IncSpuriousReadWakeups() = 0; + virtual void SetPeerInfo(const TString& name, const TString& dataCenterId) = 0; virtual void AddInputChannelsIncomingTraffic(ui16 channel, ui64 incomingTraffic) = 0; - virtual void IncInputChannelsIncomingEvents(ui16 channel) = 0; - virtual void IncRecvSyscalls() = 0; + virtual void IncInputChannelsIncomingEvents(ui16 channel) = 0; + virtual void IncRecvSyscalls() = 0; virtual void AddTotalBytesRead(ui64 value) = 0; - virtual void UpdateLegacyPingTimeHist(ui64 value) = 0; + virtual void UpdateLegacyPingTimeHist(ui64 value) = 0; virtual void UpdateOutputChannelTraffic(ui16 channel, ui64 value) = 0; - virtual void UpdateOutputChannelEvents(ui16 channel) = 0; - TString GetHumanFriendlyPeerHostName() const { - return HumanFriendlyPeerHostName.value_or(TString()); - } - -protected: - std::optional<TString> DataCenterId; - std::optional<TString> HumanFriendlyPeerHostName; -}; - -std::unique_ptr<IInterconnectMetrics> CreateInterconnectCounters(const NActors::TInterconnectProxyCommon::TPtr& common); -std::unique_ptr<IInterconnectMetrics> CreateInterconnectMetrics(const NActors::TInterconnectProxyCommon::TPtr& common); -} // NActors + virtual void UpdateOutputChannelEvents(ui16 channel) = 0; + TString GetHumanFriendlyPeerHostName() const { + return HumanFriendlyPeerHostName.value_or(TString()); + } + +protected: + std::optional<TString> DataCenterId; + std::optional<TString> HumanFriendlyPeerHostName; +}; + +std::unique_ptr<IInterconnectMetrics> CreateInterconnectCounters(const NActors::TInterconnectProxyCommon::TPtr& common); +std::unique_ptr<IInterconnectMetrics> CreateInterconnectMetrics(const NActors::TInterconnectProxyCommon::TPtr& common); +} // NActors diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 0abe9fe659..c97969815c 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -8,7 +8,7 @@ namespace NActors { TInputSessionTCP::TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket, TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common, - std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, ui64 lastConfirmed, + std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, ui64 lastConfirmed, TDuration deadPeerTimeout, TSessionParams params) : SessionId(sessionId) , Socket(std::move(socket)) @@ -17,7 +17,7 @@ namespace NActors { , NodeId(nodeId) , Params(std::move(params)) , ConfirmedByInput(lastConfirmed) - , Metrics(std::move(metrics)) + , Metrics(std::move(metrics)) , DeadPeerTimeout(deadPeerTimeout) { Y_VERIFY(Context); @@ -26,7 +26,7 @@ namespace NActors { AtomicSet(Context->PacketsReadFromSocket, 0); - Metrics->SetClockSkewMicrosec(0); + Metrics->SetClockSkewMicrosec(0); Context->UpdateState = EUpdateState::NONE; @@ -50,9 +50,9 @@ namespace NActors { void TInputSessionTCP::Handle(TEvPollerReady::TPtr ev) { if (Context->ReadPending) { - Metrics->IncUsefulReadWakeups(); + Metrics->IncUsefulReadWakeups(); } else if (!ev->Cookie) { - Metrics->IncSpuriousReadWakeups(); + Metrics->IncSpuriousReadWakeups(); } Context->ReadPending = false; ReceiveData(); @@ -197,7 +197,7 @@ namespace NActors { ui64 sendTime = AtomicGet(Context->ControlPacketSendTimer); TDuration duration = CyclesToDuration(GetCycleCountFast() - sendTime); const auto durationUs = duration.MicroSeconds(); - Metrics->UpdateLegacyPingTimeHist(durationUs); + Metrics->UpdateLegacyPingTimeHist(durationUs); PingQ.push_back(duration); if (PingQ.size() > 16) { PingQ.pop_front(); @@ -268,7 +268,7 @@ namespace NActors { ? &Context->ChannelArray[channel] : &Context->ChannelMap[channel]; - Metrics->AddInputChannelsIncomingTraffic(channel, sizeof(part) + part.Size); + Metrics->AddInputChannelsIncomingTraffic(channel, sizeof(part) + part.Size); TEventDescr descr; if (~part.Channel & TChannelPart::LastPartFlag) { @@ -277,7 +277,7 @@ namespace NActors { LOG_CRIT_IC_SESSION("ICIS11", "incorrect last part of an event"); return DestroySession(TDisconnectReason::FormatError()); } else if (Payload.ExtractFrontPlain(&descr, sizeof(descr))) { - Metrics->IncInputChannelsIncomingEvents(channel); + Metrics->IncInputChannelsIncomingEvents(channel); ProcessEvent(*eventData, descr); *eventData = TRope(); } else { @@ -359,7 +359,7 @@ namespace NActors { #else recvres = Socket->Recv(iovec[0].iov_base, iovec[0].iov_len, &err); #endif - Metrics->IncRecvSyscalls(); + Metrics->IncRecvSyscalls(); } while (recvres == -EINTR); } @@ -388,7 +388,7 @@ namespace NActors { } Y_VERIFY(recvres > 0); - Metrics->AddTotalBytesRead(recvres); + Metrics->AddTotalBytesRead(recvres); TDeque<TIntrusivePtr<TRopeAlignedBuffer>>::iterator it; for (it = Buffers.begin(); recvres; ++it) { Y_VERIFY(it != Buffers.end()); @@ -451,7 +451,7 @@ namespace NActors { const auto pingUs = ping.MicroSeconds(); Context->PingRTT_us = pingUs; NewPingProtocol = true; - Metrics->UpdateLegacyPingTimeHist(pingUs); + Metrics->UpdateLegacyPingTimeHist(pingUs); } void TInputSessionTCP::HandleClock(TInstant clock) { @@ -469,7 +469,7 @@ namespace NActors { } } Context->ClockSkew_us = clockSkew; - Metrics->SetClockSkewMicrosec(clockSkew); + Metrics->SetClockSkewMicrosec(clockSkew); } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 7e2d8ccb94..ea3777b1a1 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -108,10 +108,10 @@ namespace NActors { auto& info = *ev->Get()->Node; TString name = PeerNameForHuman(PeerNodeId, info.Host, info.Port); TechnicalPeerHostName = info.Host; - if (!Metrics) { - Metrics = Common->Metrics ? CreateInterconnectMetrics(Common) : CreateInterconnectCounters(Common); + if (!Metrics) { + Metrics = Common->Metrics ? CreateInterconnectMetrics(Common) : CreateInterconnectCounters(Common); } - Metrics->SetPeerInfo(name, info.Location.GetDataCenterId()); + Metrics->SetPeerInfo(name, info.Location.GetDataCenterId()); LOG_DEBUG_IC("ICP02", "configured for host %s", name.data()); @@ -416,8 +416,8 @@ namespace NActors { return; } - if (Metrics) { - Metrics->IncHandshakeFails(); + if (Metrics) { + Metrics->IncHandshakeFails(); } if (IncomingHandshakeActor || OutgoingHandshakeActor) { @@ -545,8 +545,8 @@ namespace NActors { SessionVirtualId = TActorId(); RemoteSessionVirtualId = TActorId(); - if (Metrics) { - Metrics->IncSessionDeaths(); + if (Metrics) { + Metrics->IncSessionDeaths(); } LastSessionDieTime = TActivationContext::Now(); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index 023e5bd1ee..2ef30a20e8 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -372,7 +372,7 @@ namespace NActors { TString TechnicalPeerHostName; - std::shared_ptr<IInterconnectMetrics> Metrics; + std::shared_ptr<IInterconnectMetrics> Metrics; void HandleClosePeerSocket(); void HandleCloseInputSession(); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 2ded7f9f53..74e0be4b3e 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -40,7 +40,7 @@ namespace NActors { , OutputQueueUtilization(16) , OutputCounter(0ULL) { - Proxy->Metrics->SetConnected(0); + Proxy->Metrics->SetConnected(0); ReceiveContext.Reset(new TReceiveContext); } @@ -56,7 +56,7 @@ namespace NActors { as->Send(id, event.Release()); }; Pool.ConstructInPlace(Proxy->Common, std::move(destroyCallback)); - ChannelScheduler.ConstructInPlace(Proxy->PeerNodeId, Proxy->Common->ChannelsConfig, Proxy->Metrics, *Pool, + ChannelScheduler.ConstructInPlace(Proxy->PeerNodeId, Proxy->Common->ChannelsConfig, Proxy->Metrics, *Pool, Proxy->Common->Settings.MaxSerializedEventSize, Params); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId); @@ -85,7 +85,7 @@ namespace NActors { for (const auto& kv : Subscribers) { Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second); } - Proxy->Metrics->SubSubscribersCount(Subscribers.size()); + Proxy->Metrics->SubSubscribersCount(Subscribers.size()); Subscribers.clear(); ChannelScheduler->ForEach([&](TEventOutputChannel& channel) { @@ -98,13 +98,13 @@ namespace NActors { SendUpdateToWhiteboard(false); - Proxy->Metrics->SubOutputBuffersTotalSize(TotalOutputQueueSize); - Proxy->Metrics->SubInflightDataAmount(InflightDataAmount); + Proxy->Metrics->SubOutputBuffersTotalSize(TotalOutputQueueSize); + Proxy->Metrics->SubInflightDataAmount(InflightDataAmount); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session destroyed", Proxy->PeerNodeId); if (!Subscribers.empty()) { - Proxy->Metrics->SubSubscribersCount(Subscribers.size()); + Proxy->Metrics->SubSubscribersCount(Subscribers.size()); } TActor::PassAway(); @@ -132,7 +132,7 @@ namespace NActors { LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); TotalOutputQueueSize += dataSize; - Proxy->Metrics->AddOutputBuffersTotalSize(dataSize); + Proxy->Metrics->AddOutputBuffersTotalSize(dataSize); if (!wasWorking) { // this channel has returned to work -- it was empty and this we have just put first event in the queue ChannelScheduler->AddToHeap(oChannel, EqualizeCounter); @@ -155,7 +155,7 @@ namespace NActors { } ui64 outputBuffersTotalSizeLimit = Proxy->Common->Settings.OutputBuffersTotalSizeLimitInMB * ui64(1 << 20); - if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) { + if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) { LOG_ERROR_IC_SESSION("ICS77", "Exceeded total limit on output buffers size"); if (AtomicTryLock(&Proxy->Common->StartedSessionKiller)) { CreateSessionKillingActor(Proxy->Common); @@ -188,7 +188,7 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data()); const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie); if (inserted) { - Proxy->Metrics->IncSubscribersCount(); + Proxy->Metrics->IncSubscribersCount(); } else { it->second = ev->Cookie; } @@ -197,7 +197,7 @@ namespace NActors { void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) { LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data()); - Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender)); + Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender)); } THolder<TEvHandshakeAck> TInterconnectSessionTCP::ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev) { @@ -243,7 +243,7 @@ namespace NActors { // create input session actor auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common, - Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params); + Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params); ReceiveContext->UnlockLastProcessedPacketSerial(); ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled); @@ -254,7 +254,7 @@ namespace NActors { ReceiveContext->WriteBlockedByFullSendBuffer = false; LostConnectionWatchdog.Disarm(); - Proxy->Metrics->SetConnected(1); + Proxy->Metrics->SetConnected(1); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] connected", Proxy->PeerNodeId); // arm pinger timer @@ -264,7 +264,7 @@ namespace NActors { // REINITIALIZE SEND QUEUE // // scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other - // auxiliary packets; also reset packet metrics to zero to start sending from the beginning + // auxiliary packets; also reset packet metrics to zero to start sending from the beginning // also reset SendQueuePos // drop confirmed packets first as we do not need unwanted retransmissions @@ -489,17 +489,17 @@ namespace NActors { void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) { if (Socket) { if (const TString& s = reason.ToString()) { - Proxy->Metrics->IncDisconnectByReason(s); + Proxy->Metrics->IncDisconnectByReason(s); } LOG_INFO_IC_SESSION("ICS25", "shutdown socket, reason# %s", reason.ToString().data()); Proxy->UpdateErrorStateLog(TActivationContext::Now(), "close_socket", reason.ToString().data()); Socket->Shutdown(SHUT_RDWR); Socket.Reset(); - Proxy->Metrics->IncDisconnections(); + Proxy->Metrics->IncDisconnections(); CloseOnIdleWatchdog.Disarm(); LostConnectionWatchdog.Arm(SelfId()); - Proxy->Metrics->SetConnected(0); + Proxy->Metrics->SetConnected(0); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId); } } @@ -519,14 +519,14 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICS29", "HandleReadyWrite WriteBlockedByFullSendBuffer# %s", ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false"); if (std::exchange(ReceiveContext->WriteBlockedByFullSendBuffer, false)) { - Proxy->Metrics->IncUsefulWriteWakeups(); + Proxy->Metrics->IncUsefulWriteWakeups(); ui64 nowCycles = GetCycleCountFast(); double blockedUs = NHPTimer::GetSeconds(nowCycles - WriteBlockedCycles) * 1000000.0; LWPROBE(ReadyWrite, Proxy->PeerNodeId, NHPTimer::GetSeconds(nowCycles - ev->SendTime) * 1000.0, blockedUs / 1000.0); WriteBlockedTotal += TDuration::MicroSeconds(blockedUs); GenerateTraffic(); } else if (!ev->Cookie) { - Proxy->Metrics->IncSpuriousWriteWakeups(); + Proxy->Metrics->IncSpuriousWriteWakeups(); } if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) { Send(ReceiverId, ev->Release().Release(), 0, 1); @@ -591,7 +591,7 @@ namespace NActors { #else r = Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err); #endif - Proxy->Metrics->IncSendSyscalls(); + Proxy->Metrics->IncSendSyscalls(); } while (r == -EINTR); LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data()); @@ -621,7 +621,7 @@ namespace NActors { : Sprintf("socket: %s", strerror(-r)); LOG_NOTICE_NET(Proxy->PeerNodeId, "%s", message.data()); if (written) { - Proxy->Metrics->AddTotalBytesWritten(written); + Proxy->Metrics->AddTotalBytesWritten(written); } return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r)); } else { @@ -651,7 +651,7 @@ namespace NActors { } } if (written) { - Proxy->Metrics->AddTotalBytesWritten(written); + Proxy->Metrics->AddTotalBytesWritten(written); } } @@ -753,9 +753,9 @@ namespace NActors { Y_VERIFY(!packet->IsEmpty()); InflightDataAmount += packet->GetDataSize(); - Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize()); + Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize()); if (InflightDataAmount > GetTotalInflightAmountOfData()) { - Proxy->Metrics->IncInflyLimitReach(); + Proxy->Metrics->IncInflyLimitReach(); } if (AtomicGet(ReceiveContext->ControlPacketId) == 0) { @@ -842,7 +842,7 @@ namespace NActors { PacketsConfirmed += numDropped; InflightDataAmount -= droppedDataAmount; - Proxy->Metrics->SubInflightDataAmount(droppedDataAmount); + Proxy->Metrics->SubInflightDataAmount(droppedDataAmount); LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount); LOG_DEBUG_IC_SESSION("ICS24", "exit InflightDataAmount: %" PRIu64 " bytes droppedDataAmount: %" PRIu64 " bytes" @@ -870,9 +870,9 @@ namespace NActors { Y_VERIFY_DEBUG(netAfter <= netBefore); // net amount should shrink const ui64 net = netBefore - netAfter; // number of net bytes serialized - // adjust metrics for local and global queue size + // adjust metrics for local and global queue size TotalOutputQueueSize -= net; - Proxy->Metrics->SubOutputBuffersTotalSize(net); + Proxy->Metrics->SubOutputBuffersTotalSize(net); bytesGenerated += gross; Y_VERIFY_DEBUG(!!net == !!gross && gross >= net, "net# %" PRIu64 " gross# %" PRIu64, net, gross); @@ -944,7 +944,7 @@ namespace NActors { } while (false); } - callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(), + callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(), connected, flagState == EFlag::GREEN, flagState == EFlag::YELLOW, diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 7fc00dbcc5..7ad7fb84d2 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -183,7 +183,7 @@ namespace NActors { TIntrusivePtr<NInterconnect::TStreamSocket> socket, TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common, - std::shared_ptr<IInterconnectMetrics> metrics, + std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, ui64 lastConfirmed, TDuration deadPeerTimeout, @@ -237,7 +237,7 @@ namespace NActors { ui64 ConfirmedByInput; - std::shared_ptr<IInterconnectMetrics> Metrics; + std::shared_ptr<IInterconnectMetrics> Metrics; bool CloseInputSessionRequested = false; diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 565a511859..5e9a12a56a 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -9,12 +9,12 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { Y_UNIT_TEST(PriorityTraffic) { auto common = MakeIntrusive<TInterconnectProxyCommon>(); common->MonCounters = MakeIntrusive<NMonitoring::TDynamicCounters>(); - std::shared_ptr<IInterconnectMetrics> ctr = CreateInterconnectCounters(common); - ctr->SetPeerInfo("peer", "1"); + std::shared_ptr<IInterconnectMetrics> ctr = CreateInterconnectCounters(common); + ctr->SetPeerInfo("peer", "1"); auto callback = [](THolder<IEventBase>) {}; TEventHolderPool pool(common, callback); TSessionParams p; - TChannelScheduler scheduler(1, {}, ctr, pool, 64 << 20, p); + TChannelScheduler scheduler(1, {}, ctr, pool, 64 << 20, p); ui32 numEvents = 0; diff --git a/library/cpp/actors/interconnect/ya.make b/library/cpp/actors/interconnect/ya.make index 60d29b0fc0..47fd731e01 100644 --- a/library/cpp/actors/interconnect/ya.make +++ b/library/cpp/actors/interconnect/ya.make @@ -22,7 +22,7 @@ SRCS( interconnect_channel.cpp interconnect_channel.h interconnect_common.h - interconnect_counters.cpp + interconnect_counters.cpp interconnect.h interconnect_handshake.cpp interconnect_handshake.h @@ -85,7 +85,7 @@ PEERDIR( library/cpp/json library/cpp/lwtrace library/cpp/monlib/dynamic_counters - library/cpp/monlib/metrics + library/cpp/monlib/metrics library/cpp/monlib/service/pages/tablesorter library/cpp/openssl/init library/cpp/packedtypes |