diff options
author | hcpp <hcpp@ydb.tech> | 2023-11-20 12:05:49 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-11-20 12:35:49 +0300 |
commit | d7560330c3eadb39eb8f8cdbd42f86e722e82140 (patch) | |
tree | 85726665da62c18e06415fc20a0347813a79726c | |
parent | b045573f5ad2232841962bcefeeb0726cceb228a (diff) | |
download | ydb-d7560330c3eadb39eb8f8cdbd42f86e722e82140.tar.gz |
compute databases cache metrics have been added
3 files changed, 91 insertions, 14 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp index 6937430ce4..7bc6da33a8 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp +++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp @@ -332,7 +332,7 @@ public: const auto& mapping = cmsConfig.GetDatabaseMapping(); for (const auto& config: mapping.GetCommon()) { const auto clientActor = Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()); - const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod).release()); + const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release()); Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor}); } @@ -340,7 +340,7 @@ public: for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) { const auto clientActor = Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()); - const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod).release()); + const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release()); Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor}; } } @@ -349,7 +349,7 @@ public: const auto& mapping = controlPlaneConfig.GetDatabaseMapping(); for (const auto& config: mapping.GetCommon()) { const auto clientActor = Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()); - const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod).release()); + const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release()); Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor}); } @@ -357,7 +357,7 @@ public: for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) { const auto clientActor = Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()); - const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod).release()); + const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release()); Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor}; } } diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h index 7d41307712..9109ca6aad 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h +++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h @@ -26,6 +26,6 @@ std::unique_ptr<NActors::IActor> CreateYdbcpGrpcClientActor(const NCloud::TGrpcC std::unique_ptr<NActors::IActor> CreateCmsGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider); -std::unique_ptr<NActors::IActor> CreateComputeDatabasesCacheActor(const NActors::TActorId& databaseClientActorId, const TString& databasesCacheReloadPeriod); +std::unique_ptr<NActors::IActor> CreateComputeDatabasesCacheActor(const NActors::TActorId& databaseClientActorId, const TString& databasesCacheReloadPeriod, const ::NMonitoring::TDynamicCounterPtr& counters); } diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_databases_cache.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/compute_databases_cache.cpp index 7e308203ff..637925e3b1 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/compute_databases_cache.cpp +++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_databases_cache.cpp @@ -20,10 +20,56 @@ namespace NFq { class TComputeDatabasesCacheActor : public NActors::TActorBootstrapped<TComputeDatabasesCacheActor> { + struct TCounters { + ::NMonitoring::TDynamicCounterPtr Counters; + struct TCommonMetrics { + ::NMonitoring::TDynamicCounters::TCounterPtr InFly; + ::NMonitoring::TDynamicCounters::TCounterPtr Ok; + ::NMonitoring::TDynamicCounters::TCounterPtr Error; + ::NMonitoring::THistogramPtr LatencyMs; + }; + + TCommonMetrics CheckDatabaseRequest; + TCommonMetrics AddDatabaseRequest; + TCommonMetrics CacheReload; + + explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters) + : Counters(counters) + { + Register(); + } + + private: + void Register() { + ::NMonitoring::TDynamicCounterPtr component = Counters->GetSubgroup("component", "ComputeDatabasesCache"); + RegisterCommonMetrics(CheckDatabaseRequest, component->GetSubgroup("subcomponent", "CheckDatabaseRequest")); + RegisterCommonMetrics(AddDatabaseRequest, component->GetSubgroup("subcomponent", "AddDatabaseRequest")); + RegisterCommonMetrics(CacheReload, component->GetSubgroup("subcomponent", "CacheReload")); + } + + void RegisterCommonMetrics(TCommonMetrics& metrics, ::NMonitoring::TDynamicCounterPtr subComponent) { + metrics.Ok = subComponent->GetCounter("Ok", true); + metrics.Error = subComponent->GetCounter("Error", true); + metrics.InFly = subComponent->GetCounter("InFly", false); + metrics.LatencyMs = subComponent->GetHistogram("LatencyMs", GetLatencyHistogramBuckets()); + } + + static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() { + return ::NMonitoring::ExplicitHistogram({0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000}); + } + }; + + struct TPendingItem { + TInstant StartTime; + TEvYdbCompute::TEvCheckDatabaseRequest::TPtr Request; + }; + public: using TBase =NActors::TActorBootstrapped<TComputeDatabasesCacheActor>; - TComputeDatabasesCacheActor(const TActorId& databaseClientActorId, const TString& databasesCacheReloadPeriod) - : DatabaseClientActorId(databaseClientActorId) + TComputeDatabasesCacheActor(const TActorId& databaseClientActorId, const TString& databasesCacheReloadPeriod, const ::NMonitoring::TDynamicCounterPtr& counters) + : StartCacheReload(TInstant::Now()) + , DatabaseClientActorId(databaseClientActorId) + , Counters(counters) , DatabasesCacheReloadPeriod(GetDuration(databasesCacheReloadPeriod, TDuration::Seconds(30))) {} @@ -32,6 +78,7 @@ public: void Bootstrap() { LOG_E("Cache Bootstrap, client " << DatabaseClientActorId.ToString()); InFlight = true; + Counters.CacheReload.InFly->Inc(); Send(DatabaseClientActorId, new TEvYdbCompute::TEvListDatabasesRequest()); Become(&TComputeDatabasesCacheActor::StateFunc, DatabasesCacheReloadPeriod, new NActors::TEvents::TEvWakeup()); } @@ -44,13 +91,19 @@ public: ) void Handle(TEvYdbCompute::TEvCheckDatabaseRequest::TPtr& ev) { + TInstant startTime = TInstant::Now(); + Counters.CheckDatabaseRequest.InFly->Inc(); const auto& path = ev->Get()->Path; LOG_D("CheckDatabaseRequest, path: " << path << ", ready: " << Ready); if (!Ready) { - PendingRequests.push_back(ev); + PendingRequests.push_back({startTime, ev}); return; } + Send(ev->Sender, new TEvYdbCompute::TEvCheckDatabaseResponse(Databases.contains(path)), 0, ev->Cookie); + Counters.CheckDatabaseRequest.InFly->Dec(); + Counters.CheckDatabaseRequest.Ok->Inc(); + Counters.CheckDatabaseRequest.LatencyMs->Collect(DeltaMs(startTime)); } void Handle(TEvYdbCompute::TEvListDatabasesResponse::TPtr& ev) { @@ -61,6 +114,9 @@ public: if (issues) { NotifyPendingRequests(issues); LOG_E("ListDatabasesResponse was failed with issues: " << issues.ToOneLineString()); + Counters.CacheReload.Error->Inc(); + Counters.CacheReload.InFly->Dec(); + Counters.CacheReload.LatencyMs->Collect(DeltaMs(StartCacheReload)); return; } @@ -68,45 +124,66 @@ public: Ready = true; NotifyPendingRequests(); LOG_D("Updated list of databases, count = " << Databases.size()); + Counters.CacheReload.Ok->Inc(); + Counters.CacheReload.InFly->Dec(); + Counters.CacheReload.LatencyMs->Collect(DeltaMs(StartCacheReload)); } void Handle(TEvYdbCompute::TEvAddDatabaseRequest::TPtr& ev) { + TInstant startTime = TInstant::Now(); + Counters.AddDatabaseRequest.InFly->Inc(); const auto& path = ev->Get()->Path; LOG_D("AddDatabaseRequest, path: " << path << ", ready: " << Ready); Databases.insert(path); Send(ev->Sender, new TEvYdbCompute::TEvAddDatabaseResponse{}, 0, ev->Cookie); + Counters.AddDatabaseRequest.InFly->Dec(); + Counters.AddDatabaseRequest.Ok->Inc(); + Counters.AddDatabaseRequest.LatencyMs->Collect(DeltaMs(startTime)); } void Wakeup() { if (!InFlight) { + Counters.CacheReload.InFly->Inc(); InFlight = true; + StartCacheReload = TInstant::Now(); Send(DatabaseClientActorId, new TEvYdbCompute::TEvListDatabasesRequest()); } Schedule(DatabasesCacheReloadPeriod, new NActors::TEvents::TEvWakeup()); } void NotifyPendingRequests(const NYql::TIssues& issues = {}) { - for (const auto& request: PendingRequests) { + for (const auto& item: PendingRequests) { if (issues) { - Send(request->Sender, new TEvYdbCompute::TEvCheckDatabaseResponse(issues), 0, request->Cookie); + Send(item.Request->Sender, new TEvYdbCompute::TEvCheckDatabaseResponse(issues), 0, item.Request->Cookie); + Counters.CheckDatabaseRequest.Error->Inc(); } else { - Send(request->Sender, new TEvYdbCompute::TEvCheckDatabaseResponse(Databases.contains(request.Get()->Get()->Path)), 0, request->Cookie); + Send(item.Request->Sender, new TEvYdbCompute::TEvCheckDatabaseResponse(Databases.contains(item.Request.Get()->Get()->Path)), 0, item.Request->Cookie); + Counters.CheckDatabaseRequest.Ok->Inc(); } + Counters.CheckDatabaseRequest.InFly->Dec(); + Counters.CheckDatabaseRequest.LatencyMs->Collect(DeltaMs(item.StartTime)); } PendingRequests.clear(); } private: - TVector<TEvYdbCompute::TEvCheckDatabaseRequest::TPtr> PendingRequests; + static ui64 DeltaMs(const TInstant& startTime) { + return (TInstant::Now() - startTime).MilliSeconds(); + } + +private: + TVector<TPendingItem> PendingRequests; + TInstant StartCacheReload; TActorId DatabaseClientActorId; TSet<TString> Databases; + TCounters Counters; bool InFlight = false; bool Ready = false; const TDuration DatabasesCacheReloadPeriod = TDuration::Seconds(30); }; -std::unique_ptr<NActors::IActor> CreateComputeDatabasesCacheActor(const TActorId& databaseClientActorId, const TString& databasesCacheReloadPeriod) { - return std::make_unique<TComputeDatabasesCacheActor>(databaseClientActorId, databasesCacheReloadPeriod); +std::unique_ptr<NActors::IActor> CreateComputeDatabasesCacheActor(const TActorId& databaseClientActorId, const TString& databasesCacheReloadPeriod, const ::NMonitoring::TDynamicCounterPtr& counters) { + return std::make_unique<TComputeDatabasesCacheActor>(databaseClientActorId, databasesCacheReloadPeriod, counters); } } |