diff options
author | Alexey Efimov <xeno@prnwatch.com> | 2022-03-24 12:43:54 +0300 |
---|---|---|
committer | Alexey Efimov <xeno@prnwatch.com> | 2022-03-24 12:43:54 +0300 |
commit | 43a2b5dd73d8a7aa80d4c1ba4ee70428fffbd17e (patch) | |
tree | abebfb404c82a24c52c2372f497eadefd899dfb3 | |
parent | 057f22cbb5bfa82df4bfde220d5aaaa08f34257b (diff) | |
download | ydb-43a2b5dd73d8a7aa80d4c1ba4ee70428fffbd17e.tar.gz |
switch metrics registry to shared_ptr KIKIMR-14218
ref:5ef16c21e373b05982980dcd3016950f15a884e8
-rw-r--r-- | library/cpp/actors/core/process_stats.cpp | 51 | ||||
-rw-r--r-- | library/cpp/actors/core/process_stats.h | 1 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy.cpp | 62 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/http/http_ut.cpp | 9 | ||||
-rw-r--r-- | library/cpp/monlib/metrics/metric_registry.cpp | 6 | ||||
-rw-r--r-- | library/cpp/monlib/metrics/metric_registry.h | 1 | ||||
-rw-r--r-- | ydb/core/cms/console/net_classifier_updater.cpp | 3 | ||||
-rw-r--r-- | ydb/core/cms/console/net_classifier_updater_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/mind/address_classification/net_classifier_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/yq/libs/init/init.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_cluster_discovery/cluster_discovery_service_ut.cpp | 3 |
13 files changed, 104 insertions, 42 deletions
diff --git a/library/cpp/actors/core/process_stats.cpp b/library/cpp/actors/core/process_stats.cpp index ef793760b5..0fdad83da5 100644 --- a/library/cpp/actors/core/process_stats.cpp +++ b/library/cpp/actors/core/process_stats.cpp @@ -231,7 +231,6 @@ namespace { NMonitoring::TDynamicCounters::TCounterPtr SystemUptimeSeconds; }; - class TRegistryCollector: public TProcStatCollectingActor<TRegistryCollector> { using TBase = TProcStatCollectingActor<TRegistryCollector>; public: @@ -290,6 +289,52 @@ namespace { NMonitoring::TIntGauge* NumThreads; NMonitoring::TIntGauge* SystemUptimeSeconds; }; + + class TRegistryCollectorShared: public TProcStatCollectingActor<TRegistryCollectorShared> { + using TBase = TProcStatCollectingActor<TRegistryCollectorShared>; + public: + TRegistryCollectorShared(TDuration interval, std::weak_ptr<NMonitoring::TMetricRegistry> registry) + : TBase{interval} + , Registry(std::move(registry)) + { + } + + void UpdateCounters(const TProcStat& procStat) { + std::shared_ptr<NMonitoring::TMetricRegistry> registry = Registry.lock(); + if (registry) { + registry->IntGauge({{"sensor", "process.VmSize"}})->Set(procStat.Vsize); + registry->IntGauge({{"sensor", "process.AnonRssSize"}})->Set(procStat.AnonRss); + registry->IntGauge({{"sensor", "process.FileRssSize"}})->Set(procStat.FileRss); + registry->IntGauge({{"sensor", "process.CGroupMemLimit"}})->Set(procStat.CGroupMemLim); + registry->IntGauge({{"sensor", "process.UptimeSeconds"}})->Set(procStat.Uptime.Seconds()); + registry->IntGauge({{"sensor", "process.NumThreads"}})->Set(procStat.NumThreads); + registry->IntGauge({{"sensor", "system.UptimeSeconds"}})->Set(procStat.SystemUptime.Seconds()); + + // it is ok here to reset and add metric value, because mutation + // is performed in siglethreaded context + + NMonitoring::TRate* userTime = registry->Rate({{"sensor", "process.UserTime"}}); + NMonitoring::TRate* sysTime = registry->Rate({{"sensor", "process.SystemTime"}}); + NMonitoring::TRate* minorPageFaults = registry->Rate({{"sensor", "process.MinorPageFaults"}}); + NMonitoring::TRate* majorPageFaults = registry->Rate({{"sensor", "process.MajorPageFaults"}}); + + userTime->Reset(); + userTime->Add(procStat.Utime); + + sysTime->Reset(); + sysTime->Add(procStat.Stime); + + minorPageFaults->Reset(); + minorPageFaults->Add(procStat.MinFlt); + + majorPageFaults->Reset(); + majorPageFaults->Add(procStat.MajFlt); + } + } + + private: + std::weak_ptr<NMonitoring::TMetricRegistry> Registry; + }; } // namespace IActor* CreateProcStatCollector(ui32 intervalSec, NMonitoring::TDynamicCounterPtr counters) { @@ -299,4 +344,8 @@ namespace { IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry) { return new TRegistryCollector(interval, registry); } + + IActor* CreateProcStatCollector(TDuration interval, std::weak_ptr<NMonitoring::TMetricRegistry> registry) { + return new TRegistryCollectorShared(interval, std::move(registry)); + } } diff --git a/library/cpp/actors/core/process_stats.h b/library/cpp/actors/core/process_stats.h index 66346d0b5a..5681f0eb1a 100644 --- a/library/cpp/actors/core/process_stats.h +++ b/library/cpp/actors/core/process_stats.h @@ -63,4 +63,5 @@ namespace NActors { IActor* CreateProcStatCollector(ui32 intervalSec, NMonitoring::TDynamicCounterPtr counters); IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry); + IActor* CreateProcStatCollector(TDuration interval, std::weak_ptr<NMonitoring::TMetricRegistry> registry); } diff --git a/library/cpp/actors/http/http_proxy.cpp b/library/cpp/actors/http/http_proxy.cpp index 36c6855d93..e0204f6ed0 100644 --- a/library/cpp/actors/http/http_proxy.cpp +++ b/library/cpp/actors/http/http_proxy.cpp @@ -26,8 +26,8 @@ public: Become(&THttpProxy::StateWork); } - THttpProxy(NMonitoring::TMetricRegistry& sensors) - : Sensors(sensors) + THttpProxy(std::weak_ptr<NMonitoring::TMetricRegistry> registry) + : Registry(std::move(registry)) {} protected: @@ -175,29 +175,35 @@ protected: const static TString urlNotFound = "not-found"; const TString& url = (sensors.Status == "404" ? urlNotFound : sensors.Url); - Sensors.Rate({ - {"sensor", "count"}, - {"direction", sensors.Direction}, - {"peer", sensors.Host}, - {"url", url}, - {"status", sensors.Status} - })->Inc(); - Sensors.HistogramRate({ - {"sensor", "time_us"}, - {"direction", sensors.Direction}, - {"peer", sensors.Host}, - {"url", url}, - {"status", sensors.Status} - }, - NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MicroSeconds()); - Sensors.HistogramRate({ - {"sensor", "time_ms"}, - {"direction", sensors.Direction}, - {"peer", sensors.Host}, - {"url", url}, - {"status", sensors.Status} - }, - NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MilliSeconds()); + std::shared_ptr<NMonitoring::TMetricRegistry> registry = Registry.lock(); + if (registry) { + registry->Rate( + { + {"sensor", "count"}, + {"direction", sensors.Direction}, + {"peer", sensors.Host}, + {"url", url}, + {"status", sensors.Status} + })->Inc(); + registry->HistogramRate( + { + {"sensor", "time_us"}, + {"direction", sensors.Direction}, + {"peer", sensors.Host}, + {"url", url}, + {"status", sensors.Status} + }, + NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MicroSeconds()); + registry->HistogramRate( + { + {"sensor", "time_ms"}, + {"direction", sensors.Direction}, + {"peer", sensors.Host}, + {"url", url}, + {"status", sensors.Status} + }, + NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MilliSeconds()); + } } void Handle(NActors::TEvents::TEvPoison::TPtr, const NActors::TActorContext&) { @@ -217,7 +223,7 @@ protected: THashMap<TString, THostEntry> Hosts; THashMap<TString, TActorId> Handlers; THashSet<TActorId> Connections; // outgoing - NMonitoring::TMetricRegistry& Sensors; + std::weak_ptr<NMonitoring::TMetricRegistry> Registry; }; TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response) { @@ -240,8 +246,8 @@ TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingR ); } -NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors) { - return new THttpProxy(sensors); +NActors::IActor* CreateHttpProxy(std::weak_ptr<NMonitoring::TMetricRegistry> registry) { + return new THttpProxy(std::move(registry)); } bool IsIPv6(const TString& host) { diff --git a/library/cpp/actors/http/http_proxy.h b/library/cpp/actors/http/http_proxy.h index afd0170997..92002b047d 100644 --- a/library/cpp/actors/http/http_proxy.h +++ b/library/cpp/actors/http/http_proxy.h @@ -225,7 +225,7 @@ struct TEndpointInfo { TSslHelpers::TSslHolder<SSL_CTX> SecureContext; }; -NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors); +NActors::IActor* CreateHttpProxy(std::weak_ptr<NMonitoring::TMetricRegistry> registry = NMonitoring::TMetricRegistry::SharedInstance()); NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller); NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller); NActors::IActor* CreateIncomingConnectionActor( diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp index 4c922f8d0f..caa5b3e183 100644 --- a/library/cpp/actors/http/http_ut.cpp +++ b/library/cpp/actors/http/http_ut.cpp @@ -180,9 +180,8 @@ Y_UNIT_TEST_SUITE(HttpProxy) { TIpPort port = portManager.GetTcpPort(); TAutoPtr<NActors::IEventHandle> handle; actorSystem.Initialize(); - NMonitoring::TMetricRegistry sensors; - NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors); + NActors::IActor* proxy = NHttp::CreateHttpProxy(); NActors::TActorId proxyId = actorSystem.Register(proxy); actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true); actorSystem.DispatchEvents(); @@ -213,7 +212,6 @@ Y_UNIT_TEST_SUITE(HttpProxy) { TIpPort port = portManager.GetTcpPort(); TAutoPtr<NActors::IEventHandle> handle; actorSystem.Initialize(); - NMonitoring::TMetricRegistry sensors; TString certificateContent = R"___(-----BEGIN PRIVATE KEY----- MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCzRZjodO7Aqe1w @@ -272,7 +270,7 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V certificateFile.Write(certificateContent.data(), certificateContent.size()); - NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors); + NActors::IActor* proxy = NHttp::CreateHttpProxy(); NActors::TActorId proxyId = actorSystem.Register(proxy); THolder<NHttp::TEvHttpProxy::TEvAddListeningPort> add = MakeHolder<NHttp::TEvHttpProxy::TEvAddListeningPort>(port); @@ -332,9 +330,8 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V TIpPort port = portManager.GetTcpPort(); TAutoPtr<NActors::IEventHandle> handle; actorSystem.Initialize(); - NMonitoring::TMetricRegistry sensors; - NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors); + NActors::IActor* proxy = NHttp::CreateHttpProxy(); NActors::TActorId proxyId = actorSystem.Register(proxy); actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true); actorSystem.DispatchEvents(); diff --git a/library/cpp/monlib/metrics/metric_registry.cpp b/library/cpp/monlib/metrics/metric_registry.cpp index 74260e3069..3d7ffd40cf 100644 --- a/library/cpp/monlib/metrics/metric_registry.cpp +++ b/library/cpp/monlib/metrics/metric_registry.cpp @@ -43,9 +43,15 @@ namespace NMonitoring { TMetricRegistry& TMetricRegistry::operator=(TMetricRegistry&& other) = default; TMetricRegistry* TMetricRegistry::Instance() { + //return SharedInstance().get(); return Singleton<TMetricRegistry>(); } + std::shared_ptr<TMetricRegistry> TMetricRegistry::SharedInstance() { + static auto instance(std::make_shared<TMetricRegistry>()); + return instance; + } + TGauge* TMetricRegistry::Gauge(TLabels labels) { return Metric<TGauge, EMetricType::GAUGE>(std::move(labels)); } diff --git a/library/cpp/monlib/metrics/metric_registry.h b/library/cpp/monlib/metrics/metric_registry.h index 06e61d9991..faba19e845 100644 --- a/library/cpp/monlib/metrics/metric_registry.h +++ b/library/cpp/monlib/metrics/metric_registry.h @@ -67,6 +67,7 @@ namespace NMonitoring { * Get a global metrics registry instance. */ static TMetricRegistry* Instance(); + static std::shared_ptr<TMetricRegistry> SharedInstance(); TGauge* Gauge(TLabels labels); TLazyGauge* LazyGauge(TLabels labels, std::function<double()> supplier); diff --git a/ydb/core/cms/console/net_classifier_updater.cpp b/ydb/core/cms/console/net_classifier_updater.cpp index 121a65efc3..d863c748a0 100644 --- a/ydb/core/cms/console/net_classifier_updater.cpp +++ b/ydb/core/cms/console/net_classifier_updater.cpp @@ -67,6 +67,7 @@ private: public: NetClassifierUpdater(TActorId localConsole) : LocalConsole(localConsole) + , HttpSensors(std::make_shared<NMonitoring::TMetricRegistry>()) { } @@ -328,7 +329,7 @@ private: private: TActorId LocalConsole; TActorId HttpProxyId; - NMonitoring::TMetricRegistry HttpSensors; + std::shared_ptr<NMonitoring::TMetricRegistry> HttpSensors; TString PackedNetData; TString LastUpdateDatetimeUTC; diff --git a/ydb/core/cms/console/net_classifier_updater_ut.cpp b/ydb/core/cms/console/net_classifier_updater_ut.cpp index da41ee51f2..b340b1c317 100644 --- a/ydb/core/cms/console/net_classifier_updater_ut.cpp +++ b/ydb/core/cms/console/net_classifier_updater_ut.cpp @@ -133,7 +133,7 @@ Y_UNIT_TEST_SUITE(TNetClassifierUpdaterTest) { TNetClassifierUpdaterConfig::EFormat format = TNetClassifierUpdaterConfig::TSV, const TVector<TString>& netBoxTags = {} ) { - NMonitoring::TMetricRegistry sensors; + auto sensors(std::make_shared<NMonitoring::TMetricRegistry>()); TPortManager pm; const ui16 port = pm.GetPort(2134); diff --git a/ydb/core/mind/address_classification/net_classifier_ut.cpp b/ydb/core/mind/address_classification/net_classifier_ut.cpp index 088d25b9c7..67262bc42f 100644 --- a/ydb/core/mind/address_classification/net_classifier_ut.cpp +++ b/ydb/core/mind/address_classification/net_classifier_ut.cpp @@ -144,7 +144,7 @@ Y_UNIT_TEST_SUITE(TNetClassifierTest) { } Y_UNIT_TEST(TestInitFromRemoteSource) { - NMonitoring::TMetricRegistry sensors; + auto sensors = std::make_shared<NMonitoring::TMetricRegistry>(); TPortManager pm; const ui16 port = pm.GetPort(2134); diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index ae8c7bf7d2..5e82b87fe8 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -186,7 +186,7 @@ void Init( actorRegistrator(MakeYqlNodesManagerId(), nodesManager); } - auto httpProxy = NHttp::CreateHttpProxy(*NMonitoring::TMetricRegistry::Instance()); + auto httpProxy = NHttp::CreateHttpProxy(NMonitoring::TMetricRegistry::SharedInstance()); actorRegistrator(MakeYqlAnalyticsHttpProxyId(), httpProxy); if (protoConfig.GetPendingFetcher().GetEnabled()) { diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp index fc07635176..826c7d0572 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp @@ -334,7 +334,7 @@ private: if (std::holds_alternative<TMetricsToSend>(variant)) { if (Y_UNLIKELY(!HttpProxyId)) { - HttpProxyId = Register(NHttp::CreateHttpProxy(*NMonitoring::TMetricRegistry::Instance())); + HttpProxyId = Register(NHttp::CreateHttpProxy(NMonitoring::TMetricRegistry::SharedInstance())); } const auto metricsToSend = std::get<TMetricsToSend>(variant); diff --git a/ydb/services/persqueue_cluster_discovery/cluster_discovery_service_ut.cpp b/ydb/services/persqueue_cluster_discovery/cluster_discovery_service_ut.cpp index 34721e262d..e731cefc2c 100644 --- a/ydb/services/persqueue_cluster_discovery/cluster_discovery_service_ut.cpp +++ b/ydb/services/persqueue_cluster_discovery/cluster_discovery_service_ut.cpp @@ -250,6 +250,7 @@ public: , BusPort_(PortManager_.GetPort(2134)) , GrpcPort_(PortManager_.GetPort(2135)) , Settings_(NPersQueueTests::PQSettings(BusPort_, 1)) + , Sensors_(std::make_shared<NMonitoring::TMetricRegistry>()) { Settings_.PQConfig.SetClustersUpdateTimeoutSec(1); @@ -379,7 +380,7 @@ private: ui16 GrpcPort_; TServerSettings Settings_; - NMonitoring::TMetricRegistry Sensors_; + std::shared_ptr<NMonitoring::TMetricRegistry> Sensors_; THolder<TServer> Server_; THolder<NPersQueueTests::TFlatMsgBusPQClient> PQClient_; |