diff options
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_counters.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_counters.cpp | 692 |
1 files changed, 692 insertions, 0 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_counters.cpp b/library/cpp/actors/interconnect/interconnect_counters.cpp new file mode 100644 index 0000000000..ba674f664b --- /dev/null +++ b/library/cpp/actors/interconnect/interconnect_counters.cpp @@ -0,0 +1,692 @@ +#include "interconnect_counters.h" + +#include <library/cpp/monlib/metrics/metric_registry.h> +#include <library/cpp/monlib/metrics/metric_sub_registry.h> + +#include <unordered_map> + +namespace NActors { + +namespace { + + class TInterconnectCounters: public IInterconnectMetrics { + public: + struct TOutputChannel { + NMonitoring::TDynamicCounters::TCounterPtr Traffic; + NMonitoring::TDynamicCounters::TCounterPtr Events; + NMonitoring::TDynamicCounters::TCounterPtr OutgoingTraffic; + NMonitoring::TDynamicCounters::TCounterPtr OutgoingEvents; + + TOutputChannel() = default; + + TOutputChannel(const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, + NMonitoring::TDynamicCounters::TCounterPtr traffic, + NMonitoring::TDynamicCounters::TCounterPtr events) + : Traffic(std::move(traffic)) + , Events(std::move(events)) + , OutgoingTraffic(counters->GetCounter("OutgoingTraffic", true)) + , OutgoingEvents(counters->GetCounter("OutgoingEvents", true)) + {} + + TOutputChannel(const TOutputChannel&) = default; + }; + + struct TInputChannel { + NMonitoring::TDynamicCounters::TCounterPtr Traffic; + NMonitoring::TDynamicCounters::TCounterPtr Events; + NMonitoring::TDynamicCounters::TCounterPtr ScopeErrors; + NMonitoring::TDynamicCounters::TCounterPtr IncomingTraffic; + NMonitoring::TDynamicCounters::TCounterPtr IncomingEvents; + + TInputChannel() = default; + + TInputChannel(const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, + NMonitoring::TDynamicCounters::TCounterPtr traffic, + NMonitoring::TDynamicCounters::TCounterPtr events, + NMonitoring::TDynamicCounters::TCounterPtr scopeErrors) + : Traffic(std::move(traffic)) + , Events(std::move(events)) + , ScopeErrors(std::move(scopeErrors)) + , IncomingTraffic(counters->GetCounter("IncomingTraffic", true)) + , IncomingEvents(counters->GetCounter("IncomingEvents", true)) + {} + + TInputChannel(const TInputChannel&) = default; + }; + + struct TInputChannels : std::unordered_map<ui16, TInputChannel> { + TInputChannel OtherInputChannel; + + TInputChannels() = default; + + TInputChannels(const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, + const std::unordered_map<ui16, TString>& names, + NMonitoring::TDynamicCounters::TCounterPtr traffic, + NMonitoring::TDynamicCounters::TCounterPtr events, + NMonitoring::TDynamicCounters::TCounterPtr scopeErrors) + : OtherInputChannel(counters->GetSubgroup("channel", "other"), traffic, events, scopeErrors) + { + for (const auto& [id, name] : names) { + try_emplace(id, counters->GetSubgroup("channel", name), traffic, events, scopeErrors); + } + } + + TInputChannels(const TInputChannels&) = default; + + const TInputChannel& Get(ui16 id) const { + const auto it = find(id); + return it != end() ? it->second : OtherInputChannel; + } + }; + + private: + const TInterconnectProxyCommon::TPtr Common; + const bool MergePerDataCenterCounters; + const bool MergePerPeerCounters; + NMonitoring::TDynamicCounterPtr Counters; + NMonitoring::TDynamicCounterPtr PerSessionCounters; + NMonitoring::TDynamicCounterPtr PerDataCenterCounters; + NMonitoring::TDynamicCounterPtr& AdaptiveCounters; + + bool Initialized = false; + + NMonitoring::TDynamicCounters::TCounterPtr Traffic; + NMonitoring::TDynamicCounters::TCounterPtr Events; + NMonitoring::TDynamicCounters::TCounterPtr ScopeErrors; + + public: + TInterconnectCounters(const TInterconnectProxyCommon::TPtr& common) + : Common(common) + , MergePerDataCenterCounters(common->Settings.MergePerDataCenterCounters) + , MergePerPeerCounters(common->Settings.MergePerPeerCounters) + , Counters(common->MonCounters) + , AdaptiveCounters(MergePerDataCenterCounters + ? PerDataCenterCounters : + MergePerPeerCounters ? Counters : PerSessionCounters) + {} + + void AddInflightDataAmount(ui64 value) override { + *InflightDataAmount += value; + } + + void SubInflightDataAmount(ui64 value) override { + *InflightDataAmount -= value; + } + + void AddTotalBytesWritten(ui64 value) override { + *TotalBytesWritten += value; + } + + void SetClockSkewMicrosec(i64 value) override { + *ClockSkewMicrosec = value; + } + + void IncSessionDeaths() override { + ++*SessionDeaths; + } + + void IncHandshakeFails() override { + ++*HandshakeFails; + } + + void SetConnected(ui32 value) override { + *Connected = value; + } + + void IncSubscribersCount() override { + ++*SubscribersCount; + } + + void SubSubscribersCount(ui32 value) override { + *SubscribersCount -= value; + } + + void SubOutputBuffersTotalSize(ui64 value) override { + *OutputBuffersTotalSize -= value; + } + + void AddOutputBuffersTotalSize(ui64 value) override { + *OutputBuffersTotalSize += value; + } + + ui64 GetOutputBuffersTotalSize() const override { + return *OutputBuffersTotalSize; + } + + void IncDisconnections() override { + ++*Disconnections; + } + + void IncUsefulWriteWakeups() override { + ++*UsefulWriteWakeups; + } + + void IncSpuriousWriteWakeups() override { + ++*SpuriousWriteWakeups; + } + + void IncSendSyscalls() override { + ++*SendSyscalls; + } + + void IncInflyLimitReach() override { + ++*InflyLimitReach; + } + + void IncUsefulReadWakeups() override { + ++*UsefulReadWakeups; + } + + void IncSpuriousReadWakeups() override { + ++*SpuriousReadWakeups; + } + + void IncDisconnectByReason(const TString& s) override { + if (auto it = DisconnectByReason.find(s); it != DisconnectByReason.end()) { + it->second->Inc(); + } + } + + void AddInputChannelsIncomingTraffic(ui16 channel, ui64 incomingTraffic) override { + auto& ch = InputChannels.Get(channel); + *ch.IncomingTraffic += incomingTraffic; + } + + void IncInputChannelsIncomingEvents(ui16 channel) override { + auto& ch = InputChannels.Get(channel); + ++*ch.IncomingEvents; + } + + void IncRecvSyscalls() override { + ++*RecvSyscalls; + } + + void AddTotalBytesRead(ui64 value) override { + *TotalBytesRead += value; + } + + void UpdateLegacyPingTimeHist(ui64 value) override { + LegacyPingTimeHist.Add(value); + PingTimeHistogram->Collect(value); + } + + void UpdateOutputChannelTraffic(ui16 channel, ui64 value) override { + if (GetOutputChannel(channel).OutgoingTraffic) { + *(GetOutputChannel(channel).OutgoingTraffic) += value; + } + if (GetOutputChannel(channel).Traffic) { + *(GetOutputChannel(channel).Traffic) += value; + } + } + + void UpdateOutputChannelEvents(ui16 channel) override { + if (GetOutputChannel(channel).OutgoingEvents) { + ++*(GetOutputChannel(channel).OutgoingEvents); + } + if (GetOutputChannel(channel).Events) { + ++*(GetOutputChannel(channel).Events); + } + } + + void SetPeerInfo(const TString& name, const TString& dataCenterId) override { + if (name != std::exchange(HumanFriendlyPeerHostName, name)) { + PerSessionCounters.Reset(); + } + VALGRIND_MAKE_READABLE(&DataCenterId, sizeof(DataCenterId)); + if (dataCenterId != std::exchange(DataCenterId, dataCenterId)) { + PerDataCenterCounters.Reset(); + } + + const bool updatePerDataCenter = !PerDataCenterCounters && MergePerDataCenterCounters; + if (updatePerDataCenter) { + PerDataCenterCounters = Counters->GetSubgroup("dataCenterId", *DataCenterId); + } + + const bool updatePerSession = !PerSessionCounters || updatePerDataCenter; + if (updatePerSession) { + auto base = MergePerDataCenterCounters ? PerDataCenterCounters : Counters; + PerSessionCounters = base->GetSubgroup("peer", *HumanFriendlyPeerHostName); + } + + const bool updateGlobal = !Initialized; + + const bool updateAdaptive = + &AdaptiveCounters == &Counters ? updateGlobal : + &AdaptiveCounters == &PerSessionCounters ? updatePerSession : + &AdaptiveCounters == &PerDataCenterCounters ? updatePerDataCenter : + false; + + if (updatePerSession) { + Connected = PerSessionCounters->GetCounter("Connected"); + Disconnections = PerSessionCounters->GetCounter("Disconnections", true); + ClockSkewMicrosec = PerSessionCounters->GetCounter("ClockSkewMicrosec"); + Traffic = PerSessionCounters->GetCounter("Traffic", true); + Events = PerSessionCounters->GetCounter("Events", true); + ScopeErrors = PerSessionCounters->GetCounter("ScopeErrors", true); + + for (const auto& [id, name] : Common->ChannelName) { + OutputChannels.try_emplace(id, Counters->GetSubgroup("channel", name), Traffic, Events); + } + OtherOutputChannel = TOutputChannel(Counters->GetSubgroup("channel", "other"), Traffic, Events); + + InputChannels = TInputChannels(Counters, Common->ChannelName, Traffic, Events, ScopeErrors); + } + + if (updateAdaptive) { + SessionDeaths = AdaptiveCounters->GetCounter("Session_Deaths", true); + HandshakeFails = AdaptiveCounters->GetCounter("Handshake_Fails", true); + InflyLimitReach = AdaptiveCounters->GetCounter("InflyLimitReach", true); + InflightDataAmount = AdaptiveCounters->GetCounter("Inflight_Data"); + + LegacyPingTimeHist = {}; + LegacyPingTimeHist.Init(AdaptiveCounters.Get(), "PingTimeHist", "mks", 125, 18); + + PingTimeHistogram = AdaptiveCounters->GetHistogram( + "PingTimeUs", NMonitoring::ExponentialHistogram(18, 2, 125)); + } + + if (updateGlobal) { + OutputBuffersTotalSize = Counters->GetCounter("OutputBuffersTotalSize"); + SendSyscalls = Counters->GetCounter("SendSyscalls", true); + RecvSyscalls = Counters->GetCounter("RecvSyscalls", true); + SpuriousReadWakeups = Counters->GetCounter("SpuriousReadWakeups", true); + UsefulReadWakeups = Counters->GetCounter("UsefulReadWakeups", true); + SpuriousWriteWakeups = Counters->GetCounter("SpuriousWriteWakeups", true); + UsefulWriteWakeups = Counters->GetCounter("UsefulWriteWakeups", true); + SubscribersCount = AdaptiveCounters->GetCounter("SubscribersCount"); + TotalBytesWritten = Counters->GetCounter("TotalBytesWritten", true); + TotalBytesRead = Counters->GetCounter("TotalBytesRead", true); + + auto disconnectReasonGroup = Counters->GetSubgroup("subsystem", "disconnectReason"); + for (const char *reason : TDisconnectReason::Reasons) { + DisconnectByReason[reason] = disconnectReasonGroup->GetNamedCounter("reason", reason, true); + } + } + + Initialized = true; + } + + TOutputChannel GetOutputChannel(ui16 index) const { + Y_VERIFY(Initialized); + const auto it = OutputChannels.find(index); + return it != OutputChannels.end() ? it->second : OtherOutputChannel; + } + + private: + NMonitoring::TDynamicCounters::TCounterPtr SessionDeaths; + NMonitoring::TDynamicCounters::TCounterPtr HandshakeFails; + NMonitoring::TDynamicCounters::TCounterPtr Connected; + NMonitoring::TDynamicCounters::TCounterPtr Disconnections; + NMonitoring::TDynamicCounters::TCounterPtr InflightDataAmount; + NMonitoring::TDynamicCounters::TCounterPtr InflyLimitReach; + NMonitoring::TDynamicCounters::TCounterPtr OutputBuffersTotalSize; + NMonitoring::TDynamicCounters::TCounterPtr QueueUtilization; + NMonitoring::TDynamicCounters::TCounterPtr SubscribersCount; + NMonitoring::TDynamicCounters::TCounterPtr SendSyscalls; + NMonitoring::TDynamicCounters::TCounterPtr ClockSkewMicrosec; + NMonitoring::TDynamicCounters::TCounterPtr RecvSyscalls; + NMonitoring::TDynamicCounters::TCounterPtr UsefulReadWakeups; + NMonitoring::TDynamicCounters::TCounterPtr SpuriousReadWakeups; + NMonitoring::TDynamicCounters::TCounterPtr UsefulWriteWakeups; + NMonitoring::TDynamicCounters::TCounterPtr SpuriousWriteWakeups; + NMon::THistogramCounterHelper LegacyPingTimeHist; + NMonitoring::THistogramPtr PingTimeHistogram; + + std::unordered_map<ui16, TOutputChannel> OutputChannels; + TOutputChannel OtherOutputChannel; + TInputChannels InputChannels; + THashMap<TString, NMonitoring::TDynamicCounters::TCounterPtr> DisconnectByReason; + + NMonitoring::TDynamicCounters::TCounterPtr TotalBytesWritten, TotalBytesRead; + }; + + class TInterconnectMetrics: public IInterconnectMetrics { + public: + struct TOutputChannel { + NMonitoring::IRate* Traffic; + NMonitoring::IRate* Events; + NMonitoring::IRate* OutgoingTraffic; + NMonitoring::IRate* OutgoingEvents; + + TOutputChannel() = default; + + TOutputChannel(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics, + NMonitoring::IRate* traffic, + NMonitoring::IRate* events) + : Traffic(traffic) + , Events(events) + , OutgoingTraffic(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.outgoing_traffic"}}))) + , OutgoingEvents(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.outgoing_events"}}))) + {} + + TOutputChannel(const TOutputChannel&) = default; + }; + + struct TInputChannel { + NMonitoring::IRate* Traffic; + NMonitoring::IRate* Events; + NMonitoring::IRate* ScopeErrors; + NMonitoring::IRate* IncomingTraffic; + NMonitoring::IRate* IncomingEvents; + + TInputChannel() = default; + + TInputChannel(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics, + NMonitoring::IRate* traffic, NMonitoring::IRate* events, + NMonitoring::IRate* scopeErrors) + : Traffic(traffic) + , Events(events) + , ScopeErrors(scopeErrors) + , IncomingTraffic(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.incoming_traffic"}}))) + , IncomingEvents(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.incoming_events"}}))) + {} + + TInputChannel(const TInputChannel&) = default; + }; + + struct TInputChannels : std::unordered_map<ui16, TInputChannel> { + TInputChannel OtherInputChannel; + + TInputChannels() = default; + + TInputChannels(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics, + const std::unordered_map<ui16, TString>& names, + NMonitoring::IRate* traffic, NMonitoring::IRate* events, + NMonitoring::IRate* scopeErrors) + : OtherInputChannel(std::make_shared<NMonitoring::TMetricSubRegistry>( + NMonitoring::TLabels{{"channel", "other"}}, metrics), traffic, events, scopeErrors) + { + for (const auto& [id, name] : names) { + try_emplace(id, std::make_shared<NMonitoring::TMetricSubRegistry>(NMonitoring::TLabels{{"channel", name}}, metrics), + traffic, events, scopeErrors); + } + } + + TInputChannels(const TInputChannels&) = default; + + const TInputChannel& Get(ui16 id) const { + const auto it = find(id); + return it != end() ? it->second : OtherInputChannel; + } + }; + + TInterconnectMetrics(const TInterconnectProxyCommon::TPtr& common) + : Common(common) + , MergePerDataCenterMetrics_(common->Settings.MergePerDataCenterCounters) + , MergePerPeerMetrics_(common->Settings.MergePerPeerCounters) + , Metrics_(common->Metrics) + , AdaptiveMetrics_(MergePerDataCenterMetrics_ + ? PerDataCenterMetrics_ : + MergePerPeerMetrics_ ? Metrics_ : PerSessionMetrics_) + {} + + void AddInflightDataAmount(ui64 value) override { + InflightDataAmount_->Add(value); + } + + void SubInflightDataAmount(ui64 value) override { + InflightDataAmount_->Add(-value); + } + + void AddTotalBytesWritten(ui64 value) override { + TotalBytesWritten_->Add(value); + } + + void SetClockSkewMicrosec(i64 value) override { + ClockSkewMicrosec_->Set(value); + } + + void IncSessionDeaths() override { + SessionDeaths_->Inc(); + } + + void IncHandshakeFails() override { + HandshakeFails_->Inc(); + } + + void SetConnected(ui32 value) override { + Connected_->Set(value); + } + + void IncSubscribersCount() override { + SubscribersCount_->Inc(); + } + + void SubSubscribersCount(ui32 value) override { + SubscribersCount_->Add(-value); + } + + void SubOutputBuffersTotalSize(ui64 value) override { + OutputBuffersTotalSize_->Add(-value); + } + + void AddOutputBuffersTotalSize(ui64 value) override { + OutputBuffersTotalSize_->Add(value); + } + + ui64 GetOutputBuffersTotalSize() const override { + return OutputBuffersTotalSize_->Get(); + } + + void IncDisconnections() override { + Disconnections_->Inc(); + } + + void IncUsefulWriteWakeups() override { + UsefulWriteWakeups_->Inc(); + } + + void IncSpuriousWriteWakeups() override { + SpuriousWriteWakeups_->Inc(); + } + + void IncSendSyscalls() override { + SendSyscalls_->Inc(); + } + + void IncInflyLimitReach() override { + InflyLimitReach_->Inc(); + } + + void IncUsefulReadWakeups() override { + UsefulReadWakeups_->Inc(); + } + + void IncSpuriousReadWakeups() override { + SpuriousReadWakeups_->Inc(); + } + + void IncDisconnectByReason(const TString& s) override { + if (auto it = DisconnectByReason_.find(s); it != DisconnectByReason_.end()) { + it->second->Inc(); + } + } + + void AddInputChannelsIncomingTraffic(ui16 channel, ui64 incomingTraffic) override { + auto& ch = InputChannels_.Get(channel); + ch.IncomingTraffic->Add(incomingTraffic); + } + + void IncInputChannelsIncomingEvents(ui16 channel) override { + auto& ch = InputChannels_.Get(channel); + ch.IncomingEvents->Inc(); + } + + void IncRecvSyscalls() override { + RecvSyscalls_->Inc(); + } + + void AddTotalBytesRead(ui64 value) override { + TotalBytesRead_->Add(value); + } + + void UpdateLegacyPingTimeHist(ui64 value) override { + PingTimeHistogram_->Record(value); + } + + void UpdateOutputChannelTraffic(ui16 channel, ui64 value) override { + if (GetOutputChannel(channel).OutgoingTraffic) { + GetOutputChannel(channel).OutgoingTraffic->Add(value); + } + if (GetOutputChannel(channel).Traffic) { + GetOutputChannel(channel).Traffic->Add(value); + } + } + + void UpdateOutputChannelEvents(ui16 channel) override { + if (GetOutputChannel(channel).OutgoingEvents) { + GetOutputChannel(channel).OutgoingEvents->Inc(); + } + if (GetOutputChannel(channel).Events) { + GetOutputChannel(channel).Events->Inc(); + } + } + + void SetPeerInfo(const TString& name, const TString& dataCenterId) override { + if (name != std::exchange(HumanFriendlyPeerHostName, name)) { + PerSessionMetrics_.reset(); + } + VALGRIND_MAKE_READABLE(&DataCenterId, sizeof(DataCenterId)); + if (dataCenterId != std::exchange(DataCenterId, dataCenterId)) { + PerDataCenterMetrics_.reset(); + } + + const bool updatePerDataCenter = !PerDataCenterMetrics_ && MergePerDataCenterMetrics_; + if (updatePerDataCenter) { + PerDataCenterMetrics_ = std::make_shared<NMonitoring::TMetricSubRegistry>( + NMonitoring::TLabels{{"datacenter_id", *DataCenterId}}, Metrics_); + } + + const bool updatePerSession = !PerSessionMetrics_ || updatePerDataCenter; + if (updatePerSession) { + auto base = MergePerDataCenterMetrics_ ? PerDataCenterMetrics_ : Metrics_; + PerSessionMetrics_ = std::make_shared<NMonitoring::TMetricSubRegistry>( + NMonitoring::TLabels{{"peer", *HumanFriendlyPeerHostName}}, base); + } + + const bool updateGlobal = !Initialized_; + + const bool updateAdaptive = + &AdaptiveMetrics_ == &Metrics_ ? updateGlobal : + &AdaptiveMetrics_ == &PerSessionMetrics_ ? updatePerSession : + &AdaptiveMetrics_ == &PerDataCenterMetrics_ ? updatePerDataCenter : + false; + + auto createRate = [](std::shared_ptr<NMonitoring::IMetricRegistry> metrics, TStringBuf name) mutable { + return metrics->Rate(NMonitoring::MakeLabels(NMonitoring::TLabels{{"sensor", name}})); + }; + auto createIntGauge = [](std::shared_ptr<NMonitoring::IMetricRegistry> metrics, TStringBuf name) mutable { + return metrics->IntGauge(NMonitoring::MakeLabels(NMonitoring::TLabels{{"sensor", name}})); + }; + + if (updatePerSession) { + Connected_ = createIntGauge(PerSessionMetrics_, "interconnect.connected"); + Disconnections_ = createRate(PerSessionMetrics_, "interconnect.disconnections"); + ClockSkewMicrosec_ = createIntGauge(PerSessionMetrics_, "interconnect.clock_skew_microsec"); + Traffic_ = createRate(PerSessionMetrics_, "interconnect.traffic"); + Events_ = createRate(PerSessionMetrics_, "interconnect.events"); + ScopeErrors_ = createRate(PerSessionMetrics_, "interconnect.scope_errors"); + + for (const auto& [id, name] : Common->ChannelName) { + OutputChannels_.try_emplace(id, std::make_shared<NMonitoring::TMetricSubRegistry>( + NMonitoring::TLabels{{"channel", name}}, Metrics_), Traffic_, Events_); + } + OtherOutputChannel_ = TOutputChannel(std::make_shared<NMonitoring::TMetricSubRegistry>( + NMonitoring::TLabels{{"channel", "other"}}, Metrics_), Traffic_, Events_); + + InputChannels_ = TInputChannels(Metrics_, Common->ChannelName, Traffic_, Events_, ScopeErrors_); + } + + if (updateAdaptive) { + SessionDeaths_ = createRate(AdaptiveMetrics_, "interconnect.session_deaths"); + HandshakeFails_ = createRate(AdaptiveMetrics_, "interconnect.handshake_fails"); + InflyLimitReach_ = createRate(AdaptiveMetrics_, "interconnect.infly_limit_reach"); + InflightDataAmount_ = createRate(AdaptiveMetrics_, "interconnect.inflight_data"); + PingTimeHistogram_ = AdaptiveMetrics_->HistogramRate( + NMonitoring::MakeLabels({{"sensor", "interconnect.ping_time_us"}}), NMonitoring::ExponentialHistogram(18, 2, 125)); + } + + if (updateGlobal) { + OutputBuffersTotalSize_ = createRate(Metrics_, "interconnect.output_buffers_total_size"); + SendSyscalls_ = createRate(Metrics_, "interconnect.send_syscalls"); + RecvSyscalls_ = createRate(Metrics_, "interconnect.recv_syscalls"); + SpuriousReadWakeups_ = createRate(Metrics_, "interconnect.spurious_read_wakeups"); + UsefulReadWakeups_ = createRate(Metrics_, "interconnect.useful_read_wakeups"); + SpuriousWriteWakeups_ = createRate(Metrics_, "interconnect.spurious_write_wakeups"); + UsefulWriteWakeups_ = createRate(Metrics_, "interconnect.useful_write_wakeups"); + SubscribersCount_ = createIntGauge(AdaptiveMetrics_, "interconnect.subscribers_count"); + TotalBytesWritten_ = createRate(Metrics_, "interconnect.total_bytes_written"); + TotalBytesRead_ = createRate(Metrics_, "interconnect.total_bytes_read"); + + for (const char *reason : TDisconnectReason::Reasons) { + DisconnectByReason_[reason] = Metrics_->Rate( + NMonitoring::MakeLabels({ + {"sensor", "interconnect.disconnect_reason"}, + {"reason", reason}, + })); + } + } + + Initialized_ = true; + } + + TOutputChannel GetOutputChannel(ui16 index) const { + Y_VERIFY(Initialized_); + const auto it = OutputChannels_.find(index); + return it != OutputChannels_.end() ? it->second : OtherOutputChannel_; + } + + private: + const TInterconnectProxyCommon::TPtr Common; + const bool MergePerDataCenterMetrics_; + const bool MergePerPeerMetrics_; + std::shared_ptr<NMonitoring::IMetricRegistry> Metrics_; + std::shared_ptr<NMonitoring::IMetricRegistry> PerSessionMetrics_; + std::shared_ptr<NMonitoring::IMetricRegistry> PerDataCenterMetrics_; + std::shared_ptr<NMonitoring::IMetricRegistry>& AdaptiveMetrics_; + bool Initialized_ = false; + + NMonitoring::IRate* Traffic_; + + NMonitoring::IRate* Events_; + NMonitoring::IRate* ScopeErrors_; + NMonitoring::IRate* Disconnections_; + NMonitoring::IIntGauge* Connected_; + + NMonitoring::IRate* SessionDeaths_; + NMonitoring::IRate* HandshakeFails_; + NMonitoring::IRate* InflyLimitReach_; + NMonitoring::IRate* InflightDataAmount_; + NMonitoring::IRate* OutputBuffersTotalSize_; + NMonitoring::IIntGauge* SubscribersCount_; + NMonitoring::IRate* SendSyscalls_; + NMonitoring::IRate* RecvSyscalls_; + NMonitoring::IRate* SpuriousWriteWakeups_; + NMonitoring::IRate* UsefulWriteWakeups_; + NMonitoring::IRate* SpuriousReadWakeups_; + NMonitoring::IRate* UsefulReadWakeups_; + NMonitoring::IIntGauge* ClockSkewMicrosec_; + + NMonitoring::IHistogram* PingTimeHistogram_; + + std::unordered_map<ui16, TOutputChannel> OutputChannels_; + TOutputChannel OtherOutputChannel_; + TInputChannels InputChannels_; + + THashMap<TString, NMonitoring::IRate*> DisconnectByReason_; + + NMonitoring::IRate* TotalBytesWritten_; + NMonitoring::IRate* TotalBytesRead_; + }; + +} // namespace + +std::unique_ptr<IInterconnectMetrics> CreateInterconnectCounters(const TInterconnectProxyCommon::TPtr& common) { + return std::make_unique<TInterconnectCounters>(common); +} + +std::unique_ptr<IInterconnectMetrics> CreateInterconnectMetrics(const TInterconnectProxyCommon::TPtr& common) { + return std::make_unique<TInterconnectMetrics>(common); +} + +} // NActors |