summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp27
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h1
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp6
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/monitoring_rest_client_actor.cpp171
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/ya.make4
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/events.h7
-rw-r--r--ydb/core/fq/libs/config/protos/compute.proto1
7 files changed, 210 insertions, 7 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
index 08382442f6d..73d7d7355bc 100644
--- a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
@@ -343,7 +343,14 @@ public:
void CreateSingleClientActors(const NConfig::TYdbComputeControlPlane::TSingle& singleConfig) {
auto globalLoadConfig = Config.GetYdb().GetLoadControlConfig();
if (globalLoadConfig.GetEnable()) {
- auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(singleConfig.GetConnection()), CredentialsProviderFactory(GetYdbCredentialSettings(singleConfig.GetConnection()))->CreateProvider()).release());
+ TActorId clientActor;
+ auto monitoringEndpoint = globalLoadConfig.GetMonitoringEndpoint();
+ auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(singleConfig.GetConnection()))->CreateProvider();
+ if (monitoringEndpoint) {
+ clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, singleConfig.GetConnection().GetDatabase(), credentialsProvider).release());
+ } else {
+ clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(singleConfig.GetConnection()), credentialsProvider).release());
+ }
MonitoringActorId = Register(CreateDatabaseMonitoringActor(clientActor, globalLoadConfig, Counters).release());
}
}
@@ -359,7 +366,14 @@ public:
? Config.GetYdb().GetLoadControlConfig()
: globalLoadConfig;
if (loadConfig.GetEnable()) {
- auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
+ TActorId clientActor;
+ auto monitoringEndpoint = loadConfig.GetMonitoringEndpoint();
+ auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider();
+ if (monitoringEndpoint) {
+ clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, config.GetControlPlaneConnection().GetDatabase(), credentialsProvider).release());
+ } else {
+ clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), credentialsProvider).release());
+ }
databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, Counters).release());
}
Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor, databaseMonitoringActor});
@@ -375,7 +389,14 @@ public:
? Config.GetYdb().GetLoadControlConfig()
: globalLoadConfig;
if (loadConfig.GetEnable()) {
- auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
+ TActorId clientActor;
+ auto monitoringEndpoint = loadConfig.GetMonitoringEndpoint();
+ auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider();
+ if (monitoringEndpoint) {
+ clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, config.GetControlPlaneConnection().GetDatabase(), credentialsProvider).release());
+ } else {
+ clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), credentialsProvider).release());
+ }
databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, Counters).release());
}
Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor, databaseMonitoringActor};
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h
index 509d72ada94..07ef06c3d20 100644
--- a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h
@@ -29,6 +29,7 @@ std::unique_ptr<NActors::IActor> CreateCmsGrpcClientActor(const NCloud::TGrpcCli
std::unique_ptr<NActors::IActor> CreateComputeDatabasesCacheActor(const NActors::TActorId& databaseClientActorId, const TString& databasesCacheReloadPeriod, const ::NMonitoring::TDynamicCounterPtr& counters);
std::unique_ptr<NActors::IActor> CreateMonitoringGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider);
+std::unique_ptr<NActors::IActor> CreateMonitoringRestClientActor(const TString& endpoint, const TString& database, const NYdb::TCredentialsProviderPtr& credentialsProvider);
std::unique_ptr<NActors::IActor> CreateDatabaseMonitoringActor(const NActors::TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters);
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp
index 19963aa9fc2..2c1f9b533aa 100644
--- a/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp
@@ -113,6 +113,10 @@ public:
auto delta = now - LastCpuLoad;
LastCpuLoad = now;
+ if (response.CpuNumber) {
+ CpuNumber = response.CpuNumber;
+ }
+
InstantLoad = response.InstantLoad;
// exponential moving average
if (!Ready || delta >= AverageLoadInterval) {
@@ -252,7 +256,7 @@ private:
const double DefaultQueryLoad;
const ui32 PendingQueueSize;
const bool Strict;
- const ui32 CpuNumber;
+ ui32 CpuNumber = 0;
TQueue<TEvYdbCompute::TEvCpuQuotaRequest::TPtr> PendingQueue;
};
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_rest_client_actor.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_rest_client_actor.cpp
new file mode 100644
index 00000000000..a8fb27b271f
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_rest_client_actor.cpp
@@ -0,0 +1,171 @@
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/library/services/services.pb.h>
+
+#include <ydb/library/security/ydb_credentials_provider_factory.h>
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/event.h>
+#include <ydb/library/actors/core/hfunc.h>
+#include <ydb/library/actors/core/log.h>
+
+#include <ydb/library/actors/http/http_proxy.h>
+
+#include <ydb/library/yql/utils/actors/http_sender.h>
+#include <ydb/library/yql/utils/actors/http_sender_actor.h>
+#include <ydb/library/yql/utils/url_builder.h>
+
+#include <library/cpp/json/json_reader.h>
+
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringRestClient]: " << stream)
+#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringRestClient]: " << stream)
+#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringRestClient]: " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringRestClient]: " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringRestClient]: " << stream)
+
+namespace NFq {
+
+using namespace NActors;
+
+namespace {
+
+auto RetryPolicy = NYql::NDq::THttpSenderRetryPolicy::GetExponentialBackoffPolicy(
+ [](const NHttp::TEvHttpProxy::TEvHttpIncomingResponse* resp){
+ if (!resp || !resp->Response) {
+ // Connection wasn't established. Should retry.
+ return ERetryErrorClass::ShortRetry;
+ }
+
+ if (resp->Response->Status == "401") {
+ return ERetryErrorClass::NoRetry;
+ }
+
+ return ERetryErrorClass::ShortRetry;
+ });
+
+}
+
+class TMonitoringRestServiceActor : public NActors::TActor<TMonitoringRestServiceActor> {
+public:
+ using TBase = NActors::TActor<TMonitoringRestServiceActor>;
+
+ TMonitoringRestServiceActor(const TString& endpoint, const TString& database, const NYdb::TCredentialsProviderPtr& credentialsProvider)
+ : TBase(&TMonitoringRestServiceActor::StateFunc)
+ , Endpoint(endpoint)
+ , Database(database)
+ , CredentialsProvider(credentialsProvider)
+ {}
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvYdbCompute::TEvCpuLoadRequest, Handle);
+ hFunc(NYql::NDq::TEvHttpBase::TEvSendResult, Handle);
+ )
+
+ void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) {
+ if (Y_UNLIKELY(!HttpProxyId)) {
+ HttpProxyId = Register(NHttp::CreateHttpProxy(NMonitoring::TMetricRegistry::SharedInstance()));
+ }
+
+ auto httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet(
+ NYql::TUrlBuilder(Endpoint)
+ .AddPathComponent("viewer")
+ .AddPathComponent("json")
+ .AddPathComponent("tenantinfo")
+ .AddUrlParam("path", Database)
+ .Build()
+ );
+ LOG_D(httpRequest->GetRawData());
+ httpRequest->Set("Authorization", CredentialsProvider->GetAuthInfo());
+
+ auto httpSenderId = Register(NYql::NDq::CreateHttpSenderActor(SelfId(), HttpProxyId, RetryPolicy));
+ Send(httpSenderId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest), 0, Cookie);
+ Requests[Cookie++] = ev;
+ }
+
+ void Handle(NYql::NDq::TEvHttpBase::TEvSendResult::TPtr& ev) {
+ auto it = Requests.find(ev->Cookie);
+ if (it == Requests.end()) {
+ LOG_E("Request doesn't exist (TEvSendResult). Need to fix this bug urgently");
+ return;
+ }
+ auto request = it->second;
+ Requests.erase(it);
+
+ const auto& result = *ev->Get();
+ const auto& response = *result.HttpIncomingResponse->Get();
+
+ auto forwardResponse = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>();
+
+ const TString& error = response.GetError();
+ if (!error.empty()) {
+ forwardResponse->Issues.AddIssue(error);
+ Send(request->Sender, forwardResponse.release(), 0, request->Cookie);
+ return;
+ }
+
+ try {
+ NJson::TJsonReaderConfig jsonConfig;
+ NJson::TJsonValue info;
+ if (NJson::ReadJsonTree(response.Response->Body, &jsonConfig, &info)) {
+ bool usageFound = false;
+ if (auto* tenantNode = info.GetValueByPath("TenantInfo")) {
+ if (tenantNode->GetType() == NJson::JSON_ARRAY) {
+ for (auto tenantItem : tenantNode->GetArray()) {
+ if (auto* nameNode = tenantItem.GetValueByPath("Name")) {
+ if (nameNode->GetStringSafe() != Database) {
+ continue;
+ }
+ }
+ if (auto* poolNode = tenantItem.GetValueByPath("PoolStats")) {
+ if (poolNode->GetType() == NJson::JSON_ARRAY) {
+ for (auto poolItem : poolNode->GetArray()) {
+ if (auto* nameNode = poolItem.GetValueByPath("Name")) {
+ if (nameNode->GetStringSafe() == "User") {
+ if (auto* usageNode = poolItem.GetValueByPath("Usage")) {
+ forwardResponse->InstantLoad = usageNode->GetDoubleSafe();
+ usageFound = true;
+ break;
+ }
+ if (auto* threadsNode = poolItem.GetValueByPath("Threads")) {
+ forwardResponse->CpuNumber = threadsNode->GetIntegerSafe();
+ }
+ }
+ }
+ }
+ }
+ }
+ if (usageFound) {
+ break;
+ }
+ }
+ }
+ }
+ if (!usageFound) {
+ forwardResponse->Issues.AddIssue(TStringBuilder() << "MISSED User pool node load for database \"" << Database << '"');
+ }
+ } else {
+ forwardResponse->Issues.AddIssue("Malformed JSON");
+ }
+ } catch(const std::exception& e) {
+ forwardResponse->Issues.AddIssue(TStringBuilder() << "Error on JSON parsing: '" << e.what() << "'");
+ }
+
+ if (forwardResponse->Issues) {
+ LOG_E(response.Response->Body);
+ }
+ Send(request->Sender, forwardResponse.release(), 0, request->Cookie);
+ }
+
+private:
+ TString Endpoint;
+ TString Database;
+ TMap<uint64_t, TEvYdbCompute::TEvCpuLoadRequest::TPtr> Requests;
+ NYdb::TCredentialsProviderPtr CredentialsProvider;
+ int64_t Cookie = 0;
+ TActorId HttpProxyId;
+};
+
+std::unique_ptr<NActors::IActor> CreateMonitoringRestClientActor(const TString& endpoint, const TString& database, const NYdb::TCredentialsProviderPtr& credentialsProvider) {
+ return std::make_unique<TMonitoringRestServiceActor>(endpoint, database, credentialsProvider);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make
index daedf40cdff..523a26c2bfe 100644
--- a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make
@@ -6,10 +6,12 @@ SRCS(
compute_databases_cache.cpp
database_monitoring.cpp
monitoring_grpc_client_actor.cpp
+ monitoring_rest_client_actor.cpp
ydbcp_grpc_client_actor.cpp
)
PEERDIR(
+ library/cpp/json
ydb/library/actors/core
ydb/library/actors/protos
ydb/core/fq/libs/compute/ydb/synchronization_service
@@ -18,6 +20,8 @@ PEERDIR(
ydb/core/protos
ydb/library/db_pool/protos
ydb/library/yql/public/issue
+ ydb/library/yql/utils
+ ydb/library/yql/utils/actors
ydb/public/api/grpc
ydb/public/api/grpc/draft
ydb/public/lib/operation_id/protos
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h
index 5996774d73f..442ff42e2cf 100644
--- a/ydb/core/fq/libs/compute/ydb/events/events.h
+++ b/ydb/core/fq/libs/compute/ydb/events/events.h
@@ -457,16 +457,17 @@ struct TEvYdbCompute {
};
struct TEvCpuLoadResponse : public NActors::TEventLocal<TEvCpuLoadResponse, EvCpuLoadResponse> {
- TEvCpuLoadResponse(double instantLoad = 0.0, double averageLoad = 0.0)
- : InstantLoad(instantLoad), AverageLoad(averageLoad)
+ TEvCpuLoadResponse(double instantLoad = 0.0, double averageLoad = 0.0, ui32 cpuNumber = 0)
+ : InstantLoad(instantLoad), AverageLoad(averageLoad), CpuNumber(cpuNumber)
{}
TEvCpuLoadResponse(NYql::TIssues issues)
- : InstantLoad(0.0), AverageLoad(0.0), Issues(std::move(issues))
+ : InstantLoad(0.0), AverageLoad(0.0), CpuNumber(0), Issues(std::move(issues))
{}
double InstantLoad;
double AverageLoad;
+ ui32 CpuNumber;
NYql::TIssues Issues;
};
diff --git a/ydb/core/fq/libs/config/protos/compute.proto b/ydb/core/fq/libs/config/protos/compute.proto
index aff608e2773..60bc59125dd 100644
--- a/ydb/core/fq/libs/config/protos/compute.proto
+++ b/ydb/core/fq/libs/config/protos/compute.proto
@@ -26,6 +26,7 @@ message TLoadControlConfig {
uint32 PendingQueueSize = 6; // default 0 == instant decline if overloaded
bool Strict = 7; // default false, whether to deny execution in load level unavailable
uint32 CpuNumber = 8;
+ string MonitoringEndpoint = 9; // if defined, will be used as REST API instead of default GRPC
}
message TComputeDatabaseConfig {