aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHor911 <hor911@ydb.tech>2024-01-30 22:31:37 +0300
committerGitHub <noreply@github.com>2024-01-30 22:31:37 +0300
commit27e909566c6ad5df05f0352851fd8c77c7b0205d (patch)
treeb775b8bd569c4c1e167f5e33570d4cc1c3c51577
parentd39b86338f490599783e2c7e425c359b7bf134c5 (diff)
downloadydb-27e909566c6ad5df05f0352851fd8c77c7b0205d.tar.gz
Limit v2 compute load by CPU (#861)
* Limit v2 compute load by CPU * Mon fix * Per db load config + cpu load adjustment (cashback) * Review fixes * Clear config etc
-rw-r--r--ydb/core/fq/libs/compute/common/utils.cpp5
-rw-r--r--ydb/core/fq/libs/compute/common/utils.h2
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp81
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h4
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp264
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/monitoring_grpc_client_actor.cpp131
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/ya.make2
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/events.h58
-rw-r--r--ydb/core/fq/libs/compute/ydb/initializer_actor.cpp22
-rw-r--r--ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp15
-rw-r--r--ydb/core/fq/libs/config/protos/compute.proto13
11 files changed, 584 insertions, 13 deletions
diff --git a/ydb/core/fq/libs/compute/common/utils.cpp b/ydb/core/fq/libs/compute/common/utils.cpp
index d5f22b664f..127bd26f1e 100644
--- a/ydb/core/fq/libs/compute/common/utils.cpp
+++ b/ydb/core/fq/libs/compute/common/utils.cpp
@@ -338,7 +338,7 @@ void EnumeratePlans(NYson::TYsonWriter& writer, NJson::TJsonValue& value, ui32&
}
}
-TString GetV1StatFromV2Plan(const TString& plan) {
+TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage) {
TStringStream out;
NYson::TYsonWriter writer(&out);
writer.OnBeginMap();
@@ -358,6 +358,9 @@ TString GetV1StatFromV2Plan(const TString& plan) {
totals.MaxMemoryUsage.Write(writer, "MaxMemoryUsage");
totals.CpuTimeUs.Write(writer, "CpuTimeUs");
totals.SourceCpuTimeUs.Write(writer, "SourceCpuTimeUs");
+ if (cpuUsage) {
+ *cpuUsage = (totals.CpuTimeUs.Sum + totals.SourceCpuTimeUs.Sum) / 1000000.0;
+ }
totals.InputBytes.Write(writer, "InputBytes");
totals.InputRows.Write(writer, "InputRows");
totals.OutputBytes.Write(writer, "OutputBytes");
diff --git a/ydb/core/fq/libs/compute/common/utils.h b/ydb/core/fq/libs/compute/common/utils.h
index d2fad6d01f..4a61a45bf6 100644
--- a/ydb/core/fq/libs/compute/common/utils.h
+++ b/ydb/core/fq/libs/compute/common/utils.h
@@ -24,7 +24,7 @@ inline std::shared_ptr<NYdb::NTable::TTableClient> CreateNewTableClient(const TS
tableSettings);
}
-TString GetV1StatFromV2Plan(const TString& plan);
+TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage = nullptr);
TString GetV1StatFromV2PlanV2(const TString& plan);
TString FormatDurationMs(ui64 durationMs);
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
index bfd5984b56..08382442f6 100644
--- a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
@@ -38,6 +38,7 @@ struct TDatabaseClients {
TActorId ActorId;
NConfig::TComputeDatabaseConfig Config;
TActorId DatabasesCacheActorId;
+ TActorId MonitoringActorId;
};
std::optional<TClientConfig> GetClient(const TString& scope, const TString& endpoint, const TString& database) const {
@@ -306,6 +307,7 @@ public:
switch (controlPlane.type_case()) {
case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET:
case NConfig::TYdbComputeControlPlane::kSingle:
+ CreateSingleClientActors(controlPlane.GetSingle());
break;
case NConfig::TYdbComputeControlPlane::kCms:
CreateCmsClientActors(controlPlane.GetCms(), controlPlane.GetDatabasesCacheReloadPeriod());
@@ -317,6 +319,16 @@ public:
Become(&TComputeDatabaseControlPlaneServiceActor::StateFunc);
}
+ static NCloud::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TYdbStorageConfig& connection) {
+ NCloud::TGrpcClientSettings settings;
+ settings.Endpoint = connection.GetEndpoint();
+ settings.EnableSsl = connection.GetUseSsl();
+ if (connection.GetCertificateFile()) {
+ settings.CertificateRootCA = StripString(TFileInput(connection.GetCertificateFile()).ReadAll());
+ }
+ return settings;
+ }
+
static NCloud::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TComputeDatabaseConfig& config) {
NCloud::TGrpcClientSettings settings;
const auto& connection = config.GetControlPlaneConnection();
@@ -328,12 +340,29 @@ public:
return settings;
}
+ void CreateSingleClientActors(const NConfig::TYdbComputeControlPlane::TSingle& singleConfig) {
+ auto globalLoadConfig = Config.GetYdb().GetLoadControlConfig();
+ if (globalLoadConfig.GetEnable()) {
+ auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(singleConfig.GetConnection()), CredentialsProviderFactory(GetYdbCredentialSettings(singleConfig.GetConnection()))->CreateProvider()).release());
+ MonitoringActorId = Register(CreateDatabaseMonitoringActor(clientActor, globalLoadConfig, Counters).release());
+ }
+ }
+
void CreateCmsClientActors(const NConfig::TYdbComputeControlPlane::TCms& cmsConfig, const TString& databasesCacheReloadPeriod) {
const auto& mapping = cmsConfig.GetDatabaseMapping();
+ auto globalLoadConfig = Config.GetYdb().GetLoadControlConfig();
for (const auto& config: mapping.GetCommon()) {
const auto clientActor = Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release());
- Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor});
+ TActorId databaseMonitoringActor;
+ const NConfig::TLoadControlConfig& loadConfig = config.GetLoadControlConfig().GetEnable()
+ ? Config.GetYdb().GetLoadControlConfig()
+ : globalLoadConfig;
+ if (loadConfig.GetEnable()) {
+ auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
+ databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, Counters).release());
+ }
+ Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor, databaseMonitoringActor});
}
Y_ABORT_UNLESS(Clients->CommonDatabaseClients);
@@ -341,7 +370,15 @@ public:
for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) {
const auto clientActor = Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release());
- Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor};
+ TActorId databaseMonitoringActor;
+ const NConfig::TLoadControlConfig& loadConfig = config.GetLoadControlConfig().GetEnable()
+ ? Config.GetYdb().GetLoadControlConfig()
+ : globalLoadConfig;
+ if (loadConfig.GetEnable()) {
+ auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
+ databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, Counters).release());
+ }
+ Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor, databaseMonitoringActor};
}
}
@@ -350,7 +387,7 @@ public:
for (const auto& config: mapping.GetCommon()) {
const auto clientActor = Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release());
- Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor});
+ Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor, {}});
}
Y_ABORT_UNLESS(Clients->CommonDatabaseClients);
@@ -358,12 +395,15 @@ public:
for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) {
const auto clientActor = Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release());
- Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor};
+ Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor, {}};
}
}
STRICT_STFUNC(StateFunc,
hFunc(TEvYdbCompute::TEvCreateDatabaseRequest, Handle);
+ hFunc(TEvYdbCompute::TEvCpuLoadRequest, Handle);
+ hFunc(TEvYdbCompute::TEvCpuQuotaRequest, Handle);
+ hFunc(TEvYdbCompute::TEvCpuQuotaAdjust, Handle);
)
void Handle(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev) {
@@ -374,7 +414,38 @@ public:
Register(new TCreateDatabaseRequestActor(Clients, SynchronizationServiceActorId, Config, ev));
}
+ void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) {
+ auto actorId = GetMonitoringActorIdByScope(ev.Get()->Get()->Scope);
+ if (actorId != TActorId{}) {
+ Send(ev->Forward(actorId));
+ } else {
+ Send(ev->Sender, new TEvYdbCompute::TEvCpuLoadResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load monitoring disabled"}}), 0, ev->Cookie);
+ }
+ }
+
+ void Handle(TEvYdbCompute::TEvCpuQuotaRequest::TPtr& ev) {
+ auto actorId = GetMonitoringActorIdByScope(ev.Get()->Get()->Scope);
+ if (actorId != TActorId{}) {
+ Send(ev->Forward(actorId));
+ } else {
+ Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(), 0, ev->Cookie);
+ }
+ }
+
+ void Handle(TEvYdbCompute::TEvCpuQuotaAdjust::TPtr& ev) {
+ auto actorId = GetMonitoringActorIdByScope(ev.Get()->Get()->Scope);
+ if (actorId != TActorId{}) {
+ Send(ev->Forward(actorId));
+ }
+ }
+
private:
+ TActorId GetMonitoringActorIdByScope(const TString& scope) {
+ return Config.GetYdb().GetControlPlane().HasSingle()
+ ? MonitoringActorId
+ : Clients->GetClient(scope).MonitoringActorId;
+ }
+
TActorId SynchronizationServiceActorId;
NFq::NConfig::TComputeConfig Config;
std::shared_ptr<TDatabaseClients> Clients;
@@ -383,6 +454,8 @@ private:
TYqSharedResources::TPtr YqSharedResources;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
::NMonitoring::TDynamicCounterPtr Counters;
+ TActorId MonitoringClientActorId;
+ TActorId MonitoringActorId;
};
std::unique_ptr<NActors::IActor> CreateComputeDatabaseControlPlaneServiceActor(const NFq::NConfig::TComputeConfig& config,
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h
index 6a047019cb..509d72ada9 100644
--- a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h
@@ -28,4 +28,8 @@ std::unique_ptr<NActors::IActor> CreateCmsGrpcClientActor(const NCloud::TGrpcCli
std::unique_ptr<NActors::IActor> CreateComputeDatabasesCacheActor(const NActors::TActorId& databaseClientActorId, const TString& databasesCacheReloadPeriod, const ::NMonitoring::TDynamicCounterPtr& counters);
+std::unique_ptr<NActors::IActor> CreateMonitoringGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider);
+
+std::unique_ptr<NActors::IActor> CreateDatabaseMonitoringActor(const NActors::TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters);
+
}
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp
new file mode 100644
index 0000000000..19963aa9fc
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp
@@ -0,0 +1,264 @@
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/core/fq/libs/control_plane_storage/util.h>
+
+#include <ydb/library/services/services.pb.h>
+
+#include <ydb/library/security/ydb_credentials_provider_factory.h>
+#include <ydb/library/ycloud/api/events.h>
+#include <ydb/library/ycloud/impl/grpc_service_client.h>
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/event.h>
+#include <ydb/library/actors/core/hfunc.h>
+
+#include <util/generic/queue.h>
+
+#define LOG_E(stream) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseMonitoring]: " << stream)
+#define LOG_W(stream) LOG_WARN_S( *NActors::TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseMonitoring]: " << stream)
+#define LOG_I(stream) LOG_INFO_S( *NActors::TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseMonitoring]: " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseMonitoring]: " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseMonitoring]: " << stream)
+
+namespace NFq {
+
+class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComputeDatabaseMonitoringActor> {
+ struct TCounters {
+ ::NMonitoring::TDynamicCounterPtr Counters;
+ struct TCommonMetrics {
+ ::NMonitoring::TDynamicCounters::TCounterPtr Ok;
+ ::NMonitoring::TDynamicCounters::TCounterPtr Error;
+ ::NMonitoring::THistogramPtr LatencyMs;
+ };
+
+ TCommonMetrics CpuLoadRequest;
+ ::NMonitoring::TDynamicCounters::TCounterPtr InstantLoadPercentage;
+ ::NMonitoring::TDynamicCounters::TCounterPtr AverageLoadPercentage;
+ ::NMonitoring::TDynamicCounters::TCounterPtr QuotedLoadPercentage;
+ ::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueSize;
+ ::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueOverload;
+
+ explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
+ : Counters(counters)
+ {
+ Register();
+ }
+
+ private:
+ void Register() {
+ ::NMonitoring::TDynamicCounterPtr component = Counters->GetSubgroup("component", "ComputeDatabaseMonitoring");
+ auto subComponent = component->GetSubgroup("subcomponent", "CpuLoadRequest");
+ RegisterCommonMetrics(CpuLoadRequest, subComponent);
+ InstantLoadPercentage = subComponent->GetCounter("InstantLoadPercentage", false);
+ AverageLoadPercentage = subComponent->GetCounter("AverageLoadPercentage", false);
+ QuotedLoadPercentage = subComponent->GetCounter("QuotedLoadPercentage", false);
+ PendingQueueSize = subComponent->GetCounter("PendingQueueSize", false);
+ PendingQueueOverload = subComponent->GetCounter("PendingQueueOverload", true);
+ }
+
+ void RegisterCommonMetrics(TCommonMetrics& metrics, ::NMonitoring::TDynamicCounterPtr subComponent) {
+ metrics.Ok = subComponent->GetCounter("Ok", true);
+ metrics.Error = subComponent->GetCounter("Error", true);
+ metrics.LatencyMs = subComponent->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
+ }
+
+ static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() {
+ return ::NMonitoring::ExplicitHistogram({0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000});
+ }
+ };
+
+public:
+ using TBase = NActors::TActorBootstrapped<TComputeDatabaseMonitoringActor>;
+ TComputeDatabaseMonitoringActor(const TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters)
+ : MonitoringClientActorId(monitoringClientActorId)
+ , Counters(counters)
+ , MonitoringRequestDelay(GetDuration(config.GetMonitoringRequestDelay(), TDuration::Seconds(1)))
+ , AverageLoadInterval(std::max<TDuration>(GetDuration(config.GetAverageLoadInterval(), TDuration::Seconds(10)), TDuration::Seconds(1)))
+ , MaxClusterLoad(std::min<ui32>(config.GetMaxClusterLoadPercentage(), 100) / 100.0)
+ , DefaultQueryLoad(config.GetDefaultQueryLoadPercentage() ? std::min<ui32>(config.GetDefaultQueryLoadPercentage(), 100) / 100.0 : 0.1)
+ , PendingQueueSize(config.GetPendingQueueSize())
+ , Strict(config.GetStrict())
+ , CpuNumber(config.GetCpuNumber())
+ {}
+
+ static constexpr char ActorName[] = "FQ_COMPUTE_DATABASE_MONITORING_ACTOR";
+
+ void Bootstrap() {
+ LOG_E("Monitoring Bootstrap, client " << MonitoringClientActorId.ToString());
+ Become(&TComputeDatabaseMonitoringActor::StateFunc);
+ SendCpuLoadRequest();
+ }
+
+ // note: the TEvCpuLoadRequest/TEvCpuLoadResponse handled here are *unrelated* to the client's; the same event types are merely reused
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvYdbCompute::TEvCpuLoadRequest, Handle);
+ hFunc(TEvYdbCompute::TEvCpuLoadResponse, Handle);
+ hFunc(TEvYdbCompute::TEvCpuQuotaRequest, Handle);
+ hFunc(TEvYdbCompute::TEvCpuQuotaAdjust, Handle);
+ cFunc(NActors::TEvents::TEvWakeup::EventType, SendCpuLoadRequest);
+ )
+
+ void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) {
+ auto response = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>(InstantLoad, AverageLoad);
+ if (!Ready) {
+ response->Issues.AddIssue("CPU Load is unavailable");
+ }
+ Send(ev->Sender, response.release(), 0, ev->Cookie);
+ }
+
+ void Handle(TEvYdbCompute::TEvCpuLoadResponse::TPtr& ev) {
+ const auto& response = *ev.Get()->Get();
+
+ auto now = TInstant::Now();
+ if (!response.Issues) {
+ auto delta = now - LastCpuLoad;
+ LastCpuLoad = now;
+
+ InstantLoad = response.InstantLoad;
+ // exponential moving average
+ if (!Ready || delta >= AverageLoadInterval) {
+ AverageLoad = InstantLoad;
+ QuotedLoad = InstantLoad;
+ } else {
+ auto ratio = static_cast<double>(delta.GetValue()) / AverageLoadInterval.GetValue();
+ AverageLoad = (1 - ratio) * AverageLoad + ratio * InstantLoad;
+ QuotedLoad = (1 - ratio) * QuotedLoad + ratio * InstantLoad;
+ }
+ Ready = true;
+ Counters.CpuLoadRequest.Ok->Inc();
+ *Counters.InstantLoadPercentage = static_cast<ui64>(InstantLoad * 100);
+ *Counters.AverageLoadPercentage = static_cast<ui64>(AverageLoad * 100);
+ CheckPendingQueue();
+ *Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100);
+ } else {
+ LOG_E("CPU Load Request FAILED: " << response.Issues.ToOneLineString());
+ Counters.CpuLoadRequest.Error->Inc();
+ CheckLoadIsOutdated();
+ }
+ Counters.CpuLoadRequest.LatencyMs->Collect((now - StartCpuLoad).MilliSeconds());
+
+ // TODO: make load pulling reactive
+ // 1. Long period (i.e. AverageLoadInterval/2) when idle (no requests)
+ // 2. Active pulling when busy
+
+ if (MonitoringRequestDelay) {
+ Schedule(MonitoringRequestDelay, new NActors::TEvents::TEvWakeup());
+ } else {
+ SendCpuLoadRequest();
+ }
+ }
+
+ void Handle(TEvYdbCompute::TEvCpuQuotaRequest::TPtr& ev) {
+ auto& request = *ev.Get()->Get();
+
+ if (request.Quota > 1.0) {
+ Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Incorrect quota value (exceeds 1.0) " << request.Quota}}), 0, ev->Cookie);
+ } else {
+ if (!request.Quota) {
+ request.Quota = DefaultQueryLoad;
+ }
+ CheckLoadIsOutdated();
+ if (MaxClusterLoad > 0.0 && ((!Ready && Strict) || QuotedLoad >= MaxClusterLoad)) {
+ if (PendingQueue.size() >= PendingQueueSize) {
+ Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(NYdb::EStatus::OVERLOADED, NYql::TIssues{
+ NYql::TIssue{TStringBuilder{}
+ << "Cluster is overloaded, current quoted load " << static_cast<ui64>(QuotedLoad * 100)
+ << "%, average load " << static_cast<ui64>(AverageLoad * 100) << "%"
+ }}), 0, ev->Cookie);
+ Counters.PendingQueueOverload->Inc();
+ } else {
+ PendingQueue.push(ev);
+ Counters.PendingQueueSize->Inc();
+ }
+ } else {
+ QuotedLoad += request.Quota;
+ *Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100);
+ Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(), 0, ev->Cookie);
+ }
+ }
+ }
+
+ void Handle(TEvYdbCompute::TEvCpuQuotaAdjust::TPtr& ev) {
+ if (CpuNumber) {
+ auto& request = *ev.Get()->Get();
+ if (request.Duration && request.Duration < AverageLoadInterval / 2 && request.Quota <= 1.0) {
+ auto load = (request.CpuSecondsConsumed * 1000 / request.Duration.MilliSeconds()) / CpuNumber;
+ auto quota = request.Quota ? request.Quota : DefaultQueryLoad;
+ if (quota > load) {
+ auto adjustment = (quota - load) / 2;
+ if (QuotedLoad > adjustment) {
+ QuotedLoad -= adjustment;
+ } else {
+ QuotedLoad = 0.0;
+ }
+ CheckPendingQueue();
+ *Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100);
+ }
+ }
+ }
+ }
+
+ void SendCpuLoadRequest() {
+ StartCpuLoad = TInstant::Now();
+ Send(MonitoringClientActorId, new TEvYdbCompute::TEvCpuLoadRequest(""));
+ }
+
+ void CheckLoadIsOutdated() {
+ // TODO: support timeout to decline quota after request pending time is over, not load info
+ if (TInstant::Now() - LastCpuLoad > AverageLoadInterval) {
+ Ready = false;
+ QuotedLoad = 0.0;
+ if (Strict) {
+ while (PendingQueue.size()) {
+ auto& ev = PendingQueue.front();
+ Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load info is not available"}}), 0, ev->Cookie);
+ PendingQueue.pop();
+ Counters.PendingQueueSize->Dec();
+ }
+ }
+ }
+ }
+
+ void CheckPendingQueue() {
+ auto now = TInstant::Now();
+ while (QuotedLoad < MaxClusterLoad && PendingQueue.size()) {
+ auto& ev = PendingQueue.front();
+ auto& request = *ev.Get()->Get();
+ if (request.Deadline && now >= request.Deadline) {
+ Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(NYdb::EStatus::CANCELLED, NYql::TIssues{
+ NYql::TIssue{TStringBuilder{} << "Deadline reached " << request.Deadline}}), 0, ev->Cookie);
+ } else {
+ QuotedLoad += request.Quota;
+ Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(), 0, ev->Cookie);
+ }
+ PendingQueue.pop();
+ Counters.PendingQueueSize->Dec();
+ }
+ }
+
+private:
+ TInstant StartCpuLoad;
+ TInstant LastCpuLoad;
+ TActorId MonitoringClientActorId;
+ TCounters Counters;
+
+ double InstantLoad = 0.0;
+ double AverageLoad = 0.0;
+ double QuotedLoad = 0.0;
+ bool Ready = false;
+
+ const TDuration MonitoringRequestDelay;
+ const TDuration AverageLoadInterval;
+ const double MaxClusterLoad;
+ const double DefaultQueryLoad;
+ const ui32 PendingQueueSize;
+ const bool Strict;
+ const ui32 CpuNumber;
+
+ TQueue<TEvYdbCompute::TEvCpuQuotaRequest::TPtr> PendingQueue;
+};
+
+std::unique_ptr<NActors::IActor> CreateDatabaseMonitoringActor(const NActors::TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters) {
+ return std::make_unique<TComputeDatabaseMonitoringActor>(monitoringClientActorId, config, counters);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_grpc_client_actor.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_grpc_client_actor.cpp
new file mode 100644
index 0000000000..9fc7fec3e2
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_grpc_client_actor.cpp
@@ -0,0 +1,131 @@
+#include <ydb/public/api/grpc/ydb_monitoring_v1.grpc.pb.h>
+
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/library/services/services.pb.h>
+
+#include <ydb/library/security/ydb_credentials_provider_factory.h>
+#include <ydb/library/ycloud/api/events.h>
+#include <ydb/library/ycloud/impl/grpc_service_client.h>
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/event.h>
+#include <ydb/library/actors/core/hfunc.h>
+#include <ydb/library/actors/core/log.h>
+
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringGrpcClient]: " << stream)
+#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringGrpcClient]: " << stream)
+#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringGrpcClient]: " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringGrpcClient]: " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringGrpcClient]: " << stream)
+
+namespace NFq {
+
+using namespace NActors;
+
+namespace {
+
+struct TEvPrivate {
+ enum EEv {
+ EvSelfCheckRequest = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+ EvSelfCheckResponse,
+
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
+
+ struct TEvSelfCheckRequest : NCloud::TEvGrpcProtoRequest<TEvSelfCheckRequest, EvSelfCheckRequest, Ydb::Monitoring::SelfCheckRequest> {};
+ struct TEvSelfCheckResponse : NCloud::TEvGrpcProtoResponse<TEvSelfCheckResponse, EvSelfCheckResponse, Ydb::Monitoring::SelfCheckResponse> {};
+};
+
+}
+
+class TMonitoringGrpcServiceActor : public NActors::TActor<TMonitoringGrpcServiceActor>, TGrpcServiceClient<Ydb::Monitoring::V1::MonitoringService> {
+public:
+ using TBase = NActors::TActor<TMonitoringGrpcServiceActor>;
+ struct TSelfCheckGrpcRequest : TGrpcRequest {
+ static constexpr auto Request = &Ydb::Monitoring::V1::MonitoringService::Stub::AsyncSelfCheck;
+ using TRequestEventType = TEvPrivate::TEvSelfCheckRequest;
+ using TResponseEventType = TEvPrivate::TEvSelfCheckResponse;
+ };
+
+ TMonitoringGrpcServiceActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider)
+ : TBase(&TMonitoringGrpcServiceActor::StateFunc)
+ , TGrpcServiceClient(settings)
+ , Settings(settings)
+ , CredentialsProvider(credentialsProvider)
+ {}
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvYdbCompute::TEvCpuLoadRequest, Handle);
+ hFunc(TEvPrivate::TEvSelfCheckResponse, Handle);
+ )
+
+ void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) {
+ auto forwardRequest = std::make_unique<TEvPrivate::TEvSelfCheckRequest>();
+ forwardRequest->Request.set_return_verbose_status(true);
+ forwardRequest->Token = CredentialsProvider->GetAuthInfo();
+ TEvPrivate::TEvSelfCheckRequest::TPtr forwardEvent = (NActors::TEventHandle<TEvPrivate::TEvSelfCheckRequest>*)new IEventHandle(SelfId(), SelfId(), forwardRequest.release(), 0, Cookie);
+ MakeCall<TSelfCheckGrpcRequest>(std::move(forwardEvent));
+ Requests[Cookie++] = ev;
+ }
+
+ void Handle(TEvPrivate::TEvSelfCheckResponse::TPtr& ev) {
+ const auto& status = ev->Get()->Status;
+
+ auto it = Requests.find(ev->Cookie);
+ if (it == Requests.end()) {
+ LOG_E("Request doesn't exist (SelfCheckResponse). Need to fix this bug urgently");
+ return;
+ }
+ auto request = it->second;
+ Requests.erase(it);
+
+ auto forwardResponse = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>();
+ if (!status.Ok()) {
+ forwardResponse->Issues.AddIssue("GrpcCode: " + ToString(status.GRpcStatusCode));
+ forwardResponse->Issues.AddIssue("Message: " + status.Msg);
+ forwardResponse->Issues.AddIssue("Details: " + status.Details);
+ Send(request->Sender, forwardResponse.release(), 0, request->Cookie);
+ return;
+ }
+
+ Ydb::Monitoring::SelfCheckResult response;
+ ev->Get()->Response.operation().result().UnpackTo(&response);
+
+ double totalLoad = 0.0;
+ double nodeCount = 0;
+
+ for (auto& databaseStatus : response.database_status()) {
+ for (auto& nodeStatus : databaseStatus.compute().nodes()) {
+ for (auto& poolStatus : nodeStatus.pools()) {
+ if (poolStatus.name() == "User") {
+ totalLoad += poolStatus.usage();
+ nodeCount++;
+ break;
+ }
+ }
+ }
+ }
+
+ if (nodeCount) {
+ forwardResponse->InstantLoad = totalLoad / nodeCount;
+ } else {
+ forwardResponse->Issues.AddIssue("User pool node load missed");
+ }
+
+ Send(request->Sender, forwardResponse.release(), 0, request->Cookie);
+ }
+
+private:
+ NCloud::TGrpcClientSettings Settings;
+ TMap<uint64_t, TEvYdbCompute::TEvCpuLoadRequest::TPtr> Requests;
+ NYdb::TCredentialsProviderPtr CredentialsProvider;
+ int64_t Cookie = 0;
+};
+
+std::unique_ptr<NActors::IActor> CreateMonitoringGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider) {
+ return std::make_unique<TMonitoringGrpcServiceActor>(settings, credentialsProvider);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make
index 8367142a69..daedf40cdf 100644
--- a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make
@@ -4,6 +4,8 @@ SRCS(
cms_grpc_client_actor.cpp
compute_database_control_plane_service.cpp
compute_databases_cache.cpp
+ database_monitoring.cpp
+ monitoring_grpc_client_actor.cpp
ydbcp_grpc_client_actor.cpp
)
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h
index 86e7b2ee73..6f5961b754 100644
--- a/ydb/core/fq/libs/compute/ydb/events/events.h
+++ b/ydb/core/fq/libs/compute/ydb/events/events.h
@@ -58,6 +58,12 @@ struct TEvYdbCompute {
EvInvalidateSynchronizationRequest,
EvInvalidateSynchronizationResponse,
+ EvCpuLoadRequest,
+ EvCpuLoadResponse,
+ EvCpuQuotaRequest,
+ EvCpuQuotaResponse,
+ EvCpuQuotaAdjust,
+
EvEnd
};
@@ -438,6 +444,58 @@ struct TEvYdbCompute {
NYql::TIssues Issues;
};
+
+ struct TEvCpuLoadRequest : public NActors::TEventLocal<TEvCpuLoadRequest, EvCpuLoadRequest> {
+ TEvCpuLoadRequest(const TString& scope)
+ : Scope(scope)
+ {}
+
+ TString Scope;
+ };
+
+ struct TEvCpuLoadResponse : public NActors::TEventLocal<TEvCpuLoadResponse, EvCpuLoadResponse> {
+ TEvCpuLoadResponse(double instantLoad = 0.0, double averageLoad = 0.0)
+ : InstantLoad(instantLoad), AverageLoad(averageLoad)
+ {}
+
+ TEvCpuLoadResponse(NYql::TIssues issues)
+ : InstantLoad(0.0), AverageLoad(0.0), Issues(std::move(issues))
+ {}
+
+ double InstantLoad;
+ double AverageLoad;
+ NYql::TIssues Issues;
+ };
+
+ struct TEvCpuQuotaRequest : public NActors::TEventLocal<TEvCpuQuotaRequest, EvCpuQuotaRequest> {
+ TEvCpuQuotaRequest(const TString& scope, TInstant deadline = TInstant::Zero(), double quota = 0.0)
+ : Scope(scope), Deadline(deadline), Quota(quota)
+ {}
+
+ TString Scope;
+ TInstant Deadline;
+ double Quota; // if zero, default quota is used
+ };
+
+ struct TEvCpuQuotaResponse : public NActors::TEventLocal<TEvCpuQuotaResponse, EvCpuQuotaResponse> {
+ TEvCpuQuotaResponse(NYdb::EStatus status = NYdb::EStatus::SUCCESS, NYql::TIssues issues = {})
+ : Status(status), Issues(std::move(issues))
+ {}
+
+ NYdb::EStatus Status;
+ NYql::TIssues Issues;
+ };
+
+ struct TEvCpuQuotaAdjust : public NActors::TEventLocal<TEvCpuQuotaAdjust, EvCpuQuotaAdjust> {
+ TEvCpuQuotaAdjust(const TString& scope, TDuration duration, double cpuSecondsConsumed, double quota = 0.0)
+ : Scope(scope), Duration(duration), CpuSecondsConsumed(cpuSecondsConsumed), Quota(quota)
+ {}
+
+ TString Scope;
+ TDuration Duration;
+ double CpuSecondsConsumed;
+ double Quota; // if zero, default quota is used
+ };
};
}
diff --git a/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp b/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp
index 290f49c195..a34ab3d464 100644
--- a/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/fq/libs/common/util.h>
#include <ydb/core/fq/libs/compute/common/metrics.h>
#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+#include <ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h>
#include <ydb/core/fq/libs/compute/ydb/events/events.h>
#include <ydb/core/fq/libs/ydb/ydb.h>
#include <ydb/library/services/services.pb.h>
@@ -73,12 +74,8 @@ public:
void Start() {
LOG_I("Start initializer actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status));
if (!Params.RequestStartedAt) {
- auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
- pingCounters->InFly->Inc();
Become(&TInitializerActor::StateFunc);
- Fq::Private::PingTaskRequest pingTaskRequest;
- *pingTaskRequest.mutable_started_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds());
- Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
+ Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaRequest(Params.Scope.ToString(), Params.Deadline));
} else {
LOG_I("Query has been initialized (did nothing)");
Send(Parent, new TEvYdbCompute::TEvInitializerResponse({}, NYdb::EStatus::SUCCESS));
@@ -87,9 +84,24 @@ public:
}
STRICT_STFUNC(StateFunc,
+ hFunc(TEvYdbCompute::TEvCpuQuotaResponse, Handle);
hFunc(TEvents::TEvForwardPingResponse, Handle);
)
+ void Handle(TEvYdbCompute::TEvCpuQuotaResponse::TPtr& ev) {
+ const auto& response = *ev.Get()->Get();
+ if (response.Status == NYdb::EStatus::SUCCESS) {
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Inc();
+ Fq::Private::PingTaskRequest pingTaskRequest;
+ *pingTaskRequest.mutable_started_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds());
+ Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
+ } else {
+ Send(Parent, new TEvYdbCompute::TEvInitializerResponse(response.Issues, response.Status));
+ FailedAndPassAway();
+ }
+ }
+
void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) {
auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
pingCounters->InFly->Dec();
diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
index 2934358859..d6ef6600f0 100644
--- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
@@ -7,6 +7,7 @@
#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
#include <ydb/core/fq/libs/compute/common/utils.h>
#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h>
#include <ydb/core/fq/libs/ydb/ydb.h>
#include <ydb/core/util/backoff.h>
#include <ydb/library/services/services.pb.h>
@@ -232,7 +233,12 @@ public:
pingTaskRequest.set_pending_status_code(StatusCode);
PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
try {
- pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan()));
+ TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us());
+ double cpuUsage = 0.0;
+ pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan(), &cpuUsage));
+ if (duration && cpuUsage) {
+ Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage));
+ }
} catch(const NJson::TJsonException& ex) {
LOG_E("Error statistics conversion: " << ex.what());
}
@@ -249,7 +255,12 @@ public:
pingTaskRequest.set_status(ComputeStatus);
PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
try {
- pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan()));
+ TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us());
+ double cpuUsage = 0.0;
+ pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan(), &cpuUsage));
+ if (duration && cpuUsage) {
+ Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage));
+ }
} catch(const NJson::TJsonException& ex) {
LOG_E("Error statistics conversion: " << ex.what());
}
diff --git a/ydb/core/fq/libs/config/protos/compute.proto b/ydb/core/fq/libs/config/protos/compute.proto
index d81fefa721..6084dc6ced 100644
--- a/ydb/core/fq/libs/config/protos/compute.proto
+++ b/ydb/core/fq/libs/config/protos/compute.proto
@@ -17,10 +17,22 @@ message TSynchronizationService {
message TInPlaceCompute {
}
+message TLoadControlConfig {
+ bool Enable = 1;
+ string MonitoringRequestDelay = 2; // default "1s"
+ string AverageLoadInterval = 3; // default "10s", minimum allowed "1s"
+ uint32 MaxClusterLoadPercentage = 4; // default 0 == no load control
+ uint32 DefaultQueryLoadPercentage = 5; // default 10
+ uint32 PendingQueueSize = 6; // default 0 == instant decline if overloaded
+ bool Strict = 7; // default false, whether to deny execution when the load level is unavailable
+ uint32 CpuNumber = 8;
+}
+
message TComputeDatabaseConfig {
TYdbStorageConfig ControlPlaneConnection = 1;
TYdbStorageConfig ExecutionConnection = 3;
string Tenant = 2;
+ TLoadControlConfig LoadControlConfig = 4;
}
message TDatabaseMapping {
@@ -56,6 +68,7 @@ message TYdbCompute {
TYdbComputeControlPlane ControlPlane = 2;
TSynchronizationService SynchronizationService = 3;
repeated string PinTenantName = 4;
+ TLoadControlConfig LoadControlConfig = 5;
}
enum EComputeType {