diff options
author | Hor911 <hor911@ydb.tech> | 2024-01-30 22:31:37 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-30 22:31:37 +0300 |
commit | 27e909566c6ad5df05f0352851fd8c77c7b0205d (patch) | |
tree | b775b8bd569c4c1e167f5e33570d4cc1c3c51577 | |
parent | d39b86338f490599783e2c7e425c359b7bf134c5 (diff) | |
download | ydb-27e909566c6ad5df05f0352851fd8c77c7b0205d.tar.gz |
Limit v2 compute load by CPU (#861)
* Limit v2 compute load by CPU
* Mon fix
* Per db load config + cpu load adjustment (cashback)
* Review fixes
* Clear config etc
11 files changed, 584 insertions, 13 deletions
diff --git a/ydb/core/fq/libs/compute/common/utils.cpp b/ydb/core/fq/libs/compute/common/utils.cpp index d5f22b664f..127bd26f1e 100644 --- a/ydb/core/fq/libs/compute/common/utils.cpp +++ b/ydb/core/fq/libs/compute/common/utils.cpp @@ -338,7 +338,7 @@ void EnumeratePlans(NYson::TYsonWriter& writer, NJson::TJsonValue& value, ui32& } } -TString GetV1StatFromV2Plan(const TString& plan) { +TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage) { TStringStream out; NYson::TYsonWriter writer(&out); writer.OnBeginMap(); @@ -358,6 +358,9 @@ TString GetV1StatFromV2Plan(const TString& plan) { totals.MaxMemoryUsage.Write(writer, "MaxMemoryUsage"); totals.CpuTimeUs.Write(writer, "CpuTimeUs"); totals.SourceCpuTimeUs.Write(writer, "SourceCpuTimeUs"); + if (cpuUsage) { + *cpuUsage = (totals.CpuTimeUs.Sum + totals.SourceCpuTimeUs.Sum) / 1000000.0; + } totals.InputBytes.Write(writer, "InputBytes"); totals.InputRows.Write(writer, "InputRows"); totals.OutputBytes.Write(writer, "OutputBytes"); diff --git a/ydb/core/fq/libs/compute/common/utils.h b/ydb/core/fq/libs/compute/common/utils.h index d2fad6d01f..4a61a45bf6 100644 --- a/ydb/core/fq/libs/compute/common/utils.h +++ b/ydb/core/fq/libs/compute/common/utils.h @@ -24,7 +24,7 @@ inline std::shared_ptr<NYdb::NTable::TTableClient> CreateNewTableClient(const TS tableSettings); } -TString GetV1StatFromV2Plan(const TString& plan); +TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage = nullptr); TString GetV1StatFromV2PlanV2(const TString& plan); TString FormatDurationMs(ui64 durationMs); diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp index bfd5984b56..08382442f6 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp +++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp @@ -38,6 +38,7 @@ struct 
TDatabaseClients { TActorId ActorId; NConfig::TComputeDatabaseConfig Config; TActorId DatabasesCacheActorId; + TActorId MonitoringActorId; }; std::optional<TClientConfig> GetClient(const TString& scope, const TString& endpoint, const TString& database) const { @@ -306,6 +307,7 @@ public: switch (controlPlane.type_case()) { case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET: case NConfig::TYdbComputeControlPlane::kSingle: + CreateSingleClientActors(controlPlane.GetSingle()); break; case NConfig::TYdbComputeControlPlane::kCms: CreateCmsClientActors(controlPlane.GetCms(), controlPlane.GetDatabasesCacheReloadPeriod()); @@ -317,6 +319,16 @@ public: Become(&TComputeDatabaseControlPlaneServiceActor::StateFunc); } + static NCloud::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TYdbStorageConfig& connection) { + NCloud::TGrpcClientSettings settings; + settings.Endpoint = connection.GetEndpoint(); + settings.EnableSsl = connection.GetUseSsl(); + if (connection.GetCertificateFile()) { + settings.CertificateRootCA = StripString(TFileInput(connection.GetCertificateFile()).ReadAll()); + } + return settings; + } + static NCloud::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TComputeDatabaseConfig& config) { NCloud::TGrpcClientSettings settings; const auto& connection = config.GetControlPlaneConnection(); @@ -328,12 +340,29 @@ public: return settings; } + void CreateSingleClientActors(const NConfig::TYdbComputeControlPlane::TSingle& singleConfig) { + auto globalLoadConfig = Config.GetYdb().GetLoadControlConfig(); + if (globalLoadConfig.GetEnable()) { + auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(singleConfig.GetConnection()), CredentialsProviderFactory(GetYdbCredentialSettings(singleConfig.GetConnection()))->CreateProvider()).release()); + MonitoringActorId = Register(CreateDatabaseMonitoringActor(clientActor, globalLoadConfig, Counters).release()); + } + } + void CreateCmsClientActors(const 
NConfig::TYdbComputeControlPlane::TCms& cmsConfig, const TString& databasesCacheReloadPeriod) { const auto& mapping = cmsConfig.GetDatabaseMapping(); + auto globalLoadConfig = Config.GetYdb().GetLoadControlConfig(); for (const auto& config: mapping.GetCommon()) { const auto clientActor = Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()); const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release()); - Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor}); + TActorId databaseMonitoringActor; + const NConfig::TLoadControlConfig& loadConfig = config.GetLoadControlConfig().GetEnable() + ? Config.GetYdb().GetLoadControlConfig() + : globalLoadConfig; + if (loadConfig.GetEnable()) { + auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()); + databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, Counters).release()); + } + Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor, databaseMonitoringActor}); } Y_ABORT_UNLESS(Clients->CommonDatabaseClients); @@ -341,7 +370,15 @@ public: for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) { const auto clientActor = Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()); const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release()); - Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor}; + TActorId databaseMonitoringActor; + const NConfig::TLoadControlConfig& 
loadConfig = config.GetLoadControlConfig().GetEnable() + ? Config.GetYdb().GetLoadControlConfig() + : globalLoadConfig; + if (loadConfig.GetEnable()) { + auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()); + databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, Counters).release()); + } + Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor, databaseMonitoringActor}; } } @@ -350,7 +387,7 @@ public: for (const auto& config: mapping.GetCommon()) { const auto clientActor = Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()); const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release()); - Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor}); + Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor, {}}); } Y_ABORT_UNLESS(Clients->CommonDatabaseClients); @@ -358,12 +395,15 @@ public: for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) { const auto clientActor = Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()); const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release()); - Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor}; + Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor, {}}; } } STRICT_STFUNC(StateFunc, hFunc(TEvYdbCompute::TEvCreateDatabaseRequest, Handle); + hFunc(TEvYdbCompute::TEvCpuLoadRequest, Handle); + 
hFunc(TEvYdbCompute::TEvCpuQuotaRequest, Handle); + hFunc(TEvYdbCompute::TEvCpuQuotaAdjust, Handle); ) void Handle(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev) { @@ -374,7 +414,38 @@ public: Register(new TCreateDatabaseRequestActor(Clients, SynchronizationServiceActorId, Config, ev)); } + void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) { + auto actorId = GetMonitoringActorIdByScope(ev.Get()->Get()->Scope); + if (actorId != TActorId{}) { + Send(ev->Forward(actorId)); + } else { + Send(ev->Sender, new TEvYdbCompute::TEvCpuLoadResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load monitoring disabled"}}), 0, ev->Cookie); + } + } + + void Handle(TEvYdbCompute::TEvCpuQuotaRequest::TPtr& ev) { + auto actorId = GetMonitoringActorIdByScope(ev.Get()->Get()->Scope); + if (actorId != TActorId{}) { + Send(ev->Forward(actorId)); + } else { + Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(), 0, ev->Cookie); + } + } + + void Handle(TEvYdbCompute::TEvCpuQuotaAdjust::TPtr& ev) { + auto actorId = GetMonitoringActorIdByScope(ev.Get()->Get()->Scope); + if (actorId != TActorId{}) { + Send(ev->Forward(actorId)); + } + } + private: + TActorId GetMonitoringActorIdByScope(const TString& scope) { + return Config.GetYdb().GetControlPlane().HasSingle() + ? 
MonitoringActorId + : Clients->GetClient(scope).MonitoringActorId; + } + TActorId SynchronizationServiceActorId; NFq::NConfig::TComputeConfig Config; std::shared_ptr<TDatabaseClients> Clients; @@ -383,6 +454,8 @@ private: TYqSharedResources::TPtr YqSharedResources; NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; ::NMonitoring::TDynamicCounterPtr Counters; + TActorId MonitoringClientActorId; + TActorId MonitoringActorId; }; std::unique_ptr<NActors::IActor> CreateComputeDatabaseControlPlaneServiceActor(const NFq::NConfig::TComputeConfig& config, diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h index 6a047019cb..509d72ada9 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h +++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h @@ -28,4 +28,8 @@ std::unique_ptr<NActors::IActor> CreateCmsGrpcClientActor(const NCloud::TGrpcCli std::unique_ptr<NActors::IActor> CreateComputeDatabasesCacheActor(const NActors::TActorId& databaseClientActorId, const TString& databasesCacheReloadPeriod, const ::NMonitoring::TDynamicCounterPtr& counters); +std::unique_ptr<NActors::IActor> CreateMonitoringGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider); + +std::unique_ptr<NActors::IActor> CreateDatabaseMonitoringActor(const NActors::TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters); + } diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp new file mode 100644 index 0000000000..19963aa9fc --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp @@ -0,0 +1,264 @@ +#include 
<ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/core/fq/libs/control_plane_storage/util.h> + +#include <ydb/library/services/services.pb.h> + +#include <ydb/library/security/ydb_credentials_provider_factory.h> +#include <ydb/library/ycloud/api/events.h> +#include <ydb/library/ycloud/impl/grpc_service_client.h> + +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/event.h> +#include <ydb/library/actors/core/hfunc.h> + +#include <util/generic/queue.h> + +#define LOG_E(stream) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseMonitoring]: " << stream) +#define LOG_W(stream) LOG_WARN_S( *NActors::TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseMonitoring]: " << stream) +#define LOG_I(stream) LOG_INFO_S( *NActors::TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseMonitoring]: " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseMonitoring]: " << stream) +#define LOG_T(stream) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseMonitoring]: " << stream) + +namespace NFq { + +class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComputeDatabaseMonitoringActor> { + struct TCounters { + ::NMonitoring::TDynamicCounterPtr Counters; + struct TCommonMetrics { + ::NMonitoring::TDynamicCounters::TCounterPtr Ok; + ::NMonitoring::TDynamicCounters::TCounterPtr Error; + ::NMonitoring::THistogramPtr LatencyMs; + }; + + TCommonMetrics CpuLoadRequest; + ::NMonitoring::TDynamicCounters::TCounterPtr InstantLoadPercentage; + ::NMonitoring::TDynamicCounters::TCounterPtr AverageLoadPercentage; + ::NMonitoring::TDynamicCounters::TCounterPtr QuotedLoadPercentage; + ::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueSize; + ::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueOverload; + + explicit 
TCounters(const ::NMonitoring::TDynamicCounterPtr& counters) + : Counters(counters) + { + Register(); + } + + private: + void Register() { + ::NMonitoring::TDynamicCounterPtr component = Counters->GetSubgroup("component", "ComputeDatabaseMonitoring"); + auto subComponent = component->GetSubgroup("subcomponent", "CpuLoadRequest"); + RegisterCommonMetrics(CpuLoadRequest, subComponent); + InstantLoadPercentage = subComponent->GetCounter("InstantLoadPercentage", false); + AverageLoadPercentage = subComponent->GetCounter("AverageLoadPercentage", false); + QuotedLoadPercentage = subComponent->GetCounter("QuotedLoadPercentage", false); + PendingQueueSize = subComponent->GetCounter("PendingQueueSize", false); + PendingQueueOverload = subComponent->GetCounter("PendingQueueOverload", true); + } + + void RegisterCommonMetrics(TCommonMetrics& metrics, ::NMonitoring::TDynamicCounterPtr subComponent) { + metrics.Ok = subComponent->GetCounter("Ok", true); + metrics.Error = subComponent->GetCounter("Error", true); + metrics.LatencyMs = subComponent->GetHistogram("LatencyMs", GetLatencyHistogramBuckets()); + } + + static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() { + return ::NMonitoring::ExplicitHistogram({0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000}); + } + }; + +public: + using TBase = NActors::TActorBootstrapped<TComputeDatabaseMonitoringActor>; + TComputeDatabaseMonitoringActor(const TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters) + : MonitoringClientActorId(monitoringClientActorId) + , Counters(counters) + , MonitoringRequestDelay(GetDuration(config.GetMonitoringRequestDelay(), TDuration::Seconds(1))) + , AverageLoadInterval(std::max<TDuration>(GetDuration(config.GetAverageLoadInterval(), TDuration::Seconds(10)), TDuration::Seconds(1))) + , MaxClusterLoad(std::min<ui32>(config.GetMaxClusterLoadPercentage(), 100) / 100.0) + , 
DefaultQueryLoad(config.GetDefaultQueryLoadPercentage() ? std::min<ui32>(config.GetDefaultQueryLoadPercentage(), 100) / 100.0 : 0.1) + , PendingQueueSize(config.GetPendingQueueSize()) + , Strict(config.GetStrict()) + , CpuNumber(config.GetCpuNumber()) + {} + + static constexpr char ActorName[] = "FQ_COMPUTE_DATABASE_MONITORING_ACTOR"; + + void Bootstrap() { + LOG_E("Monitoring Bootstrap, client " << MonitoringClientActorId.ToString()); + Become(&TComputeDatabaseMonitoringActor::StateFunc); + SendCpuLoadRequest(); + } + + // these TEvCpuLoadRequest and TEvCpuLoadResponse *are* unrelated, just same events are used + STRICT_STFUNC(StateFunc, + hFunc(TEvYdbCompute::TEvCpuLoadRequest, Handle); + hFunc(TEvYdbCompute::TEvCpuLoadResponse, Handle); + hFunc(TEvYdbCompute::TEvCpuQuotaRequest, Handle); + hFunc(TEvYdbCompute::TEvCpuQuotaAdjust, Handle); + cFunc(NActors::TEvents::TEvWakeup::EventType, SendCpuLoadRequest); + ) + + void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) { + auto response = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>(InstantLoad, AverageLoad); + if (!Ready) { + response->Issues.AddIssue("CPU Load is unavailable"); + } + Send(ev->Sender, response.release(), 0, ev->Cookie); + } + + void Handle(TEvYdbCompute::TEvCpuLoadResponse::TPtr& ev) { + const auto& response = *ev.Get()->Get(); + + auto now = TInstant::Now(); + if (!response.Issues) { + auto delta = now - LastCpuLoad; + LastCpuLoad = now; + + InstantLoad = response.InstantLoad; + // exponential moving average + if (!Ready || delta >= AverageLoadInterval) { + AverageLoad = InstantLoad; + QuotedLoad = InstantLoad; + } else { + auto ratio = static_cast<double>(delta.GetValue()) / AverageLoadInterval.GetValue(); + AverageLoad = (1 - ratio) * AverageLoad + ratio * InstantLoad; + QuotedLoad = (1 - ratio) * QuotedLoad + ratio * InstantLoad; + } + Ready = true; + Counters.CpuLoadRequest.Ok->Inc(); + *Counters.InstantLoadPercentage = static_cast<ui64>(InstantLoad * 100); + 
*Counters.AverageLoadPercentage = static_cast<ui64>(AverageLoad * 100); + CheckPendingQueue(); + *Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100); + } else { + LOG_E("CPU Load Request FAILED: " << response.Issues.ToOneLineString()); + Counters.CpuLoadRequest.Error->Inc(); + CheckLoadIsOutdated(); + } + Counters.CpuLoadRequest.LatencyMs->Collect((now - StartCpuLoad).MilliSeconds()); + + // TODO: make load pulling reactive + // 1. Long period (i.e. AverageLoadInterval/2) when idle (no requests) + // 2. Active pulling when busy + + if (MonitoringRequestDelay) { + Schedule(MonitoringRequestDelay, new NActors::TEvents::TEvWakeup()); + } else { + SendCpuLoadRequest(); + } + } + + void Handle(TEvYdbCompute::TEvCpuQuotaRequest::TPtr& ev) { + auto& request = *ev.Get()->Get(); + + if (request.Quota > 1.0) { + Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Incorrect quota value (exceeds 1.0) " << request.Quota}}), 0, ev->Cookie); + } else { + if (!request.Quota) { + request.Quota = DefaultQueryLoad; + } + CheckLoadIsOutdated(); + if (MaxClusterLoad > 0.0 && ((!Ready && Strict) || QuotedLoad >= MaxClusterLoad)) { + if (PendingQueue.size() >= PendingQueueSize) { + Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(NYdb::EStatus::OVERLOADED, NYql::TIssues{ + NYql::TIssue{TStringBuilder{} + << "Cluster is overloaded, current quoted load " << static_cast<ui64>(QuotedLoad * 100) + << "%, average load " << static_cast<ui64>(AverageLoad * 100) << "%" + }}), 0, ev->Cookie); + Counters.PendingQueueOverload->Inc(); + } else { + PendingQueue.push(ev); + Counters.PendingQueueSize->Inc(); + } + } else { + QuotedLoad += request.Quota; + *Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100); + Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(), 0, ev->Cookie); + } + } + } + + void Handle(TEvYdbCompute::TEvCpuQuotaAdjust::TPtr& ev) { + if (CpuNumber) { + auto& 
request = *ev.Get()->Get(); + if (request.Duration && request.Duration < AverageLoadInterval / 2 && request.Quota <= 1.0) { + auto load = (request.CpuSecondsConsumed * 1000 / request.Duration.MilliSeconds()) / CpuNumber; + auto quota = request.Quota ? request.Quota : DefaultQueryLoad; + if (quota > load) { + auto adjustment = (quota - load) / 2; + if (QuotedLoad > adjustment) { + QuotedLoad -= adjustment; + } else { + QuotedLoad = 0.0; + } + CheckPendingQueue(); + *Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100); + } + } + } + } + + void SendCpuLoadRequest() { + StartCpuLoad = TInstant::Now(); + Send(MonitoringClientActorId, new TEvYdbCompute::TEvCpuLoadRequest("")); + } + + void CheckLoadIsOutdated() { + // TODO: support timeout to decline quota after request pending time is over, not load info + if (TInstant::Now() - LastCpuLoad > AverageLoadInterval) { + Ready = false; + QuotedLoad = 0.0; + if (Strict) { + while (PendingQueue.size()) { + auto& ev = PendingQueue.front(); + Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load info is not available"}}), 0, ev->Cookie); + PendingQueue.pop(); + Counters.PendingQueueSize->Dec(); + } + } + } + } + + void CheckPendingQueue() { + auto now = TInstant::Now(); + while (QuotedLoad < MaxClusterLoad && PendingQueue.size()) { + auto& ev = PendingQueue.front(); + auto& request = *ev.Get()->Get(); + if (request.Deadline && now >= request.Deadline) { + Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(NYdb::EStatus::CANCELLED, NYql::TIssues{ + NYql::TIssue{TStringBuilder{} << "Deadline reached " << request.Deadline}}), 0, ev->Cookie); + } else { + QuotedLoad += request.Quota; + Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(), 0, ev->Cookie); + } + PendingQueue.pop(); + Counters.PendingQueueSize->Dec(); + } + } + +private: + TInstant StartCpuLoad; + TInstant LastCpuLoad; + TActorId MonitoringClientActorId; 
+ TCounters Counters; + + double InstantLoad = 0.0; + double AverageLoad = 0.0; + double QuotedLoad = 0.0; + bool Ready = false; + + const TDuration MonitoringRequestDelay; + const TDuration AverageLoadInterval; + const double MaxClusterLoad; + const double DefaultQueryLoad; + const ui32 PendingQueueSize; + const bool Strict; + const ui32 CpuNumber; + + TQueue<TEvYdbCompute::TEvCpuQuotaRequest::TPtr> PendingQueue; +}; + +std::unique_ptr<NActors::IActor> CreateDatabaseMonitoringActor(const NActors::TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters) { + return std::make_unique<TComputeDatabaseMonitoringActor>(monitoringClientActorId, config, counters); +} + +} diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_grpc_client_actor.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_grpc_client_actor.cpp new file mode 100644 index 0000000000..9fc7fec3e2 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_grpc_client_actor.cpp @@ -0,0 +1,131 @@ +#include <ydb/public/api/grpc/ydb_monitoring_v1.grpc.pb.h> + +#include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/library/services/services.pb.h> + +#include <ydb/library/security/ydb_credentials_provider_factory.h> +#include <ydb/library/ycloud/api/events.h> +#include <ydb/library/ycloud/impl/grpc_service_client.h> + +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/event.h> +#include <ydb/library/actors/core/hfunc.h> +#include <ydb/library/actors/core/log.h> + +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringGrpcClient]: " << stream) +#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringGrpcClient]: " << stream) +#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringGrpcClient]: " << stream) +#define 
LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringGrpcClient]: " << stream) +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [MonitoringGrpcClient]: " << stream) + +namespace NFq { + +using namespace NActors; + +namespace { + +struct TEvPrivate { + enum EEv { + EvSelfCheckRequest = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvSelfCheckResponse, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + + struct TEvSelfCheckRequest : NCloud::TEvGrpcProtoRequest<TEvSelfCheckRequest, EvSelfCheckRequest, Ydb::Monitoring::SelfCheckRequest> {}; + struct TEvSelfCheckResponse : NCloud::TEvGrpcProtoResponse<TEvSelfCheckResponse, EvSelfCheckResponse, Ydb::Monitoring::SelfCheckResponse> {}; +}; + +} + +class TMonitoringGrpcServiceActor : public NActors::TActor<TMonitoringGrpcServiceActor>, TGrpcServiceClient<Ydb::Monitoring::V1::MonitoringService> { +public: + using TBase = NActors::TActor<TMonitoringGrpcServiceActor>; + struct TSelfCheckGrpcRequest : TGrpcRequest { + static constexpr auto Request = &Ydb::Monitoring::V1::MonitoringService::Stub::AsyncSelfCheck; + using TRequestEventType = TEvPrivate::TEvSelfCheckRequest; + using TResponseEventType = TEvPrivate::TEvSelfCheckResponse; + }; + + TMonitoringGrpcServiceActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider) + : TBase(&TMonitoringGrpcServiceActor::StateFunc) + , TGrpcServiceClient(settings) + , Settings(settings) + , CredentialsProvider(credentialsProvider) + {} + + STRICT_STFUNC(StateFunc, + hFunc(TEvYdbCompute::TEvCpuLoadRequest, Handle); + hFunc(TEvPrivate::TEvSelfCheckResponse, Handle); + ) + + void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) { + auto forwardRequest = std::make_unique<TEvPrivate::TEvSelfCheckRequest>(); + 
forwardRequest->Request.set_return_verbose_status(true); + forwardRequest->Token = CredentialsProvider->GetAuthInfo(); + TEvPrivate::TEvSelfCheckRequest::TPtr forwardEvent = (NActors::TEventHandle<TEvPrivate::TEvSelfCheckRequest>*)new IEventHandle(SelfId(), SelfId(), forwardRequest.release(), 0, Cookie); + MakeCall<TSelfCheckGrpcRequest>(std::move(forwardEvent)); + Requests[Cookie++] = ev; + } + + void Handle(TEvPrivate::TEvSelfCheckResponse::TPtr& ev) { + const auto& status = ev->Get()->Status; + + auto it = Requests.find(ev->Cookie); + if (it == Requests.end()) { + LOG_E("Request doesn't exist (SelfCheckResponse). Need to fix this bug urgently"); + return; + } + auto request = it->second; + Requests.erase(it); + + auto forwardResponse = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>(); + if (!status.Ok()) { + forwardResponse->Issues.AddIssue("GrpcCode: " + ToString(status.GRpcStatusCode)); + forwardResponse->Issues.AddIssue("Message: " + status.Msg); + forwardResponse->Issues.AddIssue("Details: " + status.Details); + Send(request->Sender, forwardResponse.release(), 0, request->Cookie); + return; + } + + Ydb::Monitoring::SelfCheckResult response; + ev->Get()->Response.operation().result().UnpackTo(&response); + + double totalLoad = 0.0; + double nodeCount = 0; + + for (auto& databaseStatus : response.database_status()) { + for (auto& nodeStatus : databaseStatus.compute().nodes()) { + for (auto& poolStatus : nodeStatus.pools()) { + if (poolStatus.name() == "User") { + totalLoad += poolStatus.usage(); + nodeCount++; + break; + } + } + } + } + + if (nodeCount) { + forwardResponse->InstantLoad = totalLoad / nodeCount; + } else { + forwardResponse->Issues.AddIssue("User pool node load missed"); + } + + Send(request->Sender, forwardResponse.release(), 0, request->Cookie); + } + +private: + NCloud::TGrpcClientSettings Settings; + TMap<uint64_t, TEvYdbCompute::TEvCpuLoadRequest::TPtr> Requests; + NYdb::TCredentialsProviderPtr CredentialsProvider; + int64_t Cookie = 
0; +}; + +std::unique_ptr<NActors::IActor> CreateMonitoringGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider) { + return std::make_unique<TMonitoringGrpcServiceActor>(settings, credentialsProvider); +} + +} diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make index 8367142a69..daedf40cdf 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make +++ b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make @@ -4,6 +4,8 @@ SRCS( cms_grpc_client_actor.cpp compute_database_control_plane_service.cpp compute_databases_cache.cpp + database_monitoring.cpp + monitoring_grpc_client_actor.cpp ydbcp_grpc_client_actor.cpp ) diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h index 86e7b2ee73..6f5961b754 100644 --- a/ydb/core/fq/libs/compute/ydb/events/events.h +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -58,6 +58,12 @@ struct TEvYdbCompute { EvInvalidateSynchronizationRequest, EvInvalidateSynchronizationResponse, + EvCpuLoadRequest, + EvCpuLoadResponse, + EvCpuQuotaRequest, + EvCpuQuotaResponse, + EvCpuQuotaAdjust, + EvEnd }; @@ -438,6 +444,58 @@ struct TEvYdbCompute { NYql::TIssues Issues; }; + + struct TEvCpuLoadRequest : public NActors::TEventLocal<TEvCpuLoadRequest, EvCpuLoadRequest> { + TEvCpuLoadRequest(const TString& scope) + : Scope(scope) + {} + + TString Scope; + }; + + struct TEvCpuLoadResponse : public NActors::TEventLocal<TEvCpuLoadResponse, EvCpuLoadResponse> { + TEvCpuLoadResponse(double instantLoad = 0.0, double averageLoad = 0.0) + : InstantLoad(instantLoad), AverageLoad(averageLoad) + {} + + TEvCpuLoadResponse(NYql::TIssues issues) + : InstantLoad(0.0), AverageLoad(0.0), Issues(std::move(issues)) + {} + + double InstantLoad; + double AverageLoad; + NYql::TIssues Issues; + }; + + struct TEvCpuQuotaRequest : public NActors::TEventLocal<TEvCpuQuotaRequest, 
EvCpuQuotaRequest> { + TEvCpuQuotaRequest(const TString& scope, TInstant deadline = TInstant::Zero(), double quota = 0.0) + : Scope(scope), Deadline(deadline), Quota(quota) + {} + + TString Scope; + TInstant Deadline; + double Quota; // if zero, default quota is used + }; + + struct TEvCpuQuotaResponse : public NActors::TEventLocal<TEvCpuQuotaResponse, EvCpuQuotaResponse> { + TEvCpuQuotaResponse(NYdb::EStatus status = NYdb::EStatus::SUCCESS, NYql::TIssues issues = {}) + : Status(status), Issues(std::move(issues)) + {} + + NYdb::EStatus Status; + NYql::TIssues Issues; + }; + + struct TEvCpuQuotaAdjust : public NActors::TEventLocal<TEvCpuQuotaAdjust, EvCpuQuotaAdjust> { + TEvCpuQuotaAdjust(const TString& scope, TDuration duration, double cpuSecondsConsumed, double quota = 0.0) + : Scope(scope), Duration(duration), CpuSecondsConsumed(cpuSecondsConsumed), Quota(quota) + {} + + TString Scope; + TDuration Duration; + double CpuSecondsConsumed; + double Quota; // if zero, default quota is used + }; }; } diff --git a/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp b/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp index 290f49c195..a34ab3d464 100644 --- a/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp @@ -3,6 +3,7 @@ #include <ydb/core/fq/libs/common/util.h> #include <ydb/core/fq/libs/compute/common/metrics.h> #include <ydb/core/fq/libs/compute/common/run_actor_params.h> +#include <ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h> #include <ydb/core/fq/libs/compute/ydb/events/events.h> #include <ydb/core/fq/libs/ydb/ydb.h> #include <ydb/library/services/services.pb.h> @@ -73,12 +74,8 @@ public: void Start() { LOG_I("Start initializer actor. 
Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status)); if (!Params.RequestStartedAt) { - auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); - pingCounters->InFly->Inc(); Become(&TInitializerActor::StateFunc); - Fq::Private::PingTaskRequest pingTaskRequest; - *pingTaskRequest.mutable_started_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds()); - Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); + Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaRequest(Params.Scope.ToString(), Params.Deadline)); } else { LOG_I("Query has been initialized (did nothing)"); Send(Parent, new TEvYdbCompute::TEvInitializerResponse({}, NYdb::EStatus::SUCCESS)); @@ -87,9 +84,24 @@ public: } STRICT_STFUNC(StateFunc, + hFunc(TEvYdbCompute::TEvCpuQuotaResponse, Handle); hFunc(TEvents::TEvForwardPingResponse, Handle); ) + void Handle(TEvYdbCompute::TEvCpuQuotaResponse::TPtr& ev) { + const auto& response = *ev.Get()->Get(); + if (response.Status == NYdb::EStatus::SUCCESS) { + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Inc(); + Fq::Private::PingTaskRequest pingTaskRequest; + *pingTaskRequest.mutable_started_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds()); + Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); + } else { + Send(Parent, new TEvYdbCompute::TEvInitializerResponse(response.Issues, response.Status)); + FailedAndPassAway(); + } + } + void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) { auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); pingCounters->InFly->Dec(); diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp index 2934358859..d6ef6600f0 100644 --- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp +++ 
b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp @@ -7,6 +7,7 @@ #include <ydb/core/fq/libs/compute/common/run_actor_params.h> #include <ydb/core/fq/libs/compute/common/utils.h> #include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h> #include <ydb/core/fq/libs/ydb/ydb.h> #include <ydb/core/util/backoff.h> #include <ydb/library/services/services.pb.h> @@ -232,7 +233,12 @@ public: pingTaskRequest.set_pending_status_code(StatusCode); PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast()); try { - pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan())); + TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us()); + double cpuUsage = 0.0; + pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan(), &cpuUsage)); + if (duration && cpuUsage) { + Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage)); + } } catch(const NJson::TJsonException& ex) { LOG_E("Error statistics conversion: " << ex.what()); } @@ -249,7 +255,12 @@ public: pingTaskRequest.set_status(ComputeStatus); PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast()); try { - pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan())); + TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us()); + double cpuUsage = 0.0; + pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan(), &cpuUsage)); + if (duration && cpuUsage) { + Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage)); + } } catch(const NJson::TJsonException& ex) { LOG_E("Error statistics conversion: " << ex.what()); } diff --git a/ydb/core/fq/libs/config/protos/compute.proto b/ydb/core/fq/libs/config/protos/compute.proto 
index d81fefa721..6084dc6ced 100644 --- a/ydb/core/fq/libs/config/protos/compute.proto +++ b/ydb/core/fq/libs/config/protos/compute.proto @@ -17,10 +17,22 @@ message TSynchronizationService { message TInPlaceCompute { } +message TLoadControlConfig { + bool Enable = 1; + string MonitoringRequestDelay = 2; // default "1s" + string AverageLoadInterval = 3; // default "10s", allowed min "1s" + uint32 MaxClusterLoadPercentage = 4; // default 0 == no load control + uint32 DefaultQueryLoadPercentage = 5; // default 10 + uint32 PendingQueueSize = 6; // default 0 == instant decline if overloaded + bool Strict = 7; // default false, whether to deny execution if load level is unavailable + uint32 CpuNumber = 8; +} + message TComputeDatabaseConfig { TYdbStorageConfig ControlPlaneConnection = 1; TYdbStorageConfig ExecutionConnection = 3; string Tenant = 2; + TLoadControlConfig LoadControlConfig = 4; } message TDatabaseMapping { @@ -56,6 +68,7 @@ message TYdbCompute { TYdbComputeControlPlane ControlPlane = 2; TSynchronizationService SynchronizationService = 3; repeated string PinTenantName = 4; + TLoadControlConfig LoadControlConfig = 5; } enum EComputeType { |