diff options
author | Oleg Doronin <[email protected]> | 2025-08-05 09:16:09 +0200 |
---|---|---|
committer | GitHub <[email protected]> | 2025-08-05 10:16:09 +0300 |
commit | 0c917181943247fc9c0cee5606ae1725dad0af9b (patch) | |
tree | c467329b71d5084076bc560419bb0a687754a292 | |
parent | d1fe2766ba5bff2434c05a76de86e57fbbd0e8ba (diff) |
configuration for default pool has been added (#22307)
7 files changed, 111 insertions, 28 deletions
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 2137b67da9f..9734f0ac0c1 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -1444,7 +1444,7 @@ private: ResourcePoolsCache.UpdateConfig(FeatureFlags, WorkloadManagerConfig, ActorContext()); const auto& databaseId = ev->Get()->GetDatabaseId(); - if (!ResourcePoolsCache.ResourcePoolsEnabled(databaseId) || (ev->Get()->IsInternalCall() && WorkloadManagerConfig.GetEnabled())) { + if (!ResourcePoolsCache.ResourcePoolsEnabled(databaseId) || ev->Get()->IsInternalCall()) { ev->Get()->SetPoolId(""); return true; } diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index 1b90c742fc9..cafc0cd1f7d 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -78,7 +78,7 @@ public: diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, BUILTIN_ACL_ROOT); auto token = MakeIntrusive<NACLib::TUserToken>(BUILTIN_ACL_METADATA, TVector<NACLib::TSID>{}); - Register(CreatePoolCreatorActor(SelfId(), Event->Get()->DatabaseId, Event->Get()->PoolId, NResourcePool::TPoolSettings(), token, diffAcl)); + Register(CreatePoolCreatorActor(SelfId(), Event->Get()->DatabaseId, Event->Get()->PoolId, PoolSettingsFromConfig(WorkloadManagerConfig), token, diffAcl)); } void Handle(TEvPrivate::TEvCreatePoolResponse::TPtr& ev) { @@ -251,15 +251,7 @@ public: void Bootstrap() { if (PoolId == NResourcePool::DEFAULT_POOL_ID) { - NResourcePool::TPoolSettings poolSettings; - poolSettings.ResourceWeight = WorkloadManagerConfig.GetResourceWeight(); - poolSettings.ConcurrentQueryLimit = WorkloadManagerConfig.GetConcurrentQueryLimit(); - poolSettings.QueueSize = WorkloadManagerConfig.GetQueueSize(); - poolSettings.QueryCpuLimitPercentPerNode = WorkloadManagerConfig.GetQueryCpuLimitPercentPerNode(); - poolSettings.QueryMemoryLimitPercentPerNode = WorkloadManagerConfig.GetQueryMemoryLimitPercentPerNode(); - poolSettings.TotalCpuLimitPercentPerNode = WorkloadManagerConfig.GetTotalCpuLimitPercentPerNode(); - poolSettings.DatabaseLoadCpuThreshold = WorkloadManagerConfig.GetDatabaseLoadCpuThreshold(); - Reply(poolSettings); + Reply(PoolSettingsFromConfig(WorkloadManagerConfig)); return; } ReplyError(Ydb::StatusIds::BAD_REQUEST, "Unknown static pool " + PoolId + ", please check the database configuration"); diff --git a/ydb/core/kqp/workload_service/common/helpers.cpp b/ydb/core/kqp/workload_service/common/helpers.cpp index b9ad774c6f8..572cc62ecba 100644 --- a/ydb/core/kqp/workload_service/common/helpers.cpp +++ b/ydb/core/kqp/workload_service/common/helpers.cpp @@ -43,4 +43,36 @@ ui64 SaturationSub(ui64 x, ui64 y) { return (x > y) ? x - y : 0; } +NResourcePool::TPoolSettings PoolSettingsFromConfig(const NKikimrConfig::TWorkloadManagerConfig& workloadManagerConfig) { + NResourcePool::TPoolSettings poolSettings; + if (workloadManagerConfig.HasResourceWeight()) { + poolSettings.ResourceWeight = workloadManagerConfig.GetResourceWeight(); + } + + if (workloadManagerConfig.HasConcurrentQueryLimit()) { + poolSettings.ConcurrentQueryLimit = workloadManagerConfig.GetConcurrentQueryLimit(); + } + + if (workloadManagerConfig.HasQueueSize()) { + poolSettings.QueueSize = workloadManagerConfig.GetQueueSize(); + } + + if (workloadManagerConfig.HasQueryCpuLimitPercentPerNode()) { + poolSettings.QueryCpuLimitPercentPerNode = workloadManagerConfig.GetQueryCpuLimitPercentPerNode(); + } + + if (workloadManagerConfig.HasQueryMemoryLimitPercentPerNode()) { + poolSettings.QueryMemoryLimitPercentPerNode = workloadManagerConfig.GetQueryMemoryLimitPercentPerNode(); + } + + if (workloadManagerConfig.HasTotalCpuLimitPercentPerNode()) { + poolSettings.TotalCpuLimitPercentPerNode = workloadManagerConfig.GetTotalCpuLimitPercentPerNode(); + } + + if (workloadManagerConfig.HasDatabaseLoadCpuThreshold()) { + poolSettings.DatabaseLoadCpuThreshold = workloadManagerConfig.GetDatabaseLoadCpuThreshold(); + } + return poolSettings; +} + } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/common/helpers.h b/ydb/core/kqp/workload_service/common/helpers.h index bd2471bebd1..d09d0897921 100644 --- a/ydb/core/kqp/workload_service/common/helpers.h +++ b/ydb/core/kqp/workload_service/common/helpers.h @@ -2,8 +2,8 @@ #include <library/cpp/retry/retry_policy.h> +#include <ydb/core/protos/workload_manager_config.pb.h> #include <ydb/core/resource_pools/resource_pool_settings.h> - #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/library/actors/core/log.h> @@ -110,4 +110,6 @@ void ParsePoolSettings(const NKikimrSchemeOp::TResourcePoolDescription& descript ui64 SaturationSub(ui64 x, ui64 y); +NResourcePool::TPoolSettings PoolSettingsFromConfig(const NKikimrConfig::TWorkloadManagerConfig& workloadManagerConfig); + } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp index b59bdfc3414..4d9322f692f 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp @@ -491,11 +491,17 @@ public: TPoolStateDescription GetPoolDescription(TDuration leaseDuration = FUTURE_WAIT_TIMEOUT, const TString& poolId = "") const override { const auto& edgeActor = GetRuntime()->AllocateEdgeActor(); - GetRuntime()->Register(CreateRefreshPoolStateActor(edgeActor, CanonizePath(Settings_.DomainName_), poolId ? poolId : Settings_.PoolId_, leaseDuration, GetRuntime()->GetAppData().Counters)); - auto response = GetRuntime()->GrabEdgeEvent<TEvPrivate::TEvRefreshPoolStateResponse>(edgeActor, FUTURE_WAIT_TIMEOUT); - UNIT_ASSERT_VALUES_EQUAL_C(response->Get()->Status, Ydb::StatusIds::SUCCESS, response->Get()->Issues.ToOneLineString()); + TInstant startAt = TInstant::Now(); + while (true) { + GetRuntime()->Register(CreateRefreshPoolStateActor(edgeActor, CanonizePath(Settings_.DomainName_), poolId ? poolId : Settings_.PoolId_, leaseDuration, GetRuntime()->GetAppData().Counters)); + auto response = GetRuntime()->GrabEdgeEvent<TEvPrivate::TEvRefreshPoolStateResponse>(edgeActor, FUTURE_WAIT_TIMEOUT - (TInstant::Now() - startAt)); + if (TInstant::Now() < startAt + FUTURE_WAIT_TIMEOUT && response->Get()->Status != Ydb::StatusIds::SUCCESS) { + continue; + } + UNIT_ASSERT_VALUES_EQUAL_C(response->Get()->Status, Ydb::StatusIds::SUCCESS, response->Get()->Issues.ToOneLineString()); - return response->Get()->PoolState; + return response->Get()->PoolState; + } } void WaitPoolState(const TPoolStateDescription& state, const TString& poolId = "") const override { diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp index 5470191eaa1..6700c9e7dde 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp @@ -225,7 +225,7 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) { auto settings = TQueryRunnerSettings().HangUpDuringExecution(true); auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, settings); - ydb->WaitQueryExecution(hangingRequest); + ydb->WaitQueryExecution(hangingRequest); auto delayedRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, settings); TSampleQueries::CheckCancelled(hangingRequest.GetResult()); @@ -296,7 +296,7 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) { // Delay request auto result = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)); - ydb->WaitPoolState({.DelayedRequests = 1, .RunningRequests = 0}); + ydb->WaitPoolState({.DelayedRequests = 0, .RunningRequests = 0}); // Free load ydb->ContinueQueryExecution(result); @@ -533,7 +533,7 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) { // Wait pool change TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query)); // Force pool update - ydb->WaitPoolHandlersCount(2); + ydb->WaitPoolHandlersCount(1); // Check that pool using tables auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().HangUpDuringExecution(true)); @@ -561,7 +561,7 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) { // Wait pool change TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query)); // Force pool update - ydb->WaitPoolHandlersCount(2); + ydb->WaitPoolHandlersCount(1); // Check that pool is not using tables auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().HangUpDuringExecution(true)); @@ -762,7 +762,7 @@ Y_UNIT_TEST_SUITE(ResourcePoolClassifiersDdl) { } void WaitForSuccess(TIntrusivePtr<IYdbSetup> ydb, const TQueryRunnerSettings& settings) { - ydb->WaitFor(TDuration::Seconds(10), "Resource pool classifier success", [ydb, settings](TString& errorString) { + ydb->WaitFor(TDuration::Seconds(20), "Resource pool classifier success", [ydb, settings](TString& errorString) { auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings); errorString = result.GetIssues().ToOneLineString(); @@ -1656,4 +1656,55 @@ Y_UNIT_TEST_SUITE(ResourcePoolsSysView) { } } +Y_UNIT_TEST_SUITE(DefaultPoolSettings) { + Y_UNIT_TEST(TestResourcePoolsSysViewFilters) { + NKikimrConfig::TWorkloadManagerConfig config; + config.SetConcurrentQueryLimit(10); + config.SetQueueSize(100); + + auto ydb = TYdbSetupSettings() + .CreateSampleTenants(true) + .EnableResourcePoolsOnServerless(true) + .WorkloadManagerConfig(config) + .Create(); + + const auto& dedicatedTenant = ydb->GetSettings().GetDedicatedTenantName(); + + auto settings = TQueryRunnerSettings() + .PoolId("") + .NodeIndex(1); + + { // Check tables + auto result = ydb->ExecuteQuery(R"( + SELECT * FROM `.sys/resource_pools` ORDER BY Name ASC + )", settings.PoolId(NResourcePool::DEFAULT_POOL_ID).Database(dedicatedTenant)); + TSampleQueries::CheckSuccess(result); + + NYdb::TResultSetParser resultSet(result.GetResultSet(0)); + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + auto name = resultSet.ColumnParser("Name").GetOptionalUtf8(); + UNIT_ASSERT_VALUES_EQUAL(*name, "default"); + auto concurrentQueryLimit = resultSet.ColumnParser("ConcurrentQueryLimit").GetOptionalInt32(); + UNIT_ASSERT_VALUES_EQUAL(*concurrentQueryLimit, 10); + auto queueSize = resultSet.ColumnParser("QueueSize").GetOptionalInt32(); + UNIT_ASSERT_VALUES_EQUAL(*queueSize, 100); + auto databaseLoadCpuThreshold = resultSet.ColumnParser("DatabaseLoadCpuThreshold").GetOptionalDouble(); + UNIT_ASSERT_VALUES_EQUAL(*databaseLoadCpuThreshold, -1); + auto resourceWeight = resultSet.ColumnParser("ResourceWeight").GetOptionalDouble(); + UNIT_ASSERT_VALUES_EQUAL(*resourceWeight, -1); + auto totalCpuLimitPercentPerNode = resultSet.ColumnParser("TotalCpuLimitPercentPerNode").GetOptionalDouble(); + UNIT_ASSERT_VALUES_EQUAL(*totalCpuLimitPercentPerNode, -1); + auto queryCpuLimitPercentPerNode = resultSet.ColumnParser("QueryCpuLimitPercentPerNode").GetOptionalDouble(); + UNIT_ASSERT_VALUES_EQUAL(*queryCpuLimitPercentPerNode, -1); + auto queryMemoryLimitPercentPerNode = resultSet.ColumnParser("QueryMemoryLimitPercentPerNode").GetOptionalDouble(); + UNIT_ASSERT_VALUES_EQUAL(*queryMemoryLimitPercentPerNode, -1); + + UNIT_ASSERT_C(!resultSet.TryNextRow(), "Unexpected row count"); + } + } +} + + + } // namespace NKikimr::NKqp diff --git a/ydb/core/protos/workload_manager_config.proto b/ydb/core/protos/workload_manager_config.proto index 67ad00539c6..022b278279b 100644 --- a/ydb/core/protos/workload_manager_config.proto +++ b/ydb/core/protos/workload_manager_config.proto @@ -4,15 +4,15 @@ option java_package = "ru.yandex.kikimr.proto"; message TWorkloadManagerConfig { optional bool Enabled = 1 [default = false]; // Inflight - optional int32 ConcurrentQueryLimit = 2 [default = 5]; - optional int32 QueueSize = 3 [default = 30]; - optional double DatabaseLoadCpuThreshold = 4 [default = 70]; + optional int32 ConcurrentQueryLimit = 2; + optional int32 QueueSize = 3; + optional double DatabaseLoadCpuThreshold = 4; // Cpu - optional double TotalCpuLimitPercentPerNode = 5 [default = 70]; - optional double QueryCpuLimitPercentPerNode = 6 [default = 100]; - optional double ResourceWeight = 7 [default = -1]; + optional double TotalCpuLimitPercentPerNode = 5; + optional double QueryCpuLimitPercentPerNode = 6; + optional double ResourceWeight = 7; // Memory - optional double QueryMemoryLimitPercentPerNode = 8 [default = -1]; + optional double QueryMemoryLimitPercentPerNode = 8; }; |