aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Efimov <xeno@prnwatch.com>2022-03-24 12:43:54 +0300
committerAlexey Efimov <xeno@prnwatch.com>2022-03-24 12:43:54 +0300
commit43a2b5dd73d8a7aa80d4c1ba4ee70428fffbd17e (patch)
treeabebfb404c82a24c52c2372f497eadefd899dfb3
parent057f22cbb5bfa82df4bfde220d5aaaa08f34257b (diff)
downloadydb-43a2b5dd73d8a7aa80d4c1ba4ee70428fffbd17e.tar.gz
switch metrics registry to shared_ptr KIKIMR-14218
ref:5ef16c21e373b05982980dcd3016950f15a884e8
-rw-r--r--library/cpp/actors/core/process_stats.cpp51
-rw-r--r--library/cpp/actors/core/process_stats.h1
-rw-r--r--library/cpp/actors/http/http_proxy.cpp62
-rw-r--r--library/cpp/actors/http/http_proxy.h2
-rw-r--r--library/cpp/actors/http/http_ut.cpp9
-rw-r--r--library/cpp/monlib/metrics/metric_registry.cpp6
-rw-r--r--library/cpp/monlib/metrics/metric_registry.h1
-rw-r--r--ydb/core/cms/console/net_classifier_updater.cpp3
-rw-r--r--ydb/core/cms/console/net_classifier_updater_ut.cpp2
-rw-r--r--ydb/core/mind/address_classification/net_classifier_ut.cpp2
-rw-r--r--ydb/core/yq/libs/init/init.cpp2
-rw-r--r--ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp2
-rw-r--r--ydb/services/persqueue_cluster_discovery/cluster_discovery_service_ut.cpp3
13 files changed, 104 insertions, 42 deletions
diff --git a/library/cpp/actors/core/process_stats.cpp b/library/cpp/actors/core/process_stats.cpp
index ef793760b5..0fdad83da5 100644
--- a/library/cpp/actors/core/process_stats.cpp
+++ b/library/cpp/actors/core/process_stats.cpp
@@ -231,7 +231,6 @@ namespace {
NMonitoring::TDynamicCounters::TCounterPtr SystemUptimeSeconds;
};
-
class TRegistryCollector: public TProcStatCollectingActor<TRegistryCollector> {
using TBase = TProcStatCollectingActor<TRegistryCollector>;
public:
@@ -290,6 +289,52 @@ namespace {
NMonitoring::TIntGauge* NumThreads;
NMonitoring::TIntGauge* SystemUptimeSeconds;
};
+
+ class TRegistryCollectorShared: public TProcStatCollectingActor<TRegistryCollectorShared> {
+ using TBase = TProcStatCollectingActor<TRegistryCollectorShared>;
+ public:
+ TRegistryCollectorShared(TDuration interval, std::weak_ptr<NMonitoring::TMetricRegistry> registry)
+ : TBase{interval}
+ , Registry(std::move(registry))
+ {
+ }
+
+ void UpdateCounters(const TProcStat& procStat) {
+ std::shared_ptr<NMonitoring::TMetricRegistry> registry = Registry.lock();
+ if (registry) {
+ registry->IntGauge({{"sensor", "process.VmSize"}})->Set(procStat.Vsize);
+ registry->IntGauge({{"sensor", "process.AnonRssSize"}})->Set(procStat.AnonRss);
+ registry->IntGauge({{"sensor", "process.FileRssSize"}})->Set(procStat.FileRss);
+ registry->IntGauge({{"sensor", "process.CGroupMemLimit"}})->Set(procStat.CGroupMemLim);
+ registry->IntGauge({{"sensor", "process.UptimeSeconds"}})->Set(procStat.Uptime.Seconds());
+ registry->IntGauge({{"sensor", "process.NumThreads"}})->Set(procStat.NumThreads);
+ registry->IntGauge({{"sensor", "system.UptimeSeconds"}})->Set(procStat.SystemUptime.Seconds());
+
+ // it is ok here to reset and add metric value, because mutation
+ // is performed in siglethreaded context
+
+ NMonitoring::TRate* userTime = registry->Rate({{"sensor", "process.UserTime"}});
+ NMonitoring::TRate* sysTime = registry->Rate({{"sensor", "process.SystemTime"}});
+ NMonitoring::TRate* minorPageFaults = registry->Rate({{"sensor", "process.MinorPageFaults"}});
+ NMonitoring::TRate* majorPageFaults = registry->Rate({{"sensor", "process.MajorPageFaults"}});
+
+ userTime->Reset();
+ userTime->Add(procStat.Utime);
+
+ sysTime->Reset();
+ sysTime->Add(procStat.Stime);
+
+ minorPageFaults->Reset();
+ minorPageFaults->Add(procStat.MinFlt);
+
+ majorPageFaults->Reset();
+ majorPageFaults->Add(procStat.MajFlt);
+ }
+ }
+
+ private:
+ std::weak_ptr<NMonitoring::TMetricRegistry> Registry;
+ };
} // namespace
IActor* CreateProcStatCollector(ui32 intervalSec, NMonitoring::TDynamicCounterPtr counters) {
@@ -299,4 +344,8 @@ namespace {
IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry) {
return new TRegistryCollector(interval, registry);
}
+
+ IActor* CreateProcStatCollector(TDuration interval, std::weak_ptr<NMonitoring::TMetricRegistry> registry) {
+ return new TRegistryCollectorShared(interval, std::move(registry));
+ }
}
diff --git a/library/cpp/actors/core/process_stats.h b/library/cpp/actors/core/process_stats.h
index 66346d0b5a..5681f0eb1a 100644
--- a/library/cpp/actors/core/process_stats.h
+++ b/library/cpp/actors/core/process_stats.h
@@ -63,4 +63,5 @@ namespace NActors {
IActor* CreateProcStatCollector(ui32 intervalSec, NMonitoring::TDynamicCounterPtr counters);
IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry);
+ IActor* CreateProcStatCollector(TDuration interval, std::weak_ptr<NMonitoring::TMetricRegistry> registry);
}
diff --git a/library/cpp/actors/http/http_proxy.cpp b/library/cpp/actors/http/http_proxy.cpp
index 36c6855d93..e0204f6ed0 100644
--- a/library/cpp/actors/http/http_proxy.cpp
+++ b/library/cpp/actors/http/http_proxy.cpp
@@ -26,8 +26,8 @@ public:
Become(&THttpProxy::StateWork);
}
- THttpProxy(NMonitoring::TMetricRegistry& sensors)
- : Sensors(sensors)
+ THttpProxy(std::weak_ptr<NMonitoring::TMetricRegistry> registry)
+ : Registry(std::move(registry))
{}
protected:
@@ -175,29 +175,35 @@ protected:
const static TString urlNotFound = "not-found";
const TString& url = (sensors.Status == "404" ? urlNotFound : sensors.Url);
- Sensors.Rate({
- {"sensor", "count"},
- {"direction", sensors.Direction},
- {"peer", sensors.Host},
- {"url", url},
- {"status", sensors.Status}
- })->Inc();
- Sensors.HistogramRate({
- {"sensor", "time_us"},
- {"direction", sensors.Direction},
- {"peer", sensors.Host},
- {"url", url},
- {"status", sensors.Status}
- },
- NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MicroSeconds());
- Sensors.HistogramRate({
- {"sensor", "time_ms"},
- {"direction", sensors.Direction},
- {"peer", sensors.Host},
- {"url", url},
- {"status", sensors.Status}
- },
- NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MilliSeconds());
+ std::shared_ptr<NMonitoring::TMetricRegistry> registry = Registry.lock();
+ if (registry) {
+ registry->Rate(
+ {
+ {"sensor", "count"},
+ {"direction", sensors.Direction},
+ {"peer", sensors.Host},
+ {"url", url},
+ {"status", sensors.Status}
+ })->Inc();
+ registry->HistogramRate(
+ {
+ {"sensor", "time_us"},
+ {"direction", sensors.Direction},
+ {"peer", sensors.Host},
+ {"url", url},
+ {"status", sensors.Status}
+ },
+ NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MicroSeconds());
+ registry->HistogramRate(
+ {
+ {"sensor", "time_ms"},
+ {"direction", sensors.Direction},
+ {"peer", sensors.Host},
+ {"url", url},
+ {"status", sensors.Status}
+ },
+ NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MilliSeconds());
+ }
}
void Handle(NActors::TEvents::TEvPoison::TPtr, const NActors::TActorContext&) {
@@ -217,7 +223,7 @@ protected:
THashMap<TString, THostEntry> Hosts;
THashMap<TString, TActorId> Handlers;
THashSet<TActorId> Connections; // outgoing
- NMonitoring::TMetricRegistry& Sensors;
+ std::weak_ptr<NMonitoring::TMetricRegistry> Registry;
};
TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response) {
@@ -240,8 +246,8 @@ TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingR
);
}
-NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors) {
- return new THttpProxy(sensors);
+NActors::IActor* CreateHttpProxy(std::weak_ptr<NMonitoring::TMetricRegistry> registry) {
+ return new THttpProxy(std::move(registry));
}
bool IsIPv6(const TString& host) {
diff --git a/library/cpp/actors/http/http_proxy.h b/library/cpp/actors/http/http_proxy.h
index afd0170997..92002b047d 100644
--- a/library/cpp/actors/http/http_proxy.h
+++ b/library/cpp/actors/http/http_proxy.h
@@ -225,7 +225,7 @@ struct TEndpointInfo {
TSslHelpers::TSslHolder<SSL_CTX> SecureContext;
};
-NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors);
+NActors::IActor* CreateHttpProxy(std::weak_ptr<NMonitoring::TMetricRegistry> registry = NMonitoring::TMetricRegistry::SharedInstance());
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller);
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller);
NActors::IActor* CreateIncomingConnectionActor(
diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp
index 4c922f8d0f..caa5b3e183 100644
--- a/library/cpp/actors/http/http_ut.cpp
+++ b/library/cpp/actors/http/http_ut.cpp
@@ -180,9 +180,8 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
TIpPort port = portManager.GetTcpPort();
TAutoPtr<NActors::IEventHandle> handle;
actorSystem.Initialize();
- NMonitoring::TMetricRegistry sensors;
- NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors);
+ NActors::IActor* proxy = NHttp::CreateHttpProxy();
NActors::TActorId proxyId = actorSystem.Register(proxy);
actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
actorSystem.DispatchEvents();
@@ -213,7 +212,6 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
TIpPort port = portManager.GetTcpPort();
TAutoPtr<NActors::IEventHandle> handle;
actorSystem.Initialize();
- NMonitoring::TMetricRegistry sensors;
TString certificateContent = R"___(-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCzRZjodO7Aqe1w
@@ -272,7 +270,7 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V
certificateFile.Write(certificateContent.data(), certificateContent.size());
- NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors);
+ NActors::IActor* proxy = NHttp::CreateHttpProxy();
NActors::TActorId proxyId = actorSystem.Register(proxy);
THolder<NHttp::TEvHttpProxy::TEvAddListeningPort> add = MakeHolder<NHttp::TEvHttpProxy::TEvAddListeningPort>(port);
@@ -332,9 +330,8 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V
TIpPort port = portManager.GetTcpPort();
TAutoPtr<NActors::IEventHandle> handle;
actorSystem.Initialize();
- NMonitoring::TMetricRegistry sensors;
- NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors);
+ NActors::IActor* proxy = NHttp::CreateHttpProxy();
NActors::TActorId proxyId = actorSystem.Register(proxy);
actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
actorSystem.DispatchEvents();
diff --git a/library/cpp/monlib/metrics/metric_registry.cpp b/library/cpp/monlib/metrics/metric_registry.cpp
index 74260e3069..3d7ffd40cf 100644
--- a/library/cpp/monlib/metrics/metric_registry.cpp
+++ b/library/cpp/monlib/metrics/metric_registry.cpp
@@ -43,9 +43,15 @@ namespace NMonitoring {
TMetricRegistry& TMetricRegistry::operator=(TMetricRegistry&& other) = default;
TMetricRegistry* TMetricRegistry::Instance() {
+ //return SharedInstance().get();
return Singleton<TMetricRegistry>();
}
+ std::shared_ptr<TMetricRegistry> TMetricRegistry::SharedInstance() {
+ static auto instance(std::make_shared<TMetricRegistry>());
+ return instance;
+ }
+
TGauge* TMetricRegistry::Gauge(TLabels labels) {
return Metric<TGauge, EMetricType::GAUGE>(std::move(labels));
}
diff --git a/library/cpp/monlib/metrics/metric_registry.h b/library/cpp/monlib/metrics/metric_registry.h
index 06e61d9991..faba19e845 100644
--- a/library/cpp/monlib/metrics/metric_registry.h
+++ b/library/cpp/monlib/metrics/metric_registry.h
@@ -67,6 +67,7 @@ namespace NMonitoring {
* Get a global metrics registry instance.
*/
static TMetricRegistry* Instance();
+ static std::shared_ptr<TMetricRegistry> SharedInstance();
TGauge* Gauge(TLabels labels);
TLazyGauge* LazyGauge(TLabels labels, std::function<double()> supplier);
diff --git a/ydb/core/cms/console/net_classifier_updater.cpp b/ydb/core/cms/console/net_classifier_updater.cpp
index 121a65efc3..d863c748a0 100644
--- a/ydb/core/cms/console/net_classifier_updater.cpp
+++ b/ydb/core/cms/console/net_classifier_updater.cpp
@@ -67,6 +67,7 @@ private:
public:
NetClassifierUpdater(TActorId localConsole)
: LocalConsole(localConsole)
+ , HttpSensors(std::make_shared<NMonitoring::TMetricRegistry>())
{
}
@@ -328,7 +329,7 @@ private:
private:
TActorId LocalConsole;
TActorId HttpProxyId;
- NMonitoring::TMetricRegistry HttpSensors;
+ std::shared_ptr<NMonitoring::TMetricRegistry> HttpSensors;
TString PackedNetData;
TString LastUpdateDatetimeUTC;
diff --git a/ydb/core/cms/console/net_classifier_updater_ut.cpp b/ydb/core/cms/console/net_classifier_updater_ut.cpp
index da41ee51f2..b340b1c317 100644
--- a/ydb/core/cms/console/net_classifier_updater_ut.cpp
+++ b/ydb/core/cms/console/net_classifier_updater_ut.cpp
@@ -133,7 +133,7 @@ Y_UNIT_TEST_SUITE(TNetClassifierUpdaterTest) {
TNetClassifierUpdaterConfig::EFormat format = TNetClassifierUpdaterConfig::TSV,
const TVector<TString>& netBoxTags = {}
) {
- NMonitoring::TMetricRegistry sensors;
+ auto sensors(std::make_shared<NMonitoring::TMetricRegistry>());
TPortManager pm;
const ui16 port = pm.GetPort(2134);
diff --git a/ydb/core/mind/address_classification/net_classifier_ut.cpp b/ydb/core/mind/address_classification/net_classifier_ut.cpp
index 088d25b9c7..67262bc42f 100644
--- a/ydb/core/mind/address_classification/net_classifier_ut.cpp
+++ b/ydb/core/mind/address_classification/net_classifier_ut.cpp
@@ -144,7 +144,7 @@ Y_UNIT_TEST_SUITE(TNetClassifierTest) {
}
Y_UNIT_TEST(TestInitFromRemoteSource) {
- NMonitoring::TMetricRegistry sensors;
+ auto sensors = std::make_shared<NMonitoring::TMetricRegistry>();
TPortManager pm;
const ui16 port = pm.GetPort(2134);
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index ae8c7bf7d2..5e82b87fe8 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -186,7 +186,7 @@ void Init(
actorRegistrator(MakeYqlNodesManagerId(), nodesManager);
}
- auto httpProxy = NHttp::CreateHttpProxy(*NMonitoring::TMetricRegistry::Instance());
+ auto httpProxy = NHttp::CreateHttpProxy(NMonitoring::TMetricRegistry::SharedInstance());
actorRegistrator(MakeYqlAnalyticsHttpProxyId(), httpProxy);
if (protoConfig.GetPendingFetcher().GetEnabled()) {
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
index fc07635176..826c7d0572 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
@@ -334,7 +334,7 @@ private:
if (std::holds_alternative<TMetricsToSend>(variant)) {
if (Y_UNLIKELY(!HttpProxyId)) {
- HttpProxyId = Register(NHttp::CreateHttpProxy(*NMonitoring::TMetricRegistry::Instance()));
+ HttpProxyId = Register(NHttp::CreateHttpProxy(NMonitoring::TMetricRegistry::SharedInstance()));
}
const auto metricsToSend = std::get<TMetricsToSend>(variant);
diff --git a/ydb/services/persqueue_cluster_discovery/cluster_discovery_service_ut.cpp b/ydb/services/persqueue_cluster_discovery/cluster_discovery_service_ut.cpp
index 34721e262d..e731cefc2c 100644
--- a/ydb/services/persqueue_cluster_discovery/cluster_discovery_service_ut.cpp
+++ b/ydb/services/persqueue_cluster_discovery/cluster_discovery_service_ut.cpp
@@ -250,6 +250,7 @@ public:
, BusPort_(PortManager_.GetPort(2134))
, GrpcPort_(PortManager_.GetPort(2135))
, Settings_(NPersQueueTests::PQSettings(BusPort_, 1))
+ , Sensors_(std::make_shared<NMonitoring::TMetricRegistry>())
{
Settings_.PQConfig.SetClustersUpdateTimeoutSec(1);
@@ -379,7 +380,7 @@ private:
ui16 GrpcPort_;
TServerSettings Settings_;
- NMonitoring::TMetricRegistry Sensors_;
+ std::shared_ptr<NMonitoring::TMetricRegistry> Sensors_;
THolder<TServer> Server_;
THolder<NPersQueueTests::TFlatMsgBusPQClient> PQClient_;