aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorakchernikov <akchernikov@yandex-team.ru>2022-02-10 16:50:44 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:44 +0300
commitea46c401e7900b229add3e6074dbf89adc84ebfc (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp
parent87cccadbd489f00bc6d81b27ad182277cbb25826 (diff)
downloadydb-ea46c401e7900b229add3e6074dbf89adc84ebfc.tar.gz
Restoring authorship annotation for <akchernikov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/core/log.cpp316
-rw-r--r--library/cpp/actors/core/log.h46
-rw-r--r--library/cpp/actors/interconnect/channel_scheduler.h14
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h8
-rw-r--r--library/cpp/actors/interconnect/interconnect_common.h4
-rw-r--r--library/cpp/actors/interconnect/interconnect_counters.cpp920
-rw-r--r--library/cpp/actors/interconnect/interconnect_counters.h98
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp24
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp14
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp54
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h4
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp6
-rw-r--r--library/cpp/actors/interconnect/ya.make4
15 files changed, 758 insertions, 758 deletions
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp
index 90beec0407..5f63b5af58 100644
--- a/library/cpp/actors/core/log.cpp
+++ b/library/cpp/actors/core/log.cpp
@@ -33,137 +33,137 @@ namespace {
}
namespace NActors {
-
- class TLoggerCounters : public ILoggerMetrics {
- public:
- TLoggerCounters(TIntrusivePtr<NMonitoring::TDynamicCounters> counters)
- : DynamicCounters(counters)
- {
- ActorMsgs_ = DynamicCounters->GetCounter("ActorMsgs", true);
- DirectMsgs_ = DynamicCounters->GetCounter("DirectMsgs", true);
- LevelRequests_ = DynamicCounters->GetCounter("LevelRequests", true);
- IgnoredMsgs_ = DynamicCounters->GetCounter("IgnoredMsgs", true);
- DroppedMsgs_ = DynamicCounters->GetCounter("DroppedMsgs", true);
-
- AlertMsgs_ = DynamicCounters->GetCounter("AlertMsgs", true);
- EmergMsgs_ = DynamicCounters->GetCounter("EmergMsgs", true);
- }
-
- ~TLoggerCounters() = default;
-
- void IncActorMsgs() override {
- ++*ActorMsgs_;
- }
- void IncDirectMsgs() override {
- ++*DirectMsgs_;
- }
- void IncLevelRequests() override {
- ++*LevelRequests_;
- }
- void IncIgnoredMsgs() override {
- ++*IgnoredMsgs_;
- }
- void IncAlertMsgs() override {
- ++*AlertMsgs_;
- }
- void IncEmergMsgs() override {
- ++*EmergMsgs_;
- }
- void IncDroppedMsgs() override {
- DroppedMsgs_->Inc();
- };
-
- void GetOutputHtml(IOutputStream& str) override {
- HTML(str) {
- DIV_CLASS("row") {
- DIV_CLASS("col-md-12") {
- H4() {
- str << "Counters" << Endl;
- }
- DynamicCounters->OutputHtml(str);
- }
- }
- }
- }
-
- private:
- NMonitoring::TDynamicCounters::TCounterPtr ActorMsgs_;
- NMonitoring::TDynamicCounters::TCounterPtr DirectMsgs_;
- NMonitoring::TDynamicCounters::TCounterPtr LevelRequests_;
- NMonitoring::TDynamicCounters::TCounterPtr IgnoredMsgs_;
- NMonitoring::TDynamicCounters::TCounterPtr AlertMsgs_;
- NMonitoring::TDynamicCounters::TCounterPtr EmergMsgs_;
- // Dropped while the logger backend was unavailable
- NMonitoring::TDynamicCounters::TCounterPtr DroppedMsgs_;
-
- TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters;
- };
-
- class TLoggerMetrics : public ILoggerMetrics {
- public:
- TLoggerMetrics(std::shared_ptr<NMonitoring::TMetricRegistry> metrics)
- : Metrics(metrics)
- {
- ActorMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.actor_msgs"}});
- DirectMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.direct_msgs"}});
- LevelRequests_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.level_requests"}});
- IgnoredMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.ignored_msgs"}});
- DroppedMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.dropped_msgs"}});
-
- AlertMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.alert_msgs"}});
- EmergMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.emerg_msgs"}});
- }
-
- ~TLoggerMetrics() = default;
-
- void IncActorMsgs() override {
- ActorMsgs_->Inc();
- }
- void IncDirectMsgs() override {
- DirectMsgs_->Inc();
- }
- void IncLevelRequests() override {
- LevelRequests_->Inc();
- }
- void IncIgnoredMsgs() override {
- IgnoredMsgs_->Inc();
- }
- void IncAlertMsgs() override {
- AlertMsgs_->Inc();
- }
- void IncEmergMsgs() override {
- EmergMsgs_->Inc();
- }
- void IncDroppedMsgs() override {
- DroppedMsgs_->Inc();
- };
-
- void GetOutputHtml(IOutputStream& str) override {
- HTML(str) {
- DIV_CLASS("row") {
- DIV_CLASS("col-md-12") {
- H4() {
- str << "Metrics" << Endl;
- }
- // TODO: Now, TMetricRegistry does not have the GetOutputHtml function
- }
- }
- }
- }
-
- private:
- NMonitoring::TRate* ActorMsgs_;
- NMonitoring::TRate* DirectMsgs_;
- NMonitoring::TRate* LevelRequests_;
- NMonitoring::TRate* IgnoredMsgs_;
- NMonitoring::TRate* AlertMsgs_;
- NMonitoring::TRate* EmergMsgs_;
- // Dropped while the logger backend was unavailable
- NMonitoring::TRate* DroppedMsgs_;
-
- std::shared_ptr<NMonitoring::TMetricRegistry> Metrics;
- };
-
+
+ class TLoggerCounters : public ILoggerMetrics {
+ public:
+ TLoggerCounters(TIntrusivePtr<NMonitoring::TDynamicCounters> counters)
+ : DynamicCounters(counters)
+ {
+ ActorMsgs_ = DynamicCounters->GetCounter("ActorMsgs", true);
+ DirectMsgs_ = DynamicCounters->GetCounter("DirectMsgs", true);
+ LevelRequests_ = DynamicCounters->GetCounter("LevelRequests", true);
+ IgnoredMsgs_ = DynamicCounters->GetCounter("IgnoredMsgs", true);
+ DroppedMsgs_ = DynamicCounters->GetCounter("DroppedMsgs", true);
+
+ AlertMsgs_ = DynamicCounters->GetCounter("AlertMsgs", true);
+ EmergMsgs_ = DynamicCounters->GetCounter("EmergMsgs", true);
+ }
+
+ ~TLoggerCounters() = default;
+
+ void IncActorMsgs() override {
+ ++*ActorMsgs_;
+ }
+ void IncDirectMsgs() override {
+ ++*DirectMsgs_;
+ }
+ void IncLevelRequests() override {
+ ++*LevelRequests_;
+ }
+ void IncIgnoredMsgs() override {
+ ++*IgnoredMsgs_;
+ }
+ void IncAlertMsgs() override {
+ ++*AlertMsgs_;
+ }
+ void IncEmergMsgs() override {
+ ++*EmergMsgs_;
+ }
+ void IncDroppedMsgs() override {
+ DroppedMsgs_->Inc();
+ };
+
+ void GetOutputHtml(IOutputStream& str) override {
+ HTML(str) {
+ DIV_CLASS("row") {
+ DIV_CLASS("col-md-12") {
+ H4() {
+ str << "Counters" << Endl;
+ }
+ DynamicCounters->OutputHtml(str);
+ }
+ }
+ }
+ }
+
+ private:
+ NMonitoring::TDynamicCounters::TCounterPtr ActorMsgs_;
+ NMonitoring::TDynamicCounters::TCounterPtr DirectMsgs_;
+ NMonitoring::TDynamicCounters::TCounterPtr LevelRequests_;
+ NMonitoring::TDynamicCounters::TCounterPtr IgnoredMsgs_;
+ NMonitoring::TDynamicCounters::TCounterPtr AlertMsgs_;
+ NMonitoring::TDynamicCounters::TCounterPtr EmergMsgs_;
+ // Dropped while the logger backend was unavailable
+ NMonitoring::TDynamicCounters::TCounterPtr DroppedMsgs_;
+
+ TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters;
+ };
+
+ class TLoggerMetrics : public ILoggerMetrics {
+ public:
+ TLoggerMetrics(std::shared_ptr<NMonitoring::TMetricRegistry> metrics)
+ : Metrics(metrics)
+ {
+ ActorMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.actor_msgs"}});
+ DirectMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.direct_msgs"}});
+ LevelRequests_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.level_requests"}});
+ IgnoredMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.ignored_msgs"}});
+ DroppedMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.dropped_msgs"}});
+
+ AlertMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.alert_msgs"}});
+ EmergMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.emerg_msgs"}});
+ }
+
+ ~TLoggerMetrics() = default;
+
+ void IncActorMsgs() override {
+ ActorMsgs_->Inc();
+ }
+ void IncDirectMsgs() override {
+ DirectMsgs_->Inc();
+ }
+ void IncLevelRequests() override {
+ LevelRequests_->Inc();
+ }
+ void IncIgnoredMsgs() override {
+ IgnoredMsgs_->Inc();
+ }
+ void IncAlertMsgs() override {
+ AlertMsgs_->Inc();
+ }
+ void IncEmergMsgs() override {
+ EmergMsgs_->Inc();
+ }
+ void IncDroppedMsgs() override {
+ DroppedMsgs_->Inc();
+ };
+
+ void GetOutputHtml(IOutputStream& str) override {
+ HTML(str) {
+ DIV_CLASS("row") {
+ DIV_CLASS("col-md-12") {
+ H4() {
+ str << "Metrics" << Endl;
+ }
+ // TODO: Now, TMetricRegistry does not have the GetOutputHtml function
+ }
+ }
+ }
+ }
+
+ private:
+ NMonitoring::TRate* ActorMsgs_;
+ NMonitoring::TRate* DirectMsgs_;
+ NMonitoring::TRate* LevelRequests_;
+ NMonitoring::TRate* IgnoredMsgs_;
+ NMonitoring::TRate* AlertMsgs_;
+ NMonitoring::TRate* EmergMsgs_;
+ // Dropped while the logger backend was unavailable
+ NMonitoring::TRate* DroppedMsgs_;
+
+ std::shared_ptr<NMonitoring::TMetricRegistry> Metrics;
+ };
+
TAtomic TLoggerActor::IsOverflow = 0;
TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings,
@@ -172,7 +172,7 @@ namespace NActors {
: TActor(&TLoggerActor::StateFunc)
, Settings(settings)
, LogBackend(logBackend.Release())
- , Metrics(std::make_unique<TLoggerCounters>(counters))
+ , Metrics(std::make_unique<TLoggerCounters>(counters))
{
}
@@ -182,35 +182,35 @@ namespace NActors {
: TActor(&TLoggerActor::StateFunc)
, Settings(settings)
, LogBackend(logBackend)
- , Metrics(std::make_unique<TLoggerCounters>(counters))
+ , Metrics(std::make_unique<TLoggerCounters>(counters))
+ {
+ }
+
+ TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings,
+ TAutoPtr<TLogBackend> logBackend,
+ std::shared_ptr<NMonitoring::TMetricRegistry> metrics)
+ : TActor(&TLoggerActor::StateFunc)
+ , Settings(settings)
+ , LogBackend(logBackend.Release())
+ , Metrics(std::make_unique<TLoggerMetrics>(metrics))
{
}
- TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings,
- TAutoPtr<TLogBackend> logBackend,
- std::shared_ptr<NMonitoring::TMetricRegistry> metrics)
- : TActor(&TLoggerActor::StateFunc)
- , Settings(settings)
- , LogBackend(logBackend.Release())
- , Metrics(std::make_unique<TLoggerMetrics>(metrics))
+ TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings,
+ std::shared_ptr<TLogBackend> logBackend,
+ std::shared_ptr<NMonitoring::TMetricRegistry> metrics)
+ : TActor(&TLoggerActor::StateFunc)
+ , Settings(settings)
+ , LogBackend(logBackend)
+ , Metrics(std::make_unique<TLoggerMetrics>(metrics))
{
- }
-
- TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings,
- std::shared_ptr<TLogBackend> logBackend,
- std::shared_ptr<NMonitoring::TMetricRegistry> metrics)
- : TActor(&TLoggerActor::StateFunc)
- , Settings(settings)
- , LogBackend(logBackend)
- , Metrics(std::make_unique<TLoggerMetrics>(metrics))
- {
}
TLoggerActor::~TLoggerActor() {
}
void TLoggerActor::Log(TInstant time, NLog::EPriority priority, NLog::EComponent component, const char* c, ...) {
- Metrics->IncDirectMsgs();
+ Metrics->IncDirectMsgs();
if (Settings && Settings->Satisfies(priority, component, 0ull)) {
va_list params;
va_start(params, c);
@@ -247,16 +247,16 @@ namespace NActors {
}
void TLoggerActor::WriteMessageStat(const NLog::TEvLog& ev) {
- Metrics->IncActorMsgs();
+ Metrics->IncActorMsgs();
const auto prio = ev.Level.ToPrio();
switch (prio) {
case ::NActors::NLog::EPrio::Alert:
- Metrics->IncAlertMsgs();
+ Metrics->IncAlertMsgs();
break;
case ::NActors::NLog::EPrio::Emerg:
- Metrics->IncEmergMsgs();
+ Metrics->IncEmergMsgs();
break;
default:
break;
@@ -274,7 +274,7 @@ namespace NActors {
// Check if some records have to be dropped
if ((PassedCount > 10 && delayMillisec > (i64)Settings->TimeThresholdMs) || IgnoredCount > 0) {
- Metrics->IncIgnoredMsgs();
+ Metrics->IncIgnoredMsgs();
if (IgnoredCount == 0) {
ctx.Send(ctx.SelfID, new TLogIgnored());
}
@@ -303,7 +303,7 @@ namespace NActors {
}
void TLoggerActor::HandleLogComponentLevelRequest(TLogComponentLevelRequest::TPtr& ev, const NActors::TActorContext& ctx) {
- Metrics->IncLevelRequests();
+ Metrics->IncLevelRequests();
TString explanation;
int code = Settings->SetLevel(ev->Get()->Priority, ev->Get()->Component, explanation);
ctx.Send(ev->Sender, new TLogComponentLevelResponse(code, explanation));
@@ -565,7 +565,7 @@ namespace NActors {
str << "</form>" << Endl;
}
}
- Metrics->GetOutputHtml(str);
+ Metrics->GetOutputHtml(str);
}
}
@@ -642,7 +642,7 @@ namespace NActors {
void TLoggerActor::HandleLogEventDrop(const NLog::TEvLog::TPtr& ev) {
WriteMessageStat(*ev->Get());
- Metrics->IncDroppedMsgs();
+ Metrics->IncDroppedMsgs();
}
void TLoggerActor::HandleWakeup() {
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index f5f11db614..c11a7cf3c1 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -15,7 +15,7 @@
#include <util/string/builder.h>
#include <library/cpp/logger/all.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
-#include <library/cpp/monlib/metrics/metric_registry.h>
+#include <library/cpp/monlib/metrics/metric_registry.h>
#include <library/cpp/json/writer/json.h>
#include <library/cpp/svnversion/svnversion.h>
@@ -175,21 +175,21 @@ namespace NActors {
////////////////////////////////////////////////////////////////////////////////
// LOGGER ACTOR
////////////////////////////////////////////////////////////////////////////////
- class ILoggerMetrics {
- public:
- virtual ~ILoggerMetrics() = default;
-
- virtual void IncActorMsgs() = 0;
- virtual void IncDirectMsgs() = 0;
- virtual void IncLevelRequests() = 0;
- virtual void IncIgnoredMsgs() = 0;
- virtual void IncAlertMsgs() = 0;
- virtual void IncEmergMsgs() = 0;
- virtual void IncDroppedMsgs() = 0;
-
- virtual void GetOutputHtml(IOutputStream&) = 0;
- };
-
+ class ILoggerMetrics {
+ public:
+ virtual ~ILoggerMetrics() = default;
+
+ virtual void IncActorMsgs() = 0;
+ virtual void IncDirectMsgs() = 0;
+ virtual void IncLevelRequests() = 0;
+ virtual void IncIgnoredMsgs() = 0;
+ virtual void IncAlertMsgs() = 0;
+ virtual void IncEmergMsgs() = 0;
+ virtual void IncDroppedMsgs() = 0;
+
+ virtual void GetOutputHtml(IOutputStream&) = 0;
+ };
+
class TLoggerActor: public TActor<TLoggerActor> {
public:
static constexpr IActor::EActivityType ActorActivityType() {
@@ -202,12 +202,12 @@ namespace NActors {
TLoggerActor(TIntrusivePtr<NLog::TSettings> settings,
std::shared_ptr<TLogBackend> logBackend,
TIntrusivePtr<NMonitoring::TDynamicCounters> counters);
- TLoggerActor(TIntrusivePtr<NLog::TSettings> settings,
- TAutoPtr<TLogBackend> logBackend,
- std::shared_ptr<NMonitoring::TMetricRegistry> metrics);
- TLoggerActor(TIntrusivePtr<NLog::TSettings> settings,
- std::shared_ptr<TLogBackend> logBackend,
- std::shared_ptr<NMonitoring::TMetricRegistry> metrics);
+ TLoggerActor(TIntrusivePtr<NLog::TSettings> settings,
+ TAutoPtr<TLogBackend> logBackend,
+ std::shared_ptr<NMonitoring::TMetricRegistry> metrics);
+ TLoggerActor(TIntrusivePtr<NLog::TSettings> settings,
+ std::shared_ptr<TLogBackend> logBackend,
+ std::shared_ptr<NMonitoring::TMetricRegistry> metrics);
~TLoggerActor();
void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
@@ -241,7 +241,7 @@ namespace NActors {
ui64 PassedCount = 0;
static TAtomic IsOverflow;
TDuration WakeupInterval{TDuration::Seconds(5)};
- std::unique_ptr<ILoggerMetrics> Metrics;
+ std::unique_ptr<ILoggerMetrics> Metrics;
void BecomeDefunct();
void HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx);
diff --git a/library/cpp/actors/interconnect/channel_scheduler.h b/library/cpp/actors/interconnect/channel_scheduler.h
index 803135e839..551a4cb61a 100644
--- a/library/cpp/actors/interconnect/channel_scheduler.h
+++ b/library/cpp/actors/interconnect/channel_scheduler.h
@@ -3,15 +3,15 @@
#include "interconnect_channel.h"
#include "event_holder_pool.h"
-#include <memory>
-
+#include <memory>
+
namespace NActors {
class TChannelScheduler {
const ui32 PeerNodeId;
std::array<std::optional<TEventOutputChannel>, 16> ChannelArray;
THashMap<ui16, TEventOutputChannel> ChannelMap;
- std::shared_ptr<IInterconnectMetrics> Metrics;
+ std::shared_ptr<IInterconnectMetrics> Metrics;
TEventHolderPool& Pool;
const ui32 MaxSerializedEventSize;
const TSessionParams Params;
@@ -29,10 +29,10 @@ namespace NActors {
public:
TChannelScheduler(ui32 peerNodeId, const TChannelsConfig& predefinedChannels,
- std::shared_ptr<IInterconnectMetrics> metrics, TEventHolderPool& pool, ui32 maxSerializedEventSize,
+ std::shared_ptr<IInterconnectMetrics> metrics, TEventHolderPool& pool, ui32 maxSerializedEventSize,
TSessionParams params)
: PeerNodeId(peerNodeId)
- , Metrics(std::move(metrics))
+ , Metrics(std::move(metrics))
, Pool(pool)
, MaxSerializedEventSize(maxSerializedEventSize)
, Params(std::move(params))
@@ -73,7 +73,7 @@ namespace NActors {
if (channel < ChannelArray.size()) {
auto& res = ChannelArray[channel];
if (Y_UNLIKELY(!res)) {
- res.emplace(Pool, channel, PeerNodeId, MaxSerializedEventSize, Metrics,
+ res.emplace(Pool, channel, PeerNodeId, MaxSerializedEventSize, Metrics,
Params);
}
return *res;
@@ -82,7 +82,7 @@ namespace NActors {
if (Y_UNLIKELY(it == ChannelMap.end())) {
it = ChannelMap.emplace(std::piecewise_construct, std::forward_as_tuple(channel),
std::forward_as_tuple(Pool, channel, PeerNodeId, MaxSerializedEventSize,
- Metrics, Params)).first;
+ Metrics, Params)).first;
}
return it->second;
}
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index dc366d590c..a66ba2a154 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -38,7 +38,7 @@ namespace NActors {
task.AppendBuf(part, amount);
*weightConsumed += amount;
OutputQueueSize -= part->Size;
- Metrics->UpdateOutputChannelEvents(ChannelId);
+ Metrics->UpdateOutputChannelEvents(ChannelId);
return true;
}
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index 5a9cc206d7..e4a0ae3cda 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -43,12 +43,12 @@ namespace NActors {
class TEventOutputChannel : public TInterconnectLoggingBase {
public:
TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize,
- std::shared_ptr<IInterconnectMetrics> metrics, TSessionParams params)
+ std::shared_ptr<IInterconnectMetrics> metrics, TSessionParams params)
: TInterconnectLoggingBase(Sprintf("OutputChannel %" PRIu16 " [node %" PRIu32 "]", id, peerNodeId))
, Pool(pool)
, PeerNodeId(peerNodeId)
, ChannelId(id)
- , Metrics(std::move(metrics))
+ , Metrics(std::move(metrics))
, Params(std::move(params))
, MaxSerializedEventSize(maxSerializedEventSize)
{}
@@ -88,7 +88,7 @@ namespace NActors {
TEventHolderPool& Pool;
const ui32 PeerNodeId;
const ui16 ChannelId;
- std::shared_ptr<IInterconnectMetrics> Metrics;
+ std::shared_ptr<IInterconnectMetrics> Metrics;
const TSessionParams Params;
const ui32 MaxSerializedEventSize;
ui64 UnaccountedTraffic = 0;
@@ -118,7 +118,7 @@ namespace NActors {
void AccountTraffic() {
if (const ui64 amount = std::exchange(UnaccountedTraffic, 0)) {
- Metrics->UpdateOutputChannelTraffic(ChannelId, amount);
+ Metrics->UpdateOutputChannelTraffic(ChannelId, amount);
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h
index a039312fb6..285709a00c 100644
--- a/library/cpp/actors/interconnect/interconnect_common.h
+++ b/library/cpp/actors/interconnect/interconnect_common.h
@@ -4,7 +4,7 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
-#include <library/cpp/monlib/metrics/metric_registry.h>
+#include <library/cpp/monlib/metrics/metric_registry.h>
#include <util/generic/map.h>
#include <util/generic/set.h>
#include <util/system/datetime.h>
@@ -73,7 +73,7 @@ namespace NActors {
struct TInterconnectProxyCommon : TAtomicRefCount<TInterconnectProxyCommon> {
TActorId NameserviceId;
NMonitoring::TDynamicCounterPtr MonCounters;
- std::shared_ptr<NMonitoring::IMetricRegistry> Metrics;
+ std::shared_ptr<NMonitoring::IMetricRegistry> Metrics;
TChannelsConfig ChannelsConfig;
TInterconnectSettings Settings;
TRegisterMonPageCallback RegisterMonPage;
diff --git a/library/cpp/actors/interconnect/interconnect_counters.cpp b/library/cpp/actors/interconnect/interconnect_counters.cpp
index a206818160..224160d4b4 100644
--- a/library/cpp/actors/interconnect/interconnect_counters.cpp
+++ b/library/cpp/actors/interconnect/interconnect_counters.cpp
@@ -1,15 +1,15 @@
-#include "interconnect_counters.h"
+#include "interconnect_counters.h"
-#include <library/cpp/monlib/metrics/metric_registry.h>
-#include <library/cpp/monlib/metrics/metric_sub_registry.h>
+#include <library/cpp/monlib/metrics/metric_registry.h>
+#include <library/cpp/monlib/metrics/metric_sub_registry.h>
-#include <unordered_map>
+#include <unordered_map>
namespace NActors {
-namespace {
-
- class TInterconnectCounters: public IInterconnectMetrics {
+namespace {
+
+ class TInterconnectCounters: public IInterconnectMetrics {
public:
struct TOutputChannel {
NMonitoring::TDynamicCounters::TCounterPtr Traffic;
@@ -87,7 +87,7 @@ namespace {
NMonitoring::TDynamicCounterPtr PerSessionCounters;
NMonitoring::TDynamicCounterPtr PerDataCenterCounters;
NMonitoring::TDynamicCounterPtr& AdaptiveCounters;
-
+
bool Initialized = false;
NMonitoring::TDynamicCounters::TCounterPtr Traffic;
@@ -100,135 +100,135 @@ namespace {
, MergePerDataCenterCounters(common->Settings.MergePerDataCenterCounters)
, MergePerPeerCounters(common->Settings.MergePerPeerCounters)
, Counters(common->MonCounters)
- , AdaptiveCounters(MergePerDataCenterCounters
- ? PerDataCenterCounters :
- MergePerPeerCounters ? Counters : PerSessionCounters)
+ , AdaptiveCounters(MergePerDataCenterCounters
+ ? PerDataCenterCounters :
+ MergePerPeerCounters ? Counters : PerSessionCounters)
{}
void AddInflightDataAmount(ui64 value) override {
- *InflightDataAmount += value;
- }
-
+ *InflightDataAmount += value;
+ }
+
void SubInflightDataAmount(ui64 value) override {
- *InflightDataAmount -= value;
- }
-
+ *InflightDataAmount -= value;
+ }
+
void AddTotalBytesWritten(ui64 value) override {
- *TotalBytesWritten += value;
- }
-
+ *TotalBytesWritten += value;
+ }
+
void SetClockSkewMicrosec(i64 value) override {
- *ClockSkewMicrosec = value;
- }
-
- void IncSessionDeaths() override {
- ++*SessionDeaths;
- }
-
- void IncHandshakeFails() override {
- ++*HandshakeFails;
- }
-
- void SetConnected(ui32 value) override {
- *Connected = value;
- }
-
- void IncSubscribersCount() override {
- ++*SubscribersCount;
- }
-
- void SubSubscribersCount(ui32 value) override {
- *SubscribersCount -= value;
- }
-
+ *ClockSkewMicrosec = value;
+ }
+
+ void IncSessionDeaths() override {
+ ++*SessionDeaths;
+ }
+
+ void IncHandshakeFails() override {
+ ++*HandshakeFails;
+ }
+
+ void SetConnected(ui32 value) override {
+ *Connected = value;
+ }
+
+ void IncSubscribersCount() override {
+ ++*SubscribersCount;
+ }
+
+ void SubSubscribersCount(ui32 value) override {
+ *SubscribersCount -= value;
+ }
+
void SubOutputBuffersTotalSize(ui64 value) override {
- *OutputBuffersTotalSize -= value;
- }
-
+ *OutputBuffersTotalSize -= value;
+ }
+
void AddOutputBuffersTotalSize(ui64 value) override {
- *OutputBuffersTotalSize += value;
- }
-
+ *OutputBuffersTotalSize += value;
+ }
+
ui64 GetOutputBuffersTotalSize() const override {
- return *OutputBuffersTotalSize;
- }
-
- void IncDisconnections() override {
- ++*Disconnections;
- }
-
- void IncUsefulWriteWakeups() override {
- ++*UsefulWriteWakeups;
- }
-
- void IncSpuriousWriteWakeups() override {
- ++*SpuriousWriteWakeups;
- }
-
- void IncSendSyscalls() override {
- ++*SendSyscalls;
- }
-
- void IncInflyLimitReach() override {
- ++*InflyLimitReach;
- }
-
- void IncUsefulReadWakeups() override {
- ++*UsefulReadWakeups;
- }
-
- void IncSpuriousReadWakeups() override {
- ++*SpuriousReadWakeups;
- }
-
- void IncDisconnectByReason(const TString& s) override {
- if (auto it = DisconnectByReason.find(s); it != DisconnectByReason.end()) {
- it->second->Inc();
- }
- }
-
+ return *OutputBuffersTotalSize;
+ }
+
+ void IncDisconnections() override {
+ ++*Disconnections;
+ }
+
+ void IncUsefulWriteWakeups() override {
+ ++*UsefulWriteWakeups;
+ }
+
+ void IncSpuriousWriteWakeups() override {
+ ++*SpuriousWriteWakeups;
+ }
+
+ void IncSendSyscalls() override {
+ ++*SendSyscalls;
+ }
+
+ void IncInflyLimitReach() override {
+ ++*InflyLimitReach;
+ }
+
+ void IncUsefulReadWakeups() override {
+ ++*UsefulReadWakeups;
+ }
+
+ void IncSpuriousReadWakeups() override {
+ ++*SpuriousReadWakeups;
+ }
+
+ void IncDisconnectByReason(const TString& s) override {
+ if (auto it = DisconnectByReason.find(s); it != DisconnectByReason.end()) {
+ it->second->Inc();
+ }
+ }
+
void AddInputChannelsIncomingTraffic(ui16 channel, ui64 incomingTraffic) override {
- auto& ch = InputChannels.Get(channel);
- *ch.IncomingTraffic += incomingTraffic;
- }
-
- void IncInputChannelsIncomingEvents(ui16 channel) override {
- auto& ch = InputChannels.Get(channel);
- ++*ch.IncomingEvents;
- }
-
- void IncRecvSyscalls() override {
- ++*RecvSyscalls;
- }
-
+ auto& ch = InputChannels.Get(channel);
+ *ch.IncomingTraffic += incomingTraffic;
+ }
+
+ void IncInputChannelsIncomingEvents(ui16 channel) override {
+ auto& ch = InputChannels.Get(channel);
+ ++*ch.IncomingEvents;
+ }
+
+ void IncRecvSyscalls() override {
+ ++*RecvSyscalls;
+ }
+
void AddTotalBytesRead(ui64 value) override {
- *TotalBytesRead += value;
- }
-
- void UpdateLegacyPingTimeHist(ui64 value) override {
- LegacyPingTimeHist.Add(value);
- PingTimeHistogram->Collect(value);
- }
-
+ *TotalBytesRead += value;
+ }
+
+ void UpdateLegacyPingTimeHist(ui64 value) override {
+ LegacyPingTimeHist.Add(value);
+ PingTimeHistogram->Collect(value);
+ }
+
void UpdateOutputChannelTraffic(ui16 channel, ui64 value) override {
- if (GetOutputChannel(channel).OutgoingTraffic) {
- *(GetOutputChannel(channel).OutgoingTraffic) += value;
- }
- if (GetOutputChannel(channel).Traffic) {
- *(GetOutputChannel(channel).Traffic) += value;
- }
- }
-
- void UpdateOutputChannelEvents(ui16 channel) override {
- if (GetOutputChannel(channel).OutgoingEvents) {
- ++*(GetOutputChannel(channel).OutgoingEvents);
- }
- if (GetOutputChannel(channel).Events) {
- ++*(GetOutputChannel(channel).Events);
- }
- }
-
- void SetPeerInfo(const TString& name, const TString& dataCenterId) override {
+ if (GetOutputChannel(channel).OutgoingTraffic) {
+ *(GetOutputChannel(channel).OutgoingTraffic) += value;
+ }
+ if (GetOutputChannel(channel).Traffic) {
+ *(GetOutputChannel(channel).Traffic) += value;
+ }
+ }
+
+ void UpdateOutputChannelEvents(ui16 channel) override {
+ if (GetOutputChannel(channel).OutgoingEvents) {
+ ++*(GetOutputChannel(channel).OutgoingEvents);
+ }
+ if (GetOutputChannel(channel).Events) {
+ ++*(GetOutputChannel(channel).Events);
+ }
+ }
+
+ void SetPeerInfo(const TString& name, const TString& dataCenterId) override {
if (name != std::exchange(HumanFriendlyPeerHostName, name)) {
PerSessionCounters.Reset();
}
@@ -312,7 +312,7 @@ namespace {
return it != OutputChannels.end() ? it->second : OtherOutputChannel;
}
- private:
+ private:
NMonitoring::TDynamicCounters::TCounterPtr SessionDeaths;
NMonitoring::TDynamicCounters::TCounterPtr HandshakeFails;
NMonitoring::TDynamicCounters::TCounterPtr Connected;
@@ -340,353 +340,353 @@ namespace {
NMonitoring::TDynamicCounters::TCounterPtr TotalBytesWritten, TotalBytesRead;
};
- class TInterconnectMetrics: public IInterconnectMetrics {
- public:
- struct TOutputChannel {
- NMonitoring::IRate* Traffic;
- NMonitoring::IRate* Events;
- NMonitoring::IRate* OutgoingTraffic;
- NMonitoring::IRate* OutgoingEvents;
-
- TOutputChannel() = default;
-
- TOutputChannel(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics,
- NMonitoring::IRate* traffic,
- NMonitoring::IRate* events)
- : Traffic(traffic)
- , Events(events)
- , OutgoingTraffic(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.outgoing_traffic"}})))
- , OutgoingEvents(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.outgoing_events"}})))
- {}
-
- TOutputChannel(const TOutputChannel&) = default;
- };
-
- struct TInputChannel {
- NMonitoring::IRate* Traffic;
- NMonitoring::IRate* Events;
- NMonitoring::IRate* ScopeErrors;
- NMonitoring::IRate* IncomingTraffic;
- NMonitoring::IRate* IncomingEvents;
-
- TInputChannel() = default;
-
- TInputChannel(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics,
- NMonitoring::IRate* traffic, NMonitoring::IRate* events,
- NMonitoring::IRate* scopeErrors)
- : Traffic(traffic)
- , Events(events)
- , ScopeErrors(scopeErrors)
- , IncomingTraffic(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.incoming_traffic"}})))
- , IncomingEvents(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.incoming_events"}})))
- {}
-
- TInputChannel(const TInputChannel&) = default;
- };
-
- struct TInputChannels : std::unordered_map<ui16, TInputChannel> {
- TInputChannel OtherInputChannel;
-
- TInputChannels() = default;
-
- TInputChannels(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics,
- const std::unordered_map<ui16, TString>& names,
- NMonitoring::IRate* traffic, NMonitoring::IRate* events,
- NMonitoring::IRate* scopeErrors)
- : OtherInputChannel(std::make_shared<NMonitoring::TMetricSubRegistry>(
- NMonitoring::TLabels{{"channel", "other"}}, metrics), traffic, events, scopeErrors)
- {
- for (const auto& [id, name] : names) {
- try_emplace(id, std::make_shared<NMonitoring::TMetricSubRegistry>(NMonitoring::TLabels{{"channel", name}}, metrics),
- traffic, events, scopeErrors);
- }
- }
-
- TInputChannels(const TInputChannels&) = default;
-
- const TInputChannel& Get(ui16 id) const {
- const auto it = find(id);
- return it != end() ? it->second : OtherInputChannel;
- }
- };
-
- TInterconnectMetrics(const TInterconnectProxyCommon::TPtr& common)
- : Common(common)
- , MergePerDataCenterMetrics_(common->Settings.MergePerDataCenterCounters)
- , MergePerPeerMetrics_(common->Settings.MergePerPeerCounters)
- , Metrics_(common->Metrics)
- , AdaptiveMetrics_(MergePerDataCenterMetrics_
- ? PerDataCenterMetrics_ :
- MergePerPeerMetrics_ ? Metrics_ : PerSessionMetrics_)
- {}
-
+ class TInterconnectMetrics: public IInterconnectMetrics {
+ public:
+ struct TOutputChannel {
+ NMonitoring::IRate* Traffic;
+ NMonitoring::IRate* Events;
+ NMonitoring::IRate* OutgoingTraffic;
+ NMonitoring::IRate* OutgoingEvents;
+
+ TOutputChannel() = default;
+
+ TOutputChannel(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics,
+ NMonitoring::IRate* traffic,
+ NMonitoring::IRate* events)
+ : Traffic(traffic)
+ , Events(events)
+ , OutgoingTraffic(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.outgoing_traffic"}})))
+ , OutgoingEvents(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.outgoing_events"}})))
+ {}
+
+ TOutputChannel(const TOutputChannel&) = default;
+ };
+
+ struct TInputChannel {
+ NMonitoring::IRate* Traffic;
+ NMonitoring::IRate* Events;
+ NMonitoring::IRate* ScopeErrors;
+ NMonitoring::IRate* IncomingTraffic;
+ NMonitoring::IRate* IncomingEvents;
+
+ TInputChannel() = default;
+
+ TInputChannel(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics,
+ NMonitoring::IRate* traffic, NMonitoring::IRate* events,
+ NMonitoring::IRate* scopeErrors)
+ : Traffic(traffic)
+ , Events(events)
+ , ScopeErrors(scopeErrors)
+ , IncomingTraffic(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.incoming_traffic"}})))
+ , IncomingEvents(metrics->Rate(NMonitoring::MakeLabels({{"sensor", "interconnect.incoming_events"}})))
+ {}
+
+ TInputChannel(const TInputChannel&) = default;
+ };
+
+ struct TInputChannels : std::unordered_map<ui16, TInputChannel> {
+ TInputChannel OtherInputChannel;
+
+ TInputChannels() = default;
+
+ TInputChannels(const std::shared_ptr<NMonitoring::IMetricRegistry>& metrics,
+ const std::unordered_map<ui16, TString>& names,
+ NMonitoring::IRate* traffic, NMonitoring::IRate* events,
+ NMonitoring::IRate* scopeErrors)
+ : OtherInputChannel(std::make_shared<NMonitoring::TMetricSubRegistry>(
+ NMonitoring::TLabels{{"channel", "other"}}, metrics), traffic, events, scopeErrors)
+ {
+ for (const auto& [id, name] : names) {
+ try_emplace(id, std::make_shared<NMonitoring::TMetricSubRegistry>(NMonitoring::TLabels{{"channel", name}}, metrics),
+ traffic, events, scopeErrors);
+ }
+ }
+
+ TInputChannels(const TInputChannels&) = default;
+
+ const TInputChannel& Get(ui16 id) const {
+ const auto it = find(id);
+ return it != end() ? it->second : OtherInputChannel;
+ }
+ };
+
+ TInterconnectMetrics(const TInterconnectProxyCommon::TPtr& common)
+ : Common(common)
+ , MergePerDataCenterMetrics_(common->Settings.MergePerDataCenterCounters)
+ , MergePerPeerMetrics_(common->Settings.MergePerPeerCounters)
+ , Metrics_(common->Metrics)
+ , AdaptiveMetrics_(MergePerDataCenterMetrics_
+ ? PerDataCenterMetrics_ :
+ MergePerPeerMetrics_ ? Metrics_ : PerSessionMetrics_)
+ {}
+
void AddInflightDataAmount(ui64 value) override {
- InflightDataAmount_->Add(value);
- }
-
+ InflightDataAmount_->Add(value);
+ }
+
void SubInflightDataAmount(ui64 value) override {
- InflightDataAmount_->Add(-value);
- }
-
+ InflightDataAmount_->Add(-value);
+ }
+
void AddTotalBytesWritten(ui64 value) override {
- TotalBytesWritten_->Add(value);
- }
-
+ TotalBytesWritten_->Add(value);
+ }
+
void SetClockSkewMicrosec(i64 value) override {
- ClockSkewMicrosec_->Set(value);
- }
-
- void IncSessionDeaths() override {
- SessionDeaths_->Inc();
- }
-
- void IncHandshakeFails() override {
- HandshakeFails_->Inc();
- }
-
- void SetConnected(ui32 value) override {
- Connected_->Set(value);
- }
-
- void IncSubscribersCount() override {
- SubscribersCount_->Inc();
- }
-
- void SubSubscribersCount(ui32 value) override {
- SubscribersCount_->Add(-value);
- }
-
+ ClockSkewMicrosec_->Set(value);
+ }
+
+ void IncSessionDeaths() override {
+ SessionDeaths_->Inc();
+ }
+
+ void IncHandshakeFails() override {
+ HandshakeFails_->Inc();
+ }
+
+ void SetConnected(ui32 value) override {
+ Connected_->Set(value);
+ }
+
+ void IncSubscribersCount() override {
+ SubscribersCount_->Inc();
+ }
+
+ void SubSubscribersCount(ui32 value) override {
+ SubscribersCount_->Add(-value);
+ }
+
void SubOutputBuffersTotalSize(ui64 value) override {
- OutputBuffersTotalSize_->Add(-value);
- }
-
+ OutputBuffersTotalSize_->Add(-value);
+ }
+
void AddOutputBuffersTotalSize(ui64 value) override {
- OutputBuffersTotalSize_->Add(value);
- }
-
+ OutputBuffersTotalSize_->Add(value);
+ }
+
ui64 GetOutputBuffersTotalSize() const override {
- return OutputBuffersTotalSize_->Get();
- }
-
- void IncDisconnections() override {
- Disconnections_->Inc();
- }
-
- void IncUsefulWriteWakeups() override {
- UsefulWriteWakeups_->Inc();
- }
-
- void IncSpuriousWriteWakeups() override {
- SpuriousWriteWakeups_->Inc();
- }
-
- void IncSendSyscalls() override {
- SendSyscalls_->Inc();
- }
-
- void IncInflyLimitReach() override {
- InflyLimitReach_->Inc();
- }
-
- void IncUsefulReadWakeups() override {
- UsefulReadWakeups_->Inc();
- }
-
- void IncSpuriousReadWakeups() override {
- SpuriousReadWakeups_->Inc();
- }
-
- void IncDisconnectByReason(const TString& s) override {
- if (auto it = DisconnectByReason_.find(s); it != DisconnectByReason_.end()) {
- it->second->Inc();
- }
- }
-
+ return OutputBuffersTotalSize_->Get();
+ }
+
+ void IncDisconnections() override {
+ Disconnections_->Inc();
+ }
+
+ void IncUsefulWriteWakeups() override {
+ UsefulWriteWakeups_->Inc();
+ }
+
+ void IncSpuriousWriteWakeups() override {
+ SpuriousWriteWakeups_->Inc();
+ }
+
+ void IncSendSyscalls() override {
+ SendSyscalls_->Inc();
+ }
+
+ void IncInflyLimitReach() override {
+ InflyLimitReach_->Inc();
+ }
+
+ void IncUsefulReadWakeups() override {
+ UsefulReadWakeups_->Inc();
+ }
+
+ void IncSpuriousReadWakeups() override {
+ SpuriousReadWakeups_->Inc();
+ }
+
+ void IncDisconnectByReason(const TString& s) override {
+ if (auto it = DisconnectByReason_.find(s); it != DisconnectByReason_.end()) {
+ it->second->Inc();
+ }
+ }
+
void AddInputChannelsIncomingTraffic(ui16 channel, ui64 incomingTraffic) override {
- auto& ch = InputChannels_.Get(channel);
- ch.IncomingTraffic->Add(incomingTraffic);
- }
-
- void IncInputChannelsIncomingEvents(ui16 channel) override {
- auto& ch = InputChannels_.Get(channel);
- ch.IncomingEvents->Inc();
- }
-
- void IncRecvSyscalls() override {
- RecvSyscalls_->Inc();
- }
-
+ auto& ch = InputChannels_.Get(channel);
+ ch.IncomingTraffic->Add(incomingTraffic);
+ }
+
+ void IncInputChannelsIncomingEvents(ui16 channel) override {
+ auto& ch = InputChannels_.Get(channel);
+ ch.IncomingEvents->Inc();
+ }
+
+ void IncRecvSyscalls() override {
+ RecvSyscalls_->Inc();
+ }
+
void AddTotalBytesRead(ui64 value) override {
- TotalBytesRead_->Add(value);
- }
-
- void UpdateLegacyPingTimeHist(ui64 value) override {
- PingTimeHistogram_->Record(value);
- }
-
+ TotalBytesRead_->Add(value);
+ }
+
+ void UpdateLegacyPingTimeHist(ui64 value) override {
+ PingTimeHistogram_->Record(value);
+ }
+
void UpdateOutputChannelTraffic(ui16 channel, ui64 value) override {
- if (GetOutputChannel(channel).OutgoingTraffic) {
- GetOutputChannel(channel).OutgoingTraffic->Add(value);
- }
- if (GetOutputChannel(channel).Traffic) {
- GetOutputChannel(channel).Traffic->Add(value);
- }
- }
-
- void UpdateOutputChannelEvents(ui16 channel) override {
- if (GetOutputChannel(channel).OutgoingEvents) {
- GetOutputChannel(channel).OutgoingEvents->Inc();
- }
- if (GetOutputChannel(channel).Events) {
- GetOutputChannel(channel).Events->Inc();
- }
- }
-
- void SetPeerInfo(const TString& name, const TString& dataCenterId) override {
- if (name != std::exchange(HumanFriendlyPeerHostName, name)) {
- PerSessionMetrics_.reset();
- }
- VALGRIND_MAKE_READABLE(&DataCenterId, sizeof(DataCenterId));
- if (dataCenterId != std::exchange(DataCenterId, dataCenterId)) {
- PerDataCenterMetrics_.reset();
- }
-
- const bool updatePerDataCenter = !PerDataCenterMetrics_ && MergePerDataCenterMetrics_;
- if (updatePerDataCenter) {
- PerDataCenterMetrics_ = std::make_shared<NMonitoring::TMetricSubRegistry>(
- NMonitoring::TLabels{{"datacenter_id", *DataCenterId}}, Metrics_);
- }
-
- const bool updatePerSession = !PerSessionMetrics_ || updatePerDataCenter;
- if (updatePerSession) {
- auto base = MergePerDataCenterMetrics_ ? PerDataCenterMetrics_ : Metrics_;
- PerSessionMetrics_ = std::make_shared<NMonitoring::TMetricSubRegistry>(
- NMonitoring::TLabels{{"peer", *HumanFriendlyPeerHostName}}, base);
- }
-
- const bool updateGlobal = !Initialized_;
-
- const bool updateAdaptive =
- &AdaptiveMetrics_ == &Metrics_ ? updateGlobal :
- &AdaptiveMetrics_ == &PerSessionMetrics_ ? updatePerSession :
- &AdaptiveMetrics_ == &PerDataCenterMetrics_ ? updatePerDataCenter :
- false;
-
- auto createRate = [](std::shared_ptr<NMonitoring::IMetricRegistry> metrics, TStringBuf name) mutable {
- return metrics->Rate(NMonitoring::MakeLabels(NMonitoring::TLabels{{"sensor", name}}));
- };
- auto createIntGauge = [](std::shared_ptr<NMonitoring::IMetricRegistry> metrics, TStringBuf name) mutable {
- return metrics->IntGauge(NMonitoring::MakeLabels(NMonitoring::TLabels{{"sensor", name}}));
- };
-
- if (updatePerSession) {
- Connected_ = createIntGauge(PerSessionMetrics_, "interconnect.connected");
- Disconnections_ = createRate(PerSessionMetrics_, "interconnect.disconnections");
- ClockSkewMicrosec_ = createIntGauge(PerSessionMetrics_, "interconnect.clock_skew_microsec");
- Traffic_ = createRate(PerSessionMetrics_, "interconnect.traffic");
- Events_ = createRate(PerSessionMetrics_, "interconnect.events");
- ScopeErrors_ = createRate(PerSessionMetrics_, "interconnect.scope_errors");
-
- for (const auto& [id, name] : Common->ChannelName) {
- OutputChannels_.try_emplace(id, std::make_shared<NMonitoring::TMetricSubRegistry>(
- NMonitoring::TLabels{{"channel", name}}, Metrics_), Traffic_, Events_);
- }
- OtherOutputChannel_ = TOutputChannel(std::make_shared<NMonitoring::TMetricSubRegistry>(
- NMonitoring::TLabels{{"channel", "other"}}, Metrics_), Traffic_, Events_);
-
- InputChannels_ = TInputChannels(Metrics_, Common->ChannelName, Traffic_, Events_, ScopeErrors_);
- }
-
- if (updateAdaptive) {
- SessionDeaths_ = createRate(AdaptiveMetrics_, "interconnect.session_deaths");
- HandshakeFails_ = createRate(AdaptiveMetrics_, "interconnect.handshake_fails");
- InflyLimitReach_ = createRate(AdaptiveMetrics_, "interconnect.infly_limit_reach");
- InflightDataAmount_ = createRate(AdaptiveMetrics_, "interconnect.inflight_data");
- PingTimeHistogram_ = AdaptiveMetrics_->HistogramRate(
- NMonitoring::MakeLabels({{"sensor", "interconnect.ping_time_us"}}), NMonitoring::ExponentialHistogram(18, 2, 125));
- }
-
- if (updateGlobal) {
- OutputBuffersTotalSize_ = createRate(Metrics_, "interconnect.output_buffers_total_size");
- SendSyscalls_ = createRate(Metrics_, "interconnect.send_syscalls");
- RecvSyscalls_ = createRate(Metrics_, "interconnect.recv_syscalls");
- SpuriousReadWakeups_ = createRate(Metrics_, "interconnect.spurious_read_wakeups");
- UsefulReadWakeups_ = createRate(Metrics_, "interconnect.useful_read_wakeups");
- SpuriousWriteWakeups_ = createRate(Metrics_, "interconnect.spurious_write_wakeups");
- UsefulWriteWakeups_ = createRate(Metrics_, "interconnect.useful_write_wakeups");
- SubscribersCount_ = createIntGauge(AdaptiveMetrics_, "interconnect.subscribers_count");
- TotalBytesWritten_ = createRate(Metrics_, "interconnect.total_bytes_written");
- TotalBytesRead_ = createRate(Metrics_, "interconnect.total_bytes_read");
-
- for (const char *reason : TDisconnectReason::Reasons) {
+ if (GetOutputChannel(channel).OutgoingTraffic) {
+ GetOutputChannel(channel).OutgoingTraffic->Add(value);
+ }
+ if (GetOutputChannel(channel).Traffic) {
+ GetOutputChannel(channel).Traffic->Add(value);
+ }
+ }
+
+ void UpdateOutputChannelEvents(ui16 channel) override {
+ if (GetOutputChannel(channel).OutgoingEvents) {
+ GetOutputChannel(channel).OutgoingEvents->Inc();
+ }
+ if (GetOutputChannel(channel).Events) {
+ GetOutputChannel(channel).Events->Inc();
+ }
+ }
+
+ void SetPeerInfo(const TString& name, const TString& dataCenterId) override {
+ if (name != std::exchange(HumanFriendlyPeerHostName, name)) {
+ PerSessionMetrics_.reset();
+ }
+ VALGRIND_MAKE_READABLE(&DataCenterId, sizeof(DataCenterId));
+ if (dataCenterId != std::exchange(DataCenterId, dataCenterId)) {
+ PerDataCenterMetrics_.reset();
+ }
+
+ const bool updatePerDataCenter = !PerDataCenterMetrics_ && MergePerDataCenterMetrics_;
+ if (updatePerDataCenter) {
+ PerDataCenterMetrics_ = std::make_shared<NMonitoring::TMetricSubRegistry>(
+ NMonitoring::TLabels{{"datacenter_id", *DataCenterId}}, Metrics_);
+ }
+
+ const bool updatePerSession = !PerSessionMetrics_ || updatePerDataCenter;
+ if (updatePerSession) {
+ auto base = MergePerDataCenterMetrics_ ? PerDataCenterMetrics_ : Metrics_;
+ PerSessionMetrics_ = std::make_shared<NMonitoring::TMetricSubRegistry>(
+ NMonitoring::TLabels{{"peer", *HumanFriendlyPeerHostName}}, base);
+ }
+
+ const bool updateGlobal = !Initialized_;
+
+ const bool updateAdaptive =
+ &AdaptiveMetrics_ == &Metrics_ ? updateGlobal :
+ &AdaptiveMetrics_ == &PerSessionMetrics_ ? updatePerSession :
+ &AdaptiveMetrics_ == &PerDataCenterMetrics_ ? updatePerDataCenter :
+ false;
+
+ auto createRate = [](std::shared_ptr<NMonitoring::IMetricRegistry> metrics, TStringBuf name) mutable {
+ return metrics->Rate(NMonitoring::MakeLabels(NMonitoring::TLabels{{"sensor", name}}));
+ };
+ auto createIntGauge = [](std::shared_ptr<NMonitoring::IMetricRegistry> metrics, TStringBuf name) mutable {
+ return metrics->IntGauge(NMonitoring::MakeLabels(NMonitoring::TLabels{{"sensor", name}}));
+ };
+
+ if (updatePerSession) {
+ Connected_ = createIntGauge(PerSessionMetrics_, "interconnect.connected");
+ Disconnections_ = createRate(PerSessionMetrics_, "interconnect.disconnections");
+ ClockSkewMicrosec_ = createIntGauge(PerSessionMetrics_, "interconnect.clock_skew_microsec");
+ Traffic_ = createRate(PerSessionMetrics_, "interconnect.traffic");
+ Events_ = createRate(PerSessionMetrics_, "interconnect.events");
+ ScopeErrors_ = createRate(PerSessionMetrics_, "interconnect.scope_errors");
+
+ for (const auto& [id, name] : Common->ChannelName) {
+ OutputChannels_.try_emplace(id, std::make_shared<NMonitoring::TMetricSubRegistry>(
+ NMonitoring::TLabels{{"channel", name}}, Metrics_), Traffic_, Events_);
+ }
+ OtherOutputChannel_ = TOutputChannel(std::make_shared<NMonitoring::TMetricSubRegistry>(
+ NMonitoring::TLabels{{"channel", "other"}}, Metrics_), Traffic_, Events_);
+
+ InputChannels_ = TInputChannels(Metrics_, Common->ChannelName, Traffic_, Events_, ScopeErrors_);
+ }
+
+ if (updateAdaptive) {
+ SessionDeaths_ = createRate(AdaptiveMetrics_, "interconnect.session_deaths");
+ HandshakeFails_ = createRate(AdaptiveMetrics_, "interconnect.handshake_fails");
+ InflyLimitReach_ = createRate(AdaptiveMetrics_, "interconnect.infly_limit_reach");
+ InflightDataAmount_ = createRate(AdaptiveMetrics_, "interconnect.inflight_data");
+ PingTimeHistogram_ = AdaptiveMetrics_->HistogramRate(
+ NMonitoring::MakeLabels({{"sensor", "interconnect.ping_time_us"}}), NMonitoring::ExponentialHistogram(18, 2, 125));
+ }
+
+ if (updateGlobal) {
+ OutputBuffersTotalSize_ = createRate(Metrics_, "interconnect.output_buffers_total_size");
+ SendSyscalls_ = createRate(Metrics_, "interconnect.send_syscalls");
+ RecvSyscalls_ = createRate(Metrics_, "interconnect.recv_syscalls");
+ SpuriousReadWakeups_ = createRate(Metrics_, "interconnect.spurious_read_wakeups");
+ UsefulReadWakeups_ = createRate(Metrics_, "interconnect.useful_read_wakeups");
+ SpuriousWriteWakeups_ = createRate(Metrics_, "interconnect.spurious_write_wakeups");
+ UsefulWriteWakeups_ = createRate(Metrics_, "interconnect.useful_write_wakeups");
+ SubscribersCount_ = createIntGauge(AdaptiveMetrics_, "interconnect.subscribers_count");
+ TotalBytesWritten_ = createRate(Metrics_, "interconnect.total_bytes_written");
+ TotalBytesRead_ = createRate(Metrics_, "interconnect.total_bytes_read");
+
+ for (const char *reason : TDisconnectReason::Reasons) {
DisconnectByReason_[reason] = Metrics_->Rate(
NMonitoring::MakeLabels({
{"sensor", "interconnect.disconnect_reason"},
{"reason", reason},
}));
- }
- }
-
- Initialized_ = true;
- }
-
- TOutputChannel GetOutputChannel(ui16 index) const {
- Y_VERIFY(Initialized_);
- const auto it = OutputChannels_.find(index);
- return it != OutputChannels_.end() ? it->second : OtherOutputChannel_;
- }
-
- private:
- const TInterconnectProxyCommon::TPtr Common;
- const bool MergePerDataCenterMetrics_;
- const bool MergePerPeerMetrics_;
- std::shared_ptr<NMonitoring::IMetricRegistry> Metrics_;
- std::shared_ptr<NMonitoring::IMetricRegistry> PerSessionMetrics_;
- std::shared_ptr<NMonitoring::IMetricRegistry> PerDataCenterMetrics_;
- std::shared_ptr<NMonitoring::IMetricRegistry>& AdaptiveMetrics_;
- bool Initialized_ = false;
-
- NMonitoring::IRate* Traffic_;
-
- NMonitoring::IRate* Events_;
- NMonitoring::IRate* ScopeErrors_;
- NMonitoring::IRate* Disconnections_;
- NMonitoring::IIntGauge* Connected_;
-
- NMonitoring::IRate* SessionDeaths_;
- NMonitoring::IRate* HandshakeFails_;
- NMonitoring::IRate* InflyLimitReach_;
- NMonitoring::IRate* InflightDataAmount_;
- NMonitoring::IRate* OutputBuffersTotalSize_;
- NMonitoring::IIntGauge* SubscribersCount_;
- NMonitoring::IRate* SendSyscalls_;
- NMonitoring::IRate* RecvSyscalls_;
- NMonitoring::IRate* SpuriousWriteWakeups_;
- NMonitoring::IRate* UsefulWriteWakeups_;
- NMonitoring::IRate* SpuriousReadWakeups_;
- NMonitoring::IRate* UsefulReadWakeups_;
- NMonitoring::IIntGauge* ClockSkewMicrosec_;
-
- NMonitoring::IHistogram* PingTimeHistogram_;
-
- std::unordered_map<ui16, TOutputChannel> OutputChannels_;
- TOutputChannel OtherOutputChannel_;
- TInputChannels InputChannels_;
-
- THashMap<TString, NMonitoring::IRate*> DisconnectByReason_;
-
- NMonitoring::IRate* TotalBytesWritten_;
- NMonitoring::IRate* TotalBytesRead_;
- };
-
-} // namespace
-
-std::unique_ptr<IInterconnectMetrics> CreateInterconnectCounters(const TInterconnectProxyCommon::TPtr& common) {
- return std::make_unique<TInterconnectCounters>(common);
-}
-
-std::unique_ptr<IInterconnectMetrics> CreateInterconnectMetrics(const TInterconnectProxyCommon::TPtr& common) {
- return std::make_unique<TInterconnectMetrics>(common);
-}
-
+ }
+ }
+
+ Initialized_ = true;
+ }
+
+ TOutputChannel GetOutputChannel(ui16 index) const {
+ Y_VERIFY(Initialized_);
+ const auto it = OutputChannels_.find(index);
+ return it != OutputChannels_.end() ? it->second : OtherOutputChannel_;
+ }
+
+ private:
+ const TInterconnectProxyCommon::TPtr Common;
+ const bool MergePerDataCenterMetrics_;
+ const bool MergePerPeerMetrics_;
+ std::shared_ptr<NMonitoring::IMetricRegistry> Metrics_;
+ std::shared_ptr<NMonitoring::IMetricRegistry> PerSessionMetrics_;
+ std::shared_ptr<NMonitoring::IMetricRegistry> PerDataCenterMetrics_;
+ std::shared_ptr<NMonitoring::IMetricRegistry>& AdaptiveMetrics_;
+ bool Initialized_ = false;
+
+ NMonitoring::IRate* Traffic_;
+
+ NMonitoring::IRate* Events_;
+ NMonitoring::IRate* ScopeErrors_;
+ NMonitoring::IRate* Disconnections_;
+ NMonitoring::IIntGauge* Connected_;
+
+ NMonitoring::IRate* SessionDeaths_;
+ NMonitoring::IRate* HandshakeFails_;
+ NMonitoring::IRate* InflyLimitReach_;
+ NMonitoring::IRate* InflightDataAmount_;
+ NMonitoring::IRate* OutputBuffersTotalSize_;
+ NMonitoring::IIntGauge* SubscribersCount_;
+ NMonitoring::IRate* SendSyscalls_;
+ NMonitoring::IRate* RecvSyscalls_;
+ NMonitoring::IRate* SpuriousWriteWakeups_;
+ NMonitoring::IRate* UsefulWriteWakeups_;
+ NMonitoring::IRate* SpuriousReadWakeups_;
+ NMonitoring::IRate* UsefulReadWakeups_;
+ NMonitoring::IIntGauge* ClockSkewMicrosec_;
+
+ NMonitoring::IHistogram* PingTimeHistogram_;
+
+ std::unordered_map<ui16, TOutputChannel> OutputChannels_;
+ TOutputChannel OtherOutputChannel_;
+ TInputChannels InputChannels_;
+
+ THashMap<TString, NMonitoring::IRate*> DisconnectByReason_;
+
+ NMonitoring::IRate* TotalBytesWritten_;
+ NMonitoring::IRate* TotalBytesRead_;
+ };
+
+} // namespace
+
+std::unique_ptr<IInterconnectMetrics> CreateInterconnectCounters(const TInterconnectProxyCommon::TPtr& common) {
+ return std::make_unique<TInterconnectCounters>(common);
+}
+
+std::unique_ptr<IInterconnectMetrics> CreateInterconnectMetrics(const TInterconnectProxyCommon::TPtr& common) {
+ return std::make_unique<TInterconnectMetrics>(common);
+}
+
} // NActors
diff --git a/library/cpp/actors/interconnect/interconnect_counters.h b/library/cpp/actors/interconnect/interconnect_counters.h
index 6f19f343e3..e30f03a0bc 100644
--- a/library/cpp/actors/interconnect/interconnect_counters.h
+++ b/library/cpp/actors/interconnect/interconnect_counters.h
@@ -1,59 +1,59 @@
-#pragma once
-
-#include <library/cpp/actors/helpers/mon_histogram_helper.h>
-
-#include <util/system/valgrind.h>
-
-#include "types.h"
-
-#include "interconnect_common.h"
-
-#include <memory>
-#include <optional>
-
-namespace NActors {
-
-class IInterconnectMetrics {
-public:
- virtual ~IInterconnectMetrics() = default;
-
+#pragma once
+
+#include <library/cpp/actors/helpers/mon_histogram_helper.h>
+
+#include <util/system/valgrind.h>
+
+#include "types.h"
+
+#include "interconnect_common.h"
+
+#include <memory>
+#include <optional>
+
+namespace NActors {
+
+class IInterconnectMetrics {
+public:
+ virtual ~IInterconnectMetrics() = default;
+
virtual void AddInflightDataAmount(ui64 value) = 0;
virtual void SubInflightDataAmount(ui64 value) = 0;
virtual void AddTotalBytesWritten(ui64 value) = 0;
virtual void SetClockSkewMicrosec(i64 value) = 0;
- virtual void IncSessionDeaths() = 0;
- virtual void IncHandshakeFails() = 0;
- virtual void SetConnected(ui32 value) = 0;
- virtual void IncSubscribersCount() = 0;
- virtual void SubSubscribersCount(ui32 value) = 0;
+ virtual void IncSessionDeaths() = 0;
+ virtual void IncHandshakeFails() = 0;
+ virtual void SetConnected(ui32 value) = 0;
+ virtual void IncSubscribersCount() = 0;
+ virtual void SubSubscribersCount(ui32 value) = 0;
virtual void SubOutputBuffersTotalSize(ui64 value) = 0;
virtual void AddOutputBuffersTotalSize(ui64 value) = 0;
virtual ui64 GetOutputBuffersTotalSize() const = 0;
- virtual void IncDisconnections() = 0;
- virtual void IncUsefulWriteWakeups() = 0;
- virtual void IncSpuriousWriteWakeups() = 0;
- virtual void IncSendSyscalls() = 0;
- virtual void IncInflyLimitReach() = 0;
- virtual void IncDisconnectByReason(const TString& s) = 0;
- virtual void IncUsefulReadWakeups() = 0;
- virtual void IncSpuriousReadWakeups() = 0;
- virtual void SetPeerInfo(const TString& name, const TString& dataCenterId) = 0;
+ virtual void IncDisconnections() = 0;
+ virtual void IncUsefulWriteWakeups() = 0;
+ virtual void IncSpuriousWriteWakeups() = 0;
+ virtual void IncSendSyscalls() = 0;
+ virtual void IncInflyLimitReach() = 0;
+ virtual void IncDisconnectByReason(const TString& s) = 0;
+ virtual void IncUsefulReadWakeups() = 0;
+ virtual void IncSpuriousReadWakeups() = 0;
+ virtual void SetPeerInfo(const TString& name, const TString& dataCenterId) = 0;
virtual void AddInputChannelsIncomingTraffic(ui16 channel, ui64 incomingTraffic) = 0;
- virtual void IncInputChannelsIncomingEvents(ui16 channel) = 0;
- virtual void IncRecvSyscalls() = 0;
+ virtual void IncInputChannelsIncomingEvents(ui16 channel) = 0;
+ virtual void IncRecvSyscalls() = 0;
virtual void AddTotalBytesRead(ui64 value) = 0;
- virtual void UpdateLegacyPingTimeHist(ui64 value) = 0;
+ virtual void UpdateLegacyPingTimeHist(ui64 value) = 0;
virtual void UpdateOutputChannelTraffic(ui16 channel, ui64 value) = 0;
- virtual void UpdateOutputChannelEvents(ui16 channel) = 0;
- TString GetHumanFriendlyPeerHostName() const {
- return HumanFriendlyPeerHostName.value_or(TString());
- }
-
-protected:
- std::optional<TString> DataCenterId;
- std::optional<TString> HumanFriendlyPeerHostName;
-};
-
-std::unique_ptr<IInterconnectMetrics> CreateInterconnectCounters(const NActors::TInterconnectProxyCommon::TPtr& common);
-std::unique_ptr<IInterconnectMetrics> CreateInterconnectMetrics(const NActors::TInterconnectProxyCommon::TPtr& common);
-} // NActors
+ virtual void UpdateOutputChannelEvents(ui16 channel) = 0;
+ TString GetHumanFriendlyPeerHostName() const {
+ return HumanFriendlyPeerHostName.value_or(TString());
+ }
+
+protected:
+ std::optional<TString> DataCenterId;
+ std::optional<TString> HumanFriendlyPeerHostName;
+};
+
+std::unique_ptr<IInterconnectMetrics> CreateInterconnectCounters(const NActors::TInterconnectProxyCommon::TPtr& common);
+std::unique_ptr<IInterconnectMetrics> CreateInterconnectMetrics(const NActors::TInterconnectProxyCommon::TPtr& common);
+} // NActors
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index c97969815c..0abe9fe659 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -8,7 +8,7 @@ namespace NActors {
TInputSessionTCP::TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket,
TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common,
- std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, ui64 lastConfirmed,
+ std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, ui64 lastConfirmed,
TDuration deadPeerTimeout, TSessionParams params)
: SessionId(sessionId)
, Socket(std::move(socket))
@@ -17,7 +17,7 @@ namespace NActors {
, NodeId(nodeId)
, Params(std::move(params))
, ConfirmedByInput(lastConfirmed)
- , Metrics(std::move(metrics))
+ , Metrics(std::move(metrics))
, DeadPeerTimeout(deadPeerTimeout)
{
Y_VERIFY(Context);
@@ -26,7 +26,7 @@ namespace NActors {
AtomicSet(Context->PacketsReadFromSocket, 0);
- Metrics->SetClockSkewMicrosec(0);
+ Metrics->SetClockSkewMicrosec(0);
Context->UpdateState = EUpdateState::NONE;
@@ -50,9 +50,9 @@ namespace NActors {
void TInputSessionTCP::Handle(TEvPollerReady::TPtr ev) {
if (Context->ReadPending) {
- Metrics->IncUsefulReadWakeups();
+ Metrics->IncUsefulReadWakeups();
} else if (!ev->Cookie) {
- Metrics->IncSpuriousReadWakeups();
+ Metrics->IncSpuriousReadWakeups();
}
Context->ReadPending = false;
ReceiveData();
@@ -197,7 +197,7 @@ namespace NActors {
ui64 sendTime = AtomicGet(Context->ControlPacketSendTimer);
TDuration duration = CyclesToDuration(GetCycleCountFast() - sendTime);
const auto durationUs = duration.MicroSeconds();
- Metrics->UpdateLegacyPingTimeHist(durationUs);
+ Metrics->UpdateLegacyPingTimeHist(durationUs);
PingQ.push_back(duration);
if (PingQ.size() > 16) {
PingQ.pop_front();
@@ -268,7 +268,7 @@ namespace NActors {
? &Context->ChannelArray[channel]
: &Context->ChannelMap[channel];
- Metrics->AddInputChannelsIncomingTraffic(channel, sizeof(part) + part.Size);
+ Metrics->AddInputChannelsIncomingTraffic(channel, sizeof(part) + part.Size);
TEventDescr descr;
if (~part.Channel & TChannelPart::LastPartFlag) {
@@ -277,7 +277,7 @@ namespace NActors {
LOG_CRIT_IC_SESSION("ICIS11", "incorrect last part of an event");
return DestroySession(TDisconnectReason::FormatError());
} else if (Payload.ExtractFrontPlain(&descr, sizeof(descr))) {
- Metrics->IncInputChannelsIncomingEvents(channel);
+ Metrics->IncInputChannelsIncomingEvents(channel);
ProcessEvent(*eventData, descr);
*eventData = TRope();
} else {
@@ -359,7 +359,7 @@ namespace NActors {
#else
recvres = Socket->Recv(iovec[0].iov_base, iovec[0].iov_len, &err);
#endif
- Metrics->IncRecvSyscalls();
+ Metrics->IncRecvSyscalls();
} while (recvres == -EINTR);
}
@@ -388,7 +388,7 @@ namespace NActors {
}
Y_VERIFY(recvres > 0);
- Metrics->AddTotalBytesRead(recvres);
+ Metrics->AddTotalBytesRead(recvres);
TDeque<TIntrusivePtr<TRopeAlignedBuffer>>::iterator it;
for (it = Buffers.begin(); recvres; ++it) {
Y_VERIFY(it != Buffers.end());
@@ -451,7 +451,7 @@ namespace NActors {
const auto pingUs = ping.MicroSeconds();
Context->PingRTT_us = pingUs;
NewPingProtocol = true;
- Metrics->UpdateLegacyPingTimeHist(pingUs);
+ Metrics->UpdateLegacyPingTimeHist(pingUs);
}
void TInputSessionTCP::HandleClock(TInstant clock) {
@@ -469,7 +469,7 @@ namespace NActors {
}
}
Context->ClockSkew_us = clockSkew;
- Metrics->SetClockSkewMicrosec(clockSkew);
+ Metrics->SetClockSkewMicrosec(clockSkew);
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
index ea3777b1a1..7e2d8ccb94 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
@@ -108,10 +108,10 @@ namespace NActors {
auto& info = *ev->Get()->Node;
TString name = PeerNameForHuman(PeerNodeId, info.Host, info.Port);
TechnicalPeerHostName = info.Host;
- if (!Metrics) {
- Metrics = Common->Metrics ? CreateInterconnectMetrics(Common) : CreateInterconnectCounters(Common);
+ if (!Metrics) {
+ Metrics = Common->Metrics ? CreateInterconnectMetrics(Common) : CreateInterconnectCounters(Common);
}
- Metrics->SetPeerInfo(name, info.Location.GetDataCenterId());
+ Metrics->SetPeerInfo(name, info.Location.GetDataCenterId());
LOG_DEBUG_IC("ICP02", "configured for host %s", name.data());
@@ -416,8 +416,8 @@ namespace NActors {
return;
}
- if (Metrics) {
- Metrics->IncHandshakeFails();
+ if (Metrics) {
+ Metrics->IncHandshakeFails();
}
if (IncomingHandshakeActor || OutgoingHandshakeActor) {
@@ -545,8 +545,8 @@ namespace NActors {
SessionVirtualId = TActorId();
RemoteSessionVirtualId = TActorId();
- if (Metrics) {
- Metrics->IncSessionDeaths();
+ if (Metrics) {
+ Metrics->IncSessionDeaths();
}
LastSessionDieTime = TActivationContext::Now();
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
index 2ef30a20e8..023e5bd1ee 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
@@ -372,7 +372,7 @@ namespace NActors {
TString TechnicalPeerHostName;
- std::shared_ptr<IInterconnectMetrics> Metrics;
+ std::shared_ptr<IInterconnectMetrics> Metrics;
void HandleClosePeerSocket();
void HandleCloseInputSession();
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 74e0be4b3e..2ded7f9f53 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -40,7 +40,7 @@ namespace NActors {
, OutputQueueUtilization(16)
, OutputCounter(0ULL)
{
- Proxy->Metrics->SetConnected(0);
+ Proxy->Metrics->SetConnected(0);
ReceiveContext.Reset(new TReceiveContext);
}
@@ -56,7 +56,7 @@ namespace NActors {
as->Send(id, event.Release());
};
Pool.ConstructInPlace(Proxy->Common, std::move(destroyCallback));
- ChannelScheduler.ConstructInPlace(Proxy->PeerNodeId, Proxy->Common->ChannelsConfig, Proxy->Metrics, *Pool,
+ ChannelScheduler.ConstructInPlace(Proxy->PeerNodeId, Proxy->Common->ChannelsConfig, Proxy->Metrics, *Pool,
Proxy->Common->Settings.MaxSerializedEventSize, Params);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId);
@@ -85,7 +85,7 @@ namespace NActors {
for (const auto& kv : Subscribers) {
Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second);
}
- Proxy->Metrics->SubSubscribersCount(Subscribers.size());
+ Proxy->Metrics->SubSubscribersCount(Subscribers.size());
Subscribers.clear();
ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
@@ -98,13 +98,13 @@ namespace NActors {
SendUpdateToWhiteboard(false);
- Proxy->Metrics->SubOutputBuffersTotalSize(TotalOutputQueueSize);
- Proxy->Metrics->SubInflightDataAmount(InflightDataAmount);
+ Proxy->Metrics->SubOutputBuffersTotalSize(TotalOutputQueueSize);
+ Proxy->Metrics->SubInflightDataAmount(InflightDataAmount);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session destroyed", Proxy->PeerNodeId);
if (!Subscribers.empty()) {
- Proxy->Metrics->SubSubscribersCount(Subscribers.size());
+ Proxy->Metrics->SubSubscribersCount(Subscribers.size());
}
TActor::PassAway();
@@ -132,7 +132,7 @@ namespace NActors {
LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);
TotalOutputQueueSize += dataSize;
- Proxy->Metrics->AddOutputBuffersTotalSize(dataSize);
+ Proxy->Metrics->AddOutputBuffersTotalSize(dataSize);
if (!wasWorking) {
// this channel has returned to work -- it was empty and this we have just put first event in the queue
ChannelScheduler->AddToHeap(oChannel, EqualizeCounter);
@@ -155,7 +155,7 @@ namespace NActors {
}
ui64 outputBuffersTotalSizeLimit = Proxy->Common->Settings.OutputBuffersTotalSizeLimitInMB * ui64(1 << 20);
- if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) {
+ if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) {
LOG_ERROR_IC_SESSION("ICS77", "Exceeded total limit on output buffers size");
if (AtomicTryLock(&Proxy->Common->StartedSessionKiller)) {
CreateSessionKillingActor(Proxy->Common);
@@ -188,7 +188,7 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data());
const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie);
if (inserted) {
- Proxy->Metrics->IncSubscribersCount();
+ Proxy->Metrics->IncSubscribersCount();
} else {
it->second = ev->Cookie;
}
@@ -197,7 +197,7 @@ namespace NActors {
void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) {
LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data());
- Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender));
+ Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender));
}
THolder<TEvHandshakeAck> TInterconnectSessionTCP::ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev) {
@@ -243,7 +243,7 @@ namespace NActors {
// create input session actor
auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common,
- Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
+ Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
ReceiveContext->UnlockLastProcessedPacketSerial();
ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled);
@@ -254,7 +254,7 @@ namespace NActors {
ReceiveContext->WriteBlockedByFullSendBuffer = false;
LostConnectionWatchdog.Disarm();
- Proxy->Metrics->SetConnected(1);
+ Proxy->Metrics->SetConnected(1);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] connected", Proxy->PeerNodeId);
// arm pinger timer
@@ -264,7 +264,7 @@ namespace NActors {
// REINITIALIZE SEND QUEUE
//
// scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other
- // auxiliary packets; also reset packet metrics to zero to start sending from the beginning
+ // auxiliary packets; also reset packet metrics to zero to start sending from the beginning
// also reset SendQueuePos
// drop confirmed packets first as we do not need unwanted retransmissions
@@ -489,17 +489,17 @@ namespace NActors {
void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) {
if (Socket) {
if (const TString& s = reason.ToString()) {
- Proxy->Metrics->IncDisconnectByReason(s);
+ Proxy->Metrics->IncDisconnectByReason(s);
}
LOG_INFO_IC_SESSION("ICS25", "shutdown socket, reason# %s", reason.ToString().data());
Proxy->UpdateErrorStateLog(TActivationContext::Now(), "close_socket", reason.ToString().data());
Socket->Shutdown(SHUT_RDWR);
Socket.Reset();
- Proxy->Metrics->IncDisconnections();
+ Proxy->Metrics->IncDisconnections();
CloseOnIdleWatchdog.Disarm();
LostConnectionWatchdog.Arm(SelfId());
- Proxy->Metrics->SetConnected(0);
+ Proxy->Metrics->SetConnected(0);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId);
}
}
@@ -519,14 +519,14 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICS29", "HandleReadyWrite WriteBlockedByFullSendBuffer# %s",
ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false");
if (std::exchange(ReceiveContext->WriteBlockedByFullSendBuffer, false)) {
- Proxy->Metrics->IncUsefulWriteWakeups();
+ Proxy->Metrics->IncUsefulWriteWakeups();
ui64 nowCycles = GetCycleCountFast();
double blockedUs = NHPTimer::GetSeconds(nowCycles - WriteBlockedCycles) * 1000000.0;
LWPROBE(ReadyWrite, Proxy->PeerNodeId, NHPTimer::GetSeconds(nowCycles - ev->SendTime) * 1000.0, blockedUs / 1000.0);
WriteBlockedTotal += TDuration::MicroSeconds(blockedUs);
GenerateTraffic();
} else if (!ev->Cookie) {
- Proxy->Metrics->IncSpuriousWriteWakeups();
+ Proxy->Metrics->IncSpuriousWriteWakeups();
}
if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) {
Send(ReceiverId, ev->Release().Release(), 0, 1);
@@ -591,7 +591,7 @@ namespace NActors {
#else
r = Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err);
#endif
- Proxy->Metrics->IncSendSyscalls();
+ Proxy->Metrics->IncSendSyscalls();
} while (r == -EINTR);
LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data());
@@ -621,7 +621,7 @@ namespace NActors {
: Sprintf("socket: %s", strerror(-r));
LOG_NOTICE_NET(Proxy->PeerNodeId, "%s", message.data());
if (written) {
- Proxy->Metrics->AddTotalBytesWritten(written);
+ Proxy->Metrics->AddTotalBytesWritten(written);
}
return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r));
} else {
@@ -651,7 +651,7 @@ namespace NActors {
}
}
if (written) {
- Proxy->Metrics->AddTotalBytesWritten(written);
+ Proxy->Metrics->AddTotalBytesWritten(written);
}
}
@@ -753,9 +753,9 @@ namespace NActors {
Y_VERIFY(!packet->IsEmpty());
InflightDataAmount += packet->GetDataSize();
- Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize());
+ Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize());
if (InflightDataAmount > GetTotalInflightAmountOfData()) {
- Proxy->Metrics->IncInflyLimitReach();
+ Proxy->Metrics->IncInflyLimitReach();
}
if (AtomicGet(ReceiveContext->ControlPacketId) == 0) {
@@ -842,7 +842,7 @@ namespace NActors {
PacketsConfirmed += numDropped;
InflightDataAmount -= droppedDataAmount;
- Proxy->Metrics->SubInflightDataAmount(droppedDataAmount);
+ Proxy->Metrics->SubInflightDataAmount(droppedDataAmount);
LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount);
LOG_DEBUG_IC_SESSION("ICS24", "exit InflightDataAmount: %" PRIu64 " bytes droppedDataAmount: %" PRIu64 " bytes"
@@ -870,9 +870,9 @@ namespace NActors {
Y_VERIFY_DEBUG(netAfter <= netBefore); // net amount should shrink
const ui64 net = netBefore - netAfter; // number of net bytes serialized
- // adjust metrics for local and global queue size
+ // adjust metrics for local and global queue size
TotalOutputQueueSize -= net;
- Proxy->Metrics->SubOutputBuffersTotalSize(net);
+ Proxy->Metrics->SubOutputBuffersTotalSize(net);
bytesGenerated += gross;
Y_VERIFY_DEBUG(!!net == !!gross && gross >= net, "net# %" PRIu64 " gross# %" PRIu64, net, gross);
@@ -944,7 +944,7 @@ namespace NActors {
} while (false);
}
- callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(),
+ callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(),
connected,
flagState == EFlag::GREEN,
flagState == EFlag::YELLOW,
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 7ad7fb84d2..7fc00dbcc5 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -183,7 +183,7 @@ namespace NActors {
TIntrusivePtr<NInterconnect::TStreamSocket> socket,
TIntrusivePtr<TReceiveContext> context,
TInterconnectProxyCommon::TPtr common,
- std::shared_ptr<IInterconnectMetrics> metrics,
+ std::shared_ptr<IInterconnectMetrics> metrics,
ui32 nodeId,
ui64 lastConfirmed,
TDuration deadPeerTimeout,
@@ -237,7 +237,7 @@ namespace NActors {
ui64 ConfirmedByInput;
- std::shared_ptr<IInterconnectMetrics> Metrics;
+ std::shared_ptr<IInterconnectMetrics> Metrics;
bool CloseInputSessionRequested = false;
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 5e9a12a56a..565a511859 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -9,12 +9,12 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
Y_UNIT_TEST(PriorityTraffic) {
auto common = MakeIntrusive<TInterconnectProxyCommon>();
common->MonCounters = MakeIntrusive<NMonitoring::TDynamicCounters>();
- std::shared_ptr<IInterconnectMetrics> ctr = CreateInterconnectCounters(common);
- ctr->SetPeerInfo("peer", "1");
+ std::shared_ptr<IInterconnectMetrics> ctr = CreateInterconnectCounters(common);
+ ctr->SetPeerInfo("peer", "1");
auto callback = [](THolder<IEventBase>) {};
TEventHolderPool pool(common, callback);
TSessionParams p;
- TChannelScheduler scheduler(1, {}, ctr, pool, 64 << 20, p);
+ TChannelScheduler scheduler(1, {}, ctr, pool, 64 << 20, p);
ui32 numEvents = 0;
diff --git a/library/cpp/actors/interconnect/ya.make b/library/cpp/actors/interconnect/ya.make
index 47fd731e01..60d29b0fc0 100644
--- a/library/cpp/actors/interconnect/ya.make
+++ b/library/cpp/actors/interconnect/ya.make
@@ -22,7 +22,7 @@ SRCS(
interconnect_channel.cpp
interconnect_channel.h
interconnect_common.h
- interconnect_counters.cpp
+ interconnect_counters.cpp
interconnect.h
interconnect_handshake.cpp
interconnect_handshake.h
@@ -85,7 +85,7 @@ PEERDIR(
library/cpp/json
library/cpp/lwtrace
library/cpp/monlib/dynamic_counters
- library/cpp/monlib/metrics
+ library/cpp/monlib/metrics
library/cpp/monlib/service/pages/tablesorter
library/cpp/openssl/init
library/cpp/packedtypes