diff options
| author | Hor911 <[email protected]> | 2024-02-26 18:31:42 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-02-26 18:31:42 +0300 |
| commit | c3a87cf628f4b90ca0aa2626a485f246e1c5e9a9 (patch) | |
| tree | 140ffab12be0ed0c2069de894915d280fbe98a08 | |
| parent | d6e9f19f7d81a684caf833458d95d16019900aab (diff) | |
REST Monitoring API Support for CPU Load Control (#2229)
7 files changed, 210 insertions, 7 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp index 08382442f6d..73d7d7355bc 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp +++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp @@ -343,7 +343,14 @@ public: void CreateSingleClientActors(const NConfig::TYdbComputeControlPlane::TSingle& singleConfig) { auto globalLoadConfig = Config.GetYdb().GetLoadControlConfig(); if (globalLoadConfig.GetEnable()) { - auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(singleConfig.GetConnection()), CredentialsProviderFactory(GetYdbCredentialSettings(singleConfig.GetConnection()))->CreateProvider()).release()); + TActorId clientActor; + auto monitoringEndpoint = globalLoadConfig.GetMonitoringEndpoint(); + auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(singleConfig.GetConnection()))->CreateProvider(); + if (monitoringEndpoint) { + clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, singleConfig.GetConnection().GetDatabase(), credentialsProvider).release()); + } else { + clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(singleConfig.GetConnection()), credentialsProvider).release()); + } MonitoringActorId = Register(CreateDatabaseMonitoringActor(clientActor, globalLoadConfig, Counters).release()); } } @@ -359,7 +366,14 @@ public: ? Config.GetYdb().GetLoadControlConfig() : globalLoadConfig; if (loadConfig.GetEnable()) { - auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()); + TActorId clientActor; + auto monitoringEndpoint = loadConfig.GetMonitoringEndpoint(); + auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider(); + if (monitoringEndpoint) { + clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, config.GetControlPlaneConnection().GetDatabase(), credentialsProvider).release()); + } else { + clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), credentialsProvider).release()); + } databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, Counters).release()); } Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor, databaseMonitoringActor}); @@ -375,7 +389,14 @@ public: ? Config.GetYdb().GetLoadControlConfig() : globalLoadConfig; if (loadConfig.GetEnable()) { - auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()); + TActorId clientActor; + auto monitoringEndpoint = loadConfig.GetMonitoringEndpoint(); + auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider(); + if (monitoringEndpoint) { + clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, config.GetControlPlaneConnection().GetDatabase(), credentialsProvider).release()); + } else { + clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), credentialsProvider).release()); + } databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, Counters).release()); } Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor, databaseMonitoringActor}; diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h index 509d72ada94..07ef06c3d20 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h +++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h @@ -29,6 +29,7 @@ std::unique_ptr<NActors::IActor> CreateCmsGrpcClientActor(const NCloud::TGrpcCli std::unique_ptr<NActors::IActor> CreateComputeDatabasesCacheActor(const NActors::TActorId& databaseClientActorId, const TString& databasesCacheReloadPeriod, const ::NMonitoring::TDynamicCounterPtr& counters); std::unique_ptr<NActors::IActor> CreateMonitoringGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider); +std::unique_ptr<NActors::IActor> CreateMonitoringRestClientActor(const TString& endpoint, const TString& database, const NYdb::TCredentialsProviderPtr& credentialsProvider); std::unique_ptr<NActors::IActor> CreateDatabaseMonitoringActor(const NActors::TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters); diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp index 19963aa9fc2..2c1f9b533aa 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp +++ b/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp @@ -113,6 +113,10 @@ public: auto delta = now - LastCpuLoad; LastCpuLoad = now; + if (response.CpuNumber) { + CpuNumber = response.CpuNumber; + } + InstantLoad = response.InstantLoad; // exponential moving average if (!Ready || delta >= AverageLoadInterval) { @@ -252,7 +256,7 @@ private: const double DefaultQueryLoad; const ui32 PendingQueueSize; const bool Strict; - const ui32 CpuNumber; + ui32 CpuNumber = 0; TQueue<TEvYdbCompute::TEvCpuQuotaRequest::TPtr> PendingQueue; }; diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_rest_client_actor.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_rest_client_actor.cpp new file mode 100644 index 00000000000..a8fb27b271f --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_rest_client_actor.cpp @@ -0,0 +1,171 @@ +#include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/library/services/services.pb.h> + +#include <ydb/library/security/ydb_credentials_provider_factory.h> + +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/event.h> +#include <ydb/library/actors/core/hfunc.h> +#include <ydb/library/actors/core/log.h> + +#include <ydb/library/actors/http/http_proxy.h> + +#include <ydb/library/yql/utils/actors/http_sender.h> +#include <ydb/library/yql/utils/actors/http_sender_actor.h> +#include <ydb/library/yql/utils/url_builder.h> + +#include <library/cpp/json/json_reader.h> + +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringRestClient]: " << stream) +#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringRestClient]: " << stream) +#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringRestClient]: " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringRestClient]: " << stream) +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringRestClient]: " << stream) + +namespace NFq { + +using namespace NActors; + +namespace { + +auto RetryPolicy = NYql::NDq::THttpSenderRetryPolicy::GetExponentialBackoffPolicy( + [](const NHttp::TEvHttpProxy::TEvHttpIncomingResponse* resp){ + if (!resp || !resp->Response) { + // Connection wasn't established. Should retry. + return ERetryErrorClass::ShortRetry; + } + + if (resp->Response->Status == "401") { + return ERetryErrorClass::NoRetry; + } + + return ERetryErrorClass::ShortRetry; + }); + +} + +class TMonitoringRestServiceActor : public NActors::TActor<TMonitoringRestServiceActor> { +public: + using TBase = NActors::TActor<TMonitoringRestServiceActor>; + + TMonitoringRestServiceActor(const TString& endpoint, const TString& database, const NYdb::TCredentialsProviderPtr& credentialsProvider) + : TBase(&TMonitoringRestServiceActor::StateFunc) + , Endpoint(endpoint) + , Database(database) + , CredentialsProvider(credentialsProvider) + {} + + STRICT_STFUNC(StateFunc, + hFunc(TEvYdbCompute::TEvCpuLoadRequest, Handle); + hFunc(NYql::NDq::TEvHttpBase::TEvSendResult, Handle); + ) + + void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) { + if (Y_UNLIKELY(!HttpProxyId)) { + HttpProxyId = Register(NHttp::CreateHttpProxy(NMonitoring::TMetricRegistry::SharedInstance())); + } + + auto httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet( + NYql::TUrlBuilder(Endpoint) + .AddPathComponent("viewer") + .AddPathComponent("json") + .AddPathComponent("tenantinfo") + .AddUrlParam("path", Database) + .Build() + ); + LOG_D(httpRequest->GetRawData()); + httpRequest->Set("Authorization", CredentialsProvider->GetAuthInfo()); + + auto httpSenderId = Register(NYql::NDq::CreateHttpSenderActor(SelfId(), HttpProxyId, RetryPolicy)); + Send(httpSenderId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest), 0, Cookie); + Requests[Cookie++] = ev; + } + + void Handle(NYql::NDq::TEvHttpBase::TEvSendResult::TPtr& ev) { + auto it = Requests.find(ev->Cookie); + if (it == Requests.end()) { + LOG_E("Request doesn't exist (TEvSendResult). Need to fix this bug urgently"); + return; + } + auto request = it->second; + Requests.erase(it); + + const auto& result = *ev->Get(); + const auto& response = *result.HttpIncomingResponse->Get(); + + auto forwardResponse = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>(); + + const TString& error = response.GetError(); + if (!error.empty()) { + forwardResponse->Issues.AddIssue(error); + Send(request->Sender, forwardResponse.release(), 0, request->Cookie); + return; + } + + try { + NJson::TJsonReaderConfig jsonConfig; + NJson::TJsonValue info; + if (NJson::ReadJsonTree(response.Response->Body, &jsonConfig, &info)) { + bool usageFound = false; + if (auto* tenantNode = info.GetValueByPath("TenantInfo")) { + if (tenantNode->GetType() == NJson::JSON_ARRAY) { + for (auto tenantItem : tenantNode->GetArray()) { + if (auto* nameNode = tenantItem.GetValueByPath("Name")) { + if (nameNode->GetStringSafe() != Database) { + continue; + } + } + if (auto* poolNode = tenantItem.GetValueByPath("PoolStats")) { + if (poolNode->GetType() == NJson::JSON_ARRAY) { + for (auto poolItem : poolNode->GetArray()) { + if (auto* nameNode = poolItem.GetValueByPath("Name")) { + if (nameNode->GetStringSafe() == "User") { + if (auto* usageNode = poolItem.GetValueByPath("Usage")) { + forwardResponse->InstantLoad = usageNode->GetDoubleSafe(); + usageFound = true; + break; + } + if (auto* threadsNode = poolItem.GetValueByPath("Threads")) { + forwardResponse->CpuNumber = threadsNode->GetIntegerSafe(); + } + } + } + } + } + } + if (usageFound) { + break; + } + } + } + } + if (!usageFound) { + forwardResponse->Issues.AddIssue(TStringBuilder() << "MISSED User pool node load for database \"" << Database << '"'); + } + } else { + forwardResponse->Issues.AddIssue("Malformed JSON"); + } + } catch(const std::exception& e) { + forwardResponse->Issues.AddIssue(TStringBuilder() << "Error on JSON parsing: '" << e.what() << "'"); + } + + if (forwardResponse->Issues) { + LOG_E(response.Response->Body); + } + Send(request->Sender, forwardResponse.release(), 0, request->Cookie); + } + +private: + TString Endpoint; + TString Database; + TMap<uint64_t, TEvYdbCompute::TEvCpuLoadRequest::TPtr> Requests; + NYdb::TCredentialsProviderPtr CredentialsProvider; + int64_t Cookie = 0; + TActorId HttpProxyId; +}; + +std::unique_ptr<NActors::IActor> CreateMonitoringRestClientActor(const TString& endpoint, const TString& database, const NYdb::TCredentialsProviderPtr& credentialsProvider) { + return std::make_unique<TMonitoringRestServiceActor>(endpoint, database, credentialsProvider); +} + +} diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make index daedf40cdff..523a26c2bfe 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make +++ b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make @@ -6,10 +6,12 @@ SRCS( compute_databases_cache.cpp database_monitoring.cpp monitoring_grpc_client_actor.cpp + monitoring_rest_client_actor.cpp ydbcp_grpc_client_actor.cpp ) PEERDIR( + library/cpp/json ydb/library/actors/core ydb/library/actors/protos ydb/core/fq/libs/compute/ydb/synchronization_service @@ -18,6 +20,8 @@ PEERDIR( ydb/core/protos ydb/library/db_pool/protos ydb/library/yql/public/issue + ydb/library/yql/utils + ydb/library/yql/utils/actors ydb/public/api/grpc ydb/public/api/grpc/draft ydb/public/lib/operation_id/protos diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h index 5996774d73f..442ff42e2cf 100644 --- a/ydb/core/fq/libs/compute/ydb/events/events.h +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -457,16 +457,17 @@ struct TEvYdbCompute { }; struct TEvCpuLoadResponse : public NActors::TEventLocal<TEvCpuLoadResponse, EvCpuLoadResponse> { - TEvCpuLoadResponse(double instantLoad = 0.0, double averageLoad = 0.0) - : InstantLoad(instantLoad), AverageLoad(averageLoad) + TEvCpuLoadResponse(double instantLoad = 0.0, double averageLoad = 0.0, ui32 cpuNumber = 0) + : InstantLoad(instantLoad), AverageLoad(averageLoad), CpuNumber(cpuNumber) {} TEvCpuLoadResponse(NYql::TIssues issues) - : InstantLoad(0.0), AverageLoad(0.0), Issues(std::move(issues)) + : InstantLoad(0.0), AverageLoad(0.0), CpuNumber(0), Issues(std::move(issues)) {} double InstantLoad; double AverageLoad; + ui32 CpuNumber; NYql::TIssues Issues; }; diff --git a/ydb/core/fq/libs/config/protos/compute.proto b/ydb/core/fq/libs/config/protos/compute.proto index aff608e2773..60bc59125dd 100644 --- a/ydb/core/fq/libs/config/protos/compute.proto +++ b/ydb/core/fq/libs/config/protos/compute.proto @@ -26,6 +26,7 @@ message TLoadControlConfig { uint32 PendingQueueSize = 6; // default 0 == instant decline if overloaded bool Strict = 7; // default false, whether to deny execution in load level unavailable uint32 CpuNumber = 8; + string MonitoringEndpoint = 9; // if defined, will be used as REST API instead of default GRPC } message TComputeDatabaseConfig { |
