summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorOleg Doronin <[email protected]>2025-08-05 09:16:09 +0200
committerGitHub <[email protected]>2025-08-05 10:16:09 +0300
commit0c917181943247fc9c0cee5606ae1725dad0af9b (patch)
treec467329b71d5084076bc560419bb0a687754a292
parentd1fe2766ba5bff2434c05a76de86e57fbbd0e8ba (diff)
configuration for default pool has been added (#22307)
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp2
-rw-r--r--ydb/core/kqp/workload_service/actors/scheme_actors.cpp12
-rw-r--r--ydb/core/kqp/workload_service/common/helpers.cpp32
-rw-r--r--ydb/core/kqp/workload_service/common/helpers.h4
-rw-r--r--ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp14
-rw-r--r--ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp61
-rw-r--r--ydb/core/protos/workload_manager_config.proto14
7 files changed, 111 insertions, 28 deletions
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 2137b67da9f..9734f0ac0c1 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -1444,7 +1444,7 @@ private:
ResourcePoolsCache.UpdateConfig(FeatureFlags, WorkloadManagerConfig, ActorContext());
const auto& databaseId = ev->Get()->GetDatabaseId();
- if (!ResourcePoolsCache.ResourcePoolsEnabled(databaseId) || (ev->Get()->IsInternalCall() && WorkloadManagerConfig.GetEnabled())) {
+ if (!ResourcePoolsCache.ResourcePoolsEnabled(databaseId) || ev->Get()->IsInternalCall()) {
ev->Get()->SetPoolId("");
return true;
}
diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp
index 1b90c742fc9..cafc0cd1f7d 100644
--- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp
+++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp
@@ -78,7 +78,7 @@ public:
diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, BUILTIN_ACL_ROOT);
auto token = MakeIntrusive<NACLib::TUserToken>(BUILTIN_ACL_METADATA, TVector<NACLib::TSID>{});
- Register(CreatePoolCreatorActor(SelfId(), Event->Get()->DatabaseId, Event->Get()->PoolId, NResourcePool::TPoolSettings(), token, diffAcl));
+ Register(CreatePoolCreatorActor(SelfId(), Event->Get()->DatabaseId, Event->Get()->PoolId, PoolSettingsFromConfig(WorkloadManagerConfig), token, diffAcl));
}
void Handle(TEvPrivate::TEvCreatePoolResponse::TPtr& ev) {
@@ -251,15 +251,7 @@ public:
void Bootstrap() {
if (PoolId == NResourcePool::DEFAULT_POOL_ID) {
- NResourcePool::TPoolSettings poolSettings;
- poolSettings.ResourceWeight = WorkloadManagerConfig.GetResourceWeight();
- poolSettings.ConcurrentQueryLimit = WorkloadManagerConfig.GetConcurrentQueryLimit();
- poolSettings.QueueSize = WorkloadManagerConfig.GetQueueSize();
- poolSettings.QueryCpuLimitPercentPerNode = WorkloadManagerConfig.GetQueryCpuLimitPercentPerNode();
- poolSettings.QueryMemoryLimitPercentPerNode = WorkloadManagerConfig.GetQueryMemoryLimitPercentPerNode();
- poolSettings.TotalCpuLimitPercentPerNode = WorkloadManagerConfig.GetTotalCpuLimitPercentPerNode();
- poolSettings.DatabaseLoadCpuThreshold = WorkloadManagerConfig.GetDatabaseLoadCpuThreshold();
- Reply(poolSettings);
+ Reply(PoolSettingsFromConfig(WorkloadManagerConfig));
return;
}
ReplyError(Ydb::StatusIds::BAD_REQUEST, "Unknown static pool " + PoolId + ", please check the database configuration");
diff --git a/ydb/core/kqp/workload_service/common/helpers.cpp b/ydb/core/kqp/workload_service/common/helpers.cpp
index b9ad774c6f8..572cc62ecba 100644
--- a/ydb/core/kqp/workload_service/common/helpers.cpp
+++ b/ydb/core/kqp/workload_service/common/helpers.cpp
@@ -43,4 +43,36 @@ ui64 SaturationSub(ui64 x, ui64 y) {
return (x > y) ? x - y : 0;
}
+NResourcePool::TPoolSettings PoolSettingsFromConfig(const NKikimrConfig::TWorkloadManagerConfig& workloadManagerConfig) {
+ NResourcePool::TPoolSettings poolSettings;
+ if (workloadManagerConfig.HasResourceWeight()) {
+ poolSettings.ResourceWeight = workloadManagerConfig.GetResourceWeight();
+ }
+
+ if (workloadManagerConfig.HasConcurrentQueryLimit()) {
+ poolSettings.ConcurrentQueryLimit = workloadManagerConfig.GetConcurrentQueryLimit();
+ }
+
+ if (workloadManagerConfig.HasQueueSize()) {
+ poolSettings.QueueSize = workloadManagerConfig.GetQueueSize();
+ }
+
+ if (workloadManagerConfig.HasQueryCpuLimitPercentPerNode()) {
+ poolSettings.QueryCpuLimitPercentPerNode = workloadManagerConfig.GetQueryCpuLimitPercentPerNode();
+ }
+
+ if (workloadManagerConfig.HasQueryMemoryLimitPercentPerNode()) {
+ poolSettings.QueryMemoryLimitPercentPerNode = workloadManagerConfig.GetQueryMemoryLimitPercentPerNode();
+ }
+
+ if (workloadManagerConfig.HasTotalCpuLimitPercentPerNode()) {
+ poolSettings.TotalCpuLimitPercentPerNode = workloadManagerConfig.GetTotalCpuLimitPercentPerNode();
+ }
+
+ if (workloadManagerConfig.HasDatabaseLoadCpuThreshold()) {
+ poolSettings.DatabaseLoadCpuThreshold = workloadManagerConfig.GetDatabaseLoadCpuThreshold();
+ }
+ return poolSettings;
+}
+
} // NKikimr::NKqp::NWorkload
diff --git a/ydb/core/kqp/workload_service/common/helpers.h b/ydb/core/kqp/workload_service/common/helpers.h
index bd2471bebd1..d09d0897921 100644
--- a/ydb/core/kqp/workload_service/common/helpers.h
+++ b/ydb/core/kqp/workload_service/common/helpers.h
@@ -2,8 +2,8 @@
#include <library/cpp/retry/retry_policy.h>
+#include <ydb/core/protos/workload_manager_config.pb.h>
#include <ydb/core/resource_pools/resource_pool_settings.h>
-
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/library/actors/core/log.h>
@@ -110,4 +110,6 @@ void ParsePoolSettings(const NKikimrSchemeOp::TResourcePoolDescription& descript
ui64 SaturationSub(ui64 x, ui64 y);
+NResourcePool::TPoolSettings PoolSettingsFromConfig(const NKikimrConfig::TWorkloadManagerConfig& workloadManagerConfig);
+
} // NKikimr::NKqp::NWorkload
diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp
index b59bdfc3414..4d9322f692f 100644
--- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp
+++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp
@@ -491,11 +491,17 @@ public:
TPoolStateDescription GetPoolDescription(TDuration leaseDuration = FUTURE_WAIT_TIMEOUT, const TString& poolId = "") const override {
const auto& edgeActor = GetRuntime()->AllocateEdgeActor();
- GetRuntime()->Register(CreateRefreshPoolStateActor(edgeActor, CanonizePath(Settings_.DomainName_), poolId ? poolId : Settings_.PoolId_, leaseDuration, GetRuntime()->GetAppData().Counters));
- auto response = GetRuntime()->GrabEdgeEvent<TEvPrivate::TEvRefreshPoolStateResponse>(edgeActor, FUTURE_WAIT_TIMEOUT);
- UNIT_ASSERT_VALUES_EQUAL_C(response->Get()->Status, Ydb::StatusIds::SUCCESS, response->Get()->Issues.ToOneLineString());
+ TInstant startAt = TInstant::Now();
+ while (true) {
+ GetRuntime()->Register(CreateRefreshPoolStateActor(edgeActor, CanonizePath(Settings_.DomainName_), poolId ? poolId : Settings_.PoolId_, leaseDuration, GetRuntime()->GetAppData().Counters));
+ auto response = GetRuntime()->GrabEdgeEvent<TEvPrivate::TEvRefreshPoolStateResponse>(edgeActor, FUTURE_WAIT_TIMEOUT - (TInstant::Now() - startAt));
+ if (TInstant::Now() < startAt + FUTURE_WAIT_TIMEOUT && response->Get()->Status != Ydb::StatusIds::SUCCESS) {
+ continue;
+ }
+ UNIT_ASSERT_VALUES_EQUAL_C(response->Get()->Status, Ydb::StatusIds::SUCCESS, response->Get()->Issues.ToOneLineString());
- return response->Get()->PoolState;
+ return response->Get()->PoolState;
+ }
}
void WaitPoolState(const TPoolStateDescription& state, const TString& poolId = "") const override {
diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp
index 5470191eaa1..6700c9e7dde 100644
--- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp
+++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp
@@ -225,7 +225,7 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
auto settings = TQueryRunnerSettings().HangUpDuringExecution(true);
auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, settings);
- ydb->WaitQueryExecution(hangingRequest);
+ ydb->WaitQueryExecution(hangingRequest);
auto delayedRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, settings);
TSampleQueries::CheckCancelled(hangingRequest.GetResult());
@@ -296,7 +296,7 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
// Delay request
auto result = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
- ydb->WaitPoolState({.DelayedRequests = 1, .RunningRequests = 0});
+ ydb->WaitPoolState({.DelayedRequests = 0, .RunningRequests = 0});
// Free load
ydb->ContinueQueryExecution(result);
@@ -533,7 +533,7 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) {
// Wait pool change
TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query)); // Force pool update
- ydb->WaitPoolHandlersCount(2);
+ ydb->WaitPoolHandlersCount(1);
// Check that pool using tables
auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().HangUpDuringExecution(true));
@@ -561,7 +561,7 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) {
// Wait pool change
TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query)); // Force pool update
- ydb->WaitPoolHandlersCount(2);
+ ydb->WaitPoolHandlersCount(1);
// Check that pool is not using tables
auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().HangUpDuringExecution(true));
@@ -762,7 +762,7 @@ Y_UNIT_TEST_SUITE(ResourcePoolClassifiersDdl) {
}
void WaitForSuccess(TIntrusivePtr<IYdbSetup> ydb, const TQueryRunnerSettings& settings) {
- ydb->WaitFor(TDuration::Seconds(10), "Resource pool classifier success", [ydb, settings](TString& errorString) {
+ ydb->WaitFor(TDuration::Seconds(20), "Resource pool classifier success", [ydb, settings](TString& errorString) {
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings);
errorString = result.GetIssues().ToOneLineString();
@@ -1656,4 +1656,55 @@ Y_UNIT_TEST_SUITE(ResourcePoolsSysView) {
}
}
+Y_UNIT_TEST_SUITE(DefaultPoolSettings) {
+ Y_UNIT_TEST(TestResourcePoolsSysViewFilters) {
+ NKikimrConfig::TWorkloadManagerConfig config;
+ config.SetConcurrentQueryLimit(10);
+ config.SetQueueSize(100);
+
+ auto ydb = TYdbSetupSettings()
+ .CreateSampleTenants(true)
+ .EnableResourcePoolsOnServerless(true)
+ .WorkloadManagerConfig(config)
+ .Create();
+
+ const auto& dedicatedTenant = ydb->GetSettings().GetDedicatedTenantName();
+
+ auto settings = TQueryRunnerSettings()
+ .PoolId("")
+ .NodeIndex(1);
+
+ { // Check tables
+ auto result = ydb->ExecuteQuery(R"(
+ SELECT * FROM `.sys/resource_pools` ORDER BY Name ASC
+ )", settings.PoolId(NResourcePool::DEFAULT_POOL_ID).Database(dedicatedTenant));
+ TSampleQueries::CheckSuccess(result);
+
+ NYdb::TResultSetParser resultSet(result.GetResultSet(0));
+ UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count");
+
+ auto name = resultSet.ColumnParser("Name").GetOptionalUtf8();
+ UNIT_ASSERT_VALUES_EQUAL(*name, "default");
+ auto concurrentQueryLimit = resultSet.ColumnParser("ConcurrentQueryLimit").GetOptionalInt32();
+ UNIT_ASSERT_VALUES_EQUAL(*concurrentQueryLimit, 10);
+ auto queueSize = resultSet.ColumnParser("QueueSize").GetOptionalInt32();
+ UNIT_ASSERT_VALUES_EQUAL(*queueSize, 100);
+ auto databaseLoadCpuThreshold = resultSet.ColumnParser("DatabaseLoadCpuThreshold").GetOptionalDouble();
+ UNIT_ASSERT_VALUES_EQUAL(*databaseLoadCpuThreshold, -1);
+ auto resourceWeight = resultSet.ColumnParser("ResourceWeight").GetOptionalDouble();
+ UNIT_ASSERT_VALUES_EQUAL(*resourceWeight, -1);
+ auto totalCpuLimitPercentPerNode = resultSet.ColumnParser("TotalCpuLimitPercentPerNode").GetOptionalDouble();
+ UNIT_ASSERT_VALUES_EQUAL(*totalCpuLimitPercentPerNode, -1);
+ auto queryCpuLimitPercentPerNode = resultSet.ColumnParser("QueryCpuLimitPercentPerNode").GetOptionalDouble();
+ UNIT_ASSERT_VALUES_EQUAL(*queryCpuLimitPercentPerNode, -1);
+ auto queryMemoryLimitPercentPerNode = resultSet.ColumnParser("QueryMemoryLimitPercentPerNode").GetOptionalDouble();
+ UNIT_ASSERT_VALUES_EQUAL(*queryMemoryLimitPercentPerNode, -1);
+
+ UNIT_ASSERT_C(!resultSet.TryNextRow(), "Unexpected row count");
+ }
+ }
+}
+
+
+
} // namespace NKikimr::NKqp
diff --git a/ydb/core/protos/workload_manager_config.proto b/ydb/core/protos/workload_manager_config.proto
index 67ad00539c6..022b278279b 100644
--- a/ydb/core/protos/workload_manager_config.proto
+++ b/ydb/core/protos/workload_manager_config.proto
@@ -4,15 +4,15 @@ option java_package = "ru.yandex.kikimr.proto";
message TWorkloadManagerConfig {
optional bool Enabled = 1 [default = false];
// Inflight
- optional int32 ConcurrentQueryLimit = 2 [default = 5];
- optional int32 QueueSize = 3 [default = 30];
- optional double DatabaseLoadCpuThreshold = 4 [default = 70];
+ optional int32 ConcurrentQueryLimit = 2;
+ optional int32 QueueSize = 3;
+ optional double DatabaseLoadCpuThreshold = 4;
// Cpu
- optional double TotalCpuLimitPercentPerNode = 5 [default = 70];
- optional double QueryCpuLimitPercentPerNode = 6 [default = 100];
- optional double ResourceWeight = 7 [default = -1];
+ optional double TotalCpuLimitPercentPerNode = 5;
+ optional double QueryCpuLimitPercentPerNode = 6;
+ optional double ResourceWeight = 7;
// Memory
- optional double QueryMemoryLimitPercentPerNode = 8 [default = -1];
+ optional double QueryMemoryLimitPercentPerNode = 8;
};