diff options
author | Oleg Doronin <dorooleg@yandex.ru> | 2025-02-13 20:31:11 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-13 20:31:11 +0300 |
commit | 09744cf9fbdd1cd31f648b5fabc8a9ed09875e3b (patch) | |
tree | 572c2cb1d453265a96d7702d031f0df0df4a208a | |
parent | 68fb3935e206a006c98b5268401bea04ff5bd475 (diff) | |
download | ydb-09744cf9fbdd1cd31f648b5fabc8a9ed09875e3b.tar.gz |
sys view for resource pool classifiers has been supported YQ-3792 (#14287)
20 files changed, 571 insertions, 27 deletions
diff --git a/ydb/core/grpc_services/rpc_read_columns.cpp b/ydb/core/grpc_services/rpc_read_columns.cpp index 630f720b18..7ec9fa43ea 100644 --- a/ydb/core/grpc_services/rpc_read_columns.cpp +++ b/ydb/core/grpc_services/rpc_read_columns.cpp @@ -321,7 +321,9 @@ private: JoinPath(ResolveNamesResult->ResultSet.front().Path), range, columns, - Request->GetInternalToken()); + Request->GetInternalToken(), + Request->GetDatabaseName().GetOrElse({}), + false); if (!tableScanActor) { return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index 096c6f0a9a..2b90a05716 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -51,7 +51,8 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr TIntrusivePtr<NActors::TProtoArenaHolder> arena, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode, - TIntrusiveConstPtr<NACLib::TUserToken> userToken); + TIntrusiveConstPtr<NACLib::TUserToken> userToken, + const TString& database); IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NYql::NDq::TComputeRuntimeSettings& settings, diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp index 2b8632c8e3..46267030ba 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp @@ -226,7 +226,7 @@ public: } IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory, runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings, - std::move(args.SchedulingOptions), args.BlockTrackingMode, std::move(args.UserToken)); + std::move(args.SchedulingOptions), args.BlockTrackingMode, std::move(args.UserToken), args.Database); return args.ShareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) : TlsActivationContext->AsActorContext().Register(computeActor); } diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h index 1439cf5ff3..362c9f0ee4 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h @@ -131,6 +131,7 @@ public: std::shared_ptr<IKqpNodeState> State = nullptr; TComputeActorSchedulingOptions SchedulingOptions = {}; TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + TString Database; }; typedef std::variant<TActorId, NKikimr::NKqp::NRm::TKqpRMAllocateResult> TActorStartResult; diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index d77957ecf1..62fd105ca4 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -16,13 +16,15 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions schedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode mode, - TIntrusiveConstPtr<NACLib::TUserToken> userToken) + TIntrusiveConstPtr<NACLib::TUserToken> userToken, + const TString& database) : TBase(std::move(schedulingOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings) , ComputeCtx(settings.StatsMode) , FederatedQuerySetup(federatedQuerySetup) , BlockTrackingMode(mode) , ArrayBufferMinFillPercentage(memoryLimits.ArrayBufferMinFillPercentage) , UserToken(std::move(userToken)) + , Database(database) { InitializeTask(); if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) { @@ -89,6 +91,7 @@ void TKqpComputeActor::DoBootstrap() { TSmallVec<NMiniKQL::TKqpScanComputeContext::TColumn> columns; TVector<TSerializedTableRange> ranges; + bool reverse = false; if (Meta) { YQL_ENSURE(ComputeCtx.GetTableScans().empty()); @@ -107,13 +110,14 @@ void TKqpComputeActor::DoBootstrap() { for (auto& range : protoRanges) { ranges.emplace_back(range); } + reverse = Meta->GetReverse(); } if (ScanData) { ScanData->TaskId = GetTask().GetId(); ScanData->TableReader = CreateKqpTableReader(*ScanData); - auto scanActor = NSysView::CreateSystemViewScan(SelfId(), 0, ScanData->TableId, ScanData->TablePath, ranges, columns, UserToken); + auto scanActor = NSysView::CreateSystemViewScan(SelfId(), 0, ScanData->TableId, ScanData->TablePath, ranges, columns, UserToken, Database, reverse); if (!scanActor) { InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder() @@ -289,10 +293,11 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions cpuOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode mode, - TIntrusiveConstPtr<NACLib::TUserToken> userToken) + TIntrusiveConstPtr<NACLib::TUserToken> userToken, + const TString& database) { return new TKqpComputeActor(executerId, txId, task, std::move(asyncIoFactory), - settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings, std::move(cpuOptions), mode, std::move(userToken)); + settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings, std::move(cpuOptions), mode, std::move(userToken), database); } } // namespace NKqp diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h index 8aaf145563..24ec5d8f61 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h @@ -30,7 +30,8 @@ public: NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode mode, - TIntrusiveConstPtr<NACLib::TUserToken> userToken); + TIntrusiveConstPtr<NACLib::TUserToken> userToken, + const TString& database); void DoBootstrap(); @@ -66,6 +67,7 @@ private: const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode; const TMaybe<ui8> ArrayBufferMinFillPercentage; TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + const TString Database; }; } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index ad9d596ae6..7d937c7f40 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -938,9 +938,10 @@ protected: .Columns = BuildKqpColumns(op, tableInfo), }; + auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv()); task.Meta.Reads.ConstructInPlace(); task.Meta.Reads->emplace_back(std::move(readInfo)); - task.Meta.ReadInfo.Reverse = op.GetReadRange().GetReverse(); + task.Meta.ReadInfo.Reverse = readSettings.Reverse; task.Meta.Type = TTaskMeta::TTaskType::Compute; FillSecureParamsFromStage(task.Meta.SecureParams, stage); diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 386f5a4934..592a57590a 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -501,7 +501,8 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) .ShareMailbox = (computeTasksSize <= 1), .RlPath = Nothing(), .BlockTrackingMode = BlockTrackingMode, - .UserToken = UserToken + .UserToken = UserToken, + .Database = Database }); if (const auto* rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&startResult)) { diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index e2650323f3..eacc6bdb39 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -286,6 +286,8 @@ private: createArgs.UserToken.Reset(MakeIntrusive<NACLib::TUserToken>(msg.GetUserToken())); } + createArgs.Database = msg.GetDatabase(); + auto result = CaFactory_->CreateKqpComputeActor(std::move(createArgs)); if (const auto* rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&result)) { diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp index 76e4fba15b..f05349928d 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp @@ -861,6 +861,279 @@ Y_UNIT_TEST_SUITE(ResourcePoolClassifiersDdl) { WaitForSuccess(ydb, settings.GroupSIDs({firstSID, secondSID})); } + + Y_UNIT_TEST(TestResourcePoolClassifiersSysViewOnServerless) { + auto ydb = TYdbSetupSettings() + .CreateSampleTenants(true) + .EnableResourcePoolsOnServerless(true) + .Create(); + + const auto& serverlessTenant = ydb->GetSettings().GetServerlessTenantName(); + const auto& sharedTenant = ydb->GetSettings().GetSharedTenantName(); + + auto settings = TQueryRunnerSettings() + .PoolId("") + .NodeIndex(1); + + const TString& poolId = "my_pool"; + ydb->ExecuteQueryRetry("Wait TestResourcePoolClassifiersSysViewOnServerless", TStringBuilder() << R"( + CREATE RESOURCE POOL )" << poolId << R"( WITH ( + CONCURRENT_QUERY_LIMIT=1, + QUEUE_SIZE=0 + ); + CREATE RESOURCE POOL CLASSIFIER a_first_classifier WITH ( + RESOURCE_POOL=")" << poolId << R"(", + MEMBER_NAME="staff@builtin", + RANK=1 + ); + CREATE RESOURCE POOL CLASSIFIER b_second_classifier WITH ( + RESOURCE_POOL=")" << NResourcePool::DEFAULT_POOL_ID << R"(", + MEMBER_NAME="boss@builtin", + RANK=2 + ); + )", settings.Database(serverlessTenant)); + + ydb->ExecuteQueryRetry("Wait TestResourcePoolClassifiersSysViewOnServerless", TStringBuilder() << R"( + CREATE RESOURCE POOL )" << poolId << R"( WITH ( + CONCURRENT_QUERY_LIMIT=1, + QUEUE_SIZE=0 + ); + CREATE RESOURCE POOL CLASSIFIER a_first_classifier_shared WITH ( + RESOURCE_POOL=")" << poolId << R"(", + MEMBER_NAME="staff@builtin", + RANK=1 + ); + CREATE RESOURCE POOL CLASSIFIER b_second_classifier_shared WITH ( + RESOURCE_POOL=")" << NResourcePool::DEFAULT_POOL_ID << R"(", + MEMBER_NAME="boss@builtin", + RANK=2 + ); + )", settings.Database(sharedTenant)); + + { // Check tables + auto result = ydb->ExecuteQuery(R"( + SELECT * FROM `.sys/resource_pool_classifiers` ORDER BY Name ASC + )", settings.PoolId(NResourcePool::DEFAULT_POOL_ID).Database(serverlessTenant)); + TSampleQueries::CheckSuccess(result); + + NYdb::TResultSetParser resultSet(result.GetResultSet(0)); + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + auto name = resultSet.ColumnParser("Name").GetOptionalUtf8(); + UNIT_ASSERT_VALUES_EQUAL(name, "a_first_classifier"); + auto rank = resultSet.ColumnParser("Rank").GetOptionalInt64(); + UNIT_ASSERT_VALUES_EQUAL(rank, 1); + auto config = resultSet.ColumnParser("Config").GetOptionalJsonDocument(); + UNIT_ASSERT_VALUES_EQUAL(config, R"({"member_name":"staff@builtin","resource_pool":"my_pool"})"); + + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + name = resultSet.ColumnParser("Name").GetOptionalUtf8(); + UNIT_ASSERT_VALUES_EQUAL(name, "b_second_classifier"); + rank = resultSet.ColumnParser("Rank").GetOptionalInt64(); + UNIT_ASSERT_VALUES_EQUAL(rank, 2); + config = resultSet.ColumnParser("Config").GetOptionalJsonDocument(); + UNIT_ASSERT_VALUES_EQUAL(config, R"({"member_name":"boss@builtin","resource_pool":"default"})"); + + UNIT_ASSERT_C(!resultSet.TryNextRow(), "Unexpected row count"); + } + + { // Check tables + auto result = ydb->ExecuteQuery(R"( + SELECT * FROM `.sys/resource_pool_classifiers` ORDER BY Name ASC + )", settings.PoolId(NResourcePool::DEFAULT_POOL_ID).Database(sharedTenant)); + TSampleQueries::CheckSuccess(result); + + NYdb::TResultSetParser resultSet(result.GetResultSet(0)); + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + auto name = resultSet.ColumnParser("Name").GetOptionalUtf8(); + UNIT_ASSERT_VALUES_EQUAL(name, "a_first_classifier_shared"); + auto rank = resultSet.ColumnParser("Rank").GetOptionalInt64(); + UNIT_ASSERT_VALUES_EQUAL(rank, 1); + auto config = resultSet.ColumnParser("Config").GetOptionalJsonDocument(); + UNIT_ASSERT_VALUES_EQUAL(config, R"({"member_name":"staff@builtin","resource_pool":"my_pool"})"); + + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + name = resultSet.ColumnParser("Name").GetOptionalUtf8(); + UNIT_ASSERT_VALUES_EQUAL(name, "b_second_classifier_shared"); + rank = resultSet.ColumnParser("Rank").GetOptionalInt64(); + UNIT_ASSERT_VALUES_EQUAL(rank, 2); + config = resultSet.ColumnParser("Config").GetOptionalJsonDocument(); + UNIT_ASSERT_VALUES_EQUAL(config, R"({"member_name":"boss@builtin","resource_pool":"default"})"); + + UNIT_ASSERT_C(!resultSet.TryNextRow(), "Unexpected row count"); + } + } + + Y_UNIT_TEST(TestResourcePoolClassifiersSysViewFilters) { + auto ydb = TYdbSetupSettings() + .CreateSampleTenants(true) + .EnableResourcePoolsOnServerless(true) + .Create(); + + const auto& dedicatedTenant = ydb->GetSettings().GetDedicatedTenantName(); + + auto settings = TQueryRunnerSettings() + .PoolId("") + .NodeIndex(1); + + const TString& poolId = "my_pool"; + ydb->ExecuteQueryRetry("Wait TestResourcePoolClassifiersSysViewOnServerless", TStringBuilder() << R"( + CREATE RESOURCE POOL )" << poolId << R"( WITH ( + CONCURRENT_QUERY_LIMIT=1, + QUEUE_SIZE=0 + ); + CREATE RESOURCE POOL CLASSIFIER a WITH ( + RESOURCE_POOL=")" << poolId << R"(", + MEMBER_NAME="staff@builtin", + RANK=1 + ); + CREATE RESOURCE POOL CLASSIFIER b WITH ( + RESOURCE_POOL=")" << NResourcePool::DEFAULT_POOL_ID << R"(", + MEMBER_NAME="boss@builtin", + RANK=2 + ); + CREATE RESOURCE POOL CLASSIFIER c WITH ( + RESOURCE_POOL=")" << NResourcePool::DEFAULT_POOL_ID << R"(", + MEMBER_NAME="super_boss@builtin", + RANK=3 + ); + )", settings.Database(dedicatedTenant)); + + { // Check tables + auto result = ydb->ExecuteQuery(R"( + SELECT * FROM `.sys/resource_pool_classifiers` ORDER BY Name ASC + )", settings.PoolId(NResourcePool::DEFAULT_POOL_ID).Database(dedicatedTenant)); + TSampleQueries::CheckSuccess(result); + + NYdb::TResultSetParser resultSet(result.GetResultSet(0)); + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + auto name = resultSet.ColumnParser("Name").GetOptionalUtf8(); + UNIT_ASSERT_VALUES_EQUAL(name, "a"); + auto rank = resultSet.ColumnParser("Rank").GetOptionalInt64(); + UNIT_ASSERT_VALUES_EQUAL(rank, 1); + auto config = resultSet.ColumnParser("Config").GetOptionalJsonDocument(); + UNIT_ASSERT_VALUES_EQUAL(config, R"({"member_name":"staff@builtin","resource_pool":"my_pool"})"); + + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + name = resultSet.ColumnParser("Name").GetOptionalUtf8(); + UNIT_ASSERT_VALUES_EQUAL(name, "b"); + rank = resultSet.ColumnParser("Rank").GetOptionalInt64(); + UNIT_ASSERT_VALUES_EQUAL(rank, 2); + config = resultSet.ColumnParser("Config").GetOptionalJsonDocument(); + UNIT_ASSERT_VALUES_EQUAL(config, R"({"member_name":"boss@builtin","resource_pool":"default"})"); + + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + name = resultSet.ColumnParser("Name").GetOptionalUtf8(); + UNIT_ASSERT_VALUES_EQUAL(name, "c"); + rank = resultSet.ColumnParser("Rank").GetOptionalInt64(); + UNIT_ASSERT_VALUES_EQUAL(rank, 3); + config = resultSet.ColumnParser("Config").GetOptionalJsonDocument(); + UNIT_ASSERT_VALUES_EQUAL(config, R"({"member_name":"super_boss@builtin","resource_pool":"default"})"); + + UNIT_ASSERT_C(!resultSet.TryNextRow(), "Unexpected row count"); + } + + { // Check tables + auto result = ydb->ExecuteQuery(R"( + SELECT * FROM `.sys/resource_pool_classifiers` ORDER BY Name DESC + )", settings.PoolId(NResourcePool::DEFAULT_POOL_ID).Database(dedicatedTenant)); + TSampleQueries::CheckSuccess(result); + + NYdb::TResultSetParser resultSet(result.GetResultSet(0)); + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + auto name = resultSet.ColumnParser("Name").GetOptionalUtf8(); + UNIT_ASSERT_VALUES_EQUAL(name, "c"); + auto rank = resultSet.ColumnParser("Rank").GetOptionalInt64(); + UNIT_ASSERT_VALUES_EQUAL(rank, 3); + auto config = resultSet.ColumnParser("Config").GetOptionalJsonDocument(); + UNIT_ASSERT_VALUES_EQUAL(config, R"({"member_name":"super_boss@builtin","resource_pool":"default"})"); + + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + name = resultSet.ColumnParser("Name").GetOptionalUtf8(); + UNIT_ASSERT_VALUES_EQUAL(name, "b"); + rank = resultSet.ColumnParser("Rank").GetOptionalInt64(); + UNIT_ASSERT_VALUES_EQUAL(rank, 2); + config = resultSet.ColumnParser("Config").GetOptionalJsonDocument(); + UNIT_ASSERT_VALUES_EQUAL(config, R"({"member_name":"boss@builtin","resource_pool":"default"})"); + + + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + name = resultSet.ColumnParser("Name").GetOptionalUtf8(); + UNIT_ASSERT_VALUES_EQUAL(name, "a"); + rank = resultSet.ColumnParser("Rank").GetOptionalInt64(); + UNIT_ASSERT_VALUES_EQUAL(rank, 1); + config = resultSet.ColumnParser("Config").GetOptionalJsonDocument(); + UNIT_ASSERT_VALUES_EQUAL(config, R"({"member_name":"staff@builtin","resource_pool":"my_pool"})"); + + UNIT_ASSERT_C(!resultSet.TryNextRow(), "Unexpected row count"); + } + + { // Check tables + auto result = ydb->ExecuteQuery(R"( + SELECT * FROM `.sys/resource_pool_classifiers` WHERE Name <= "a" + )", settings.PoolId(NResourcePool::DEFAULT_POOL_ID).Database(dedicatedTenant)); + TSampleQueries::CheckSuccess(result); + + NYdb::TResultSetParser resultSet(result.GetResultSet(0)); + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + auto name = resultSet.ColumnParser("Name").GetOptionalUtf8(); + UNIT_ASSERT_VALUES_EQUAL(name, "a"); + auto rank = resultSet.ColumnParser("Rank").GetOptionalInt64(); + UNIT_ASSERT_VALUES_EQUAL(rank, 1); + auto config = resultSet.ColumnParser("Config").GetOptionalJsonDocument(); + UNIT_ASSERT_VALUES_EQUAL(config, R"({"member_name":"staff@builtin","resource_pool":"my_pool"})"); + + UNIT_ASSERT_C(!resultSet.TryNextRow(), "Unexpected row count"); + } + + { // Check tables + auto result = ydb->ExecuteQuery(R"( + SELECT * FROM `.sys/resource_pool_classifiers` WHERE "a" < Name AND Name < "c" + )", settings.PoolId(NResourcePool::DEFAULT_POOL_ID).Database(dedicatedTenant)); + TSampleQueries::CheckSuccess(result); + + NYdb::TResultSetParser resultSet(result.GetResultSet(0)); + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + auto name = resultSet.ColumnParser("Name").GetOptionalUtf8(); + UNIT_ASSERT_VALUES_EQUAL(name, "b"); + auto rank = resultSet.ColumnParser("Rank").GetOptionalInt64(); + UNIT_ASSERT_VALUES_EQUAL(rank, 2); + auto config = resultSet.ColumnParser("Config").GetOptionalJsonDocument(); + UNIT_ASSERT_VALUES_EQUAL(config, R"({"member_name":"boss@builtin","resource_pool":"default"})"); + + UNIT_ASSERT_C(!resultSet.TryNextRow(), "Unexpected row count"); + } + + { // Check tables + auto result = ydb->ExecuteQuery(R"( + SELECT * FROM `.sys/resource_pool_classifiers` WHERE Name >= "c" + )", settings.PoolId(NResourcePool::DEFAULT_POOL_ID).Database(dedicatedTenant)); + TSampleQueries::CheckSuccess(result); + + NYdb::TResultSetParser resultSet(result.GetResultSet(0)); + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + auto name = resultSet.ColumnParser("Name").GetOptionalUtf8(); + UNIT_ASSERT_VALUES_EQUAL(name, "c"); + auto rank = resultSet.ColumnParser("Rank").GetOptionalInt64(); + UNIT_ASSERT_VALUES_EQUAL(rank, 3); + auto config = resultSet.ColumnParser("Config").GetOptionalJsonDocument(); + UNIT_ASSERT_VALUES_EQUAL(config, R"({"member_name":"super_boss@builtin","resource_pool":"default"})"); + + UNIT_ASSERT_C(!resultSet.TryNextRow(), "Unexpected row count"); + } + } } } // namespace NKikimr::NKqp diff --git a/ydb/core/sys_view/common/schema.cpp b/ydb/core/sys_view/common/schema.cpp index 677cc90b0c..6ecbe10138 100644 --- a/ydb/core/sys_view/common/schema.cpp +++ b/ydb/core/sys_view/common/schema.cpp @@ -287,6 +287,8 @@ private: RegisterPgTablesSystemViews(); + RegisterSystemView<Schema::ResourcePoolClassifiers>(ResourcePoolClassifiersName); + { using namespace NAuth; RegisterSystemView<Schema::AuthUsers>(UsersName); diff --git a/ydb/core/sys_view/common/schema.h b/ydb/core/sys_view/common/schema.h index 8710a197a0..ddcfcab7b7 100644 --- a/ydb/core/sys_view/common/schema.h +++ b/ydb/core/sys_view/common/schema.h @@ -49,6 +49,8 @@ constexpr TStringBuf PgTablesName = "pg_tables"; constexpr TStringBuf InformationSchemaTablesName = "tables"; constexpr TStringBuf PgClassName = "pg_class"; +constexpr TStringBuf ResourcePoolClassifiersName = "resource_pool_classifiers"; + namespace NAuth { constexpr TStringBuf UsersName = "auth_users"; constexpr TStringBuf GroupsName = "auth_groups"; @@ -701,6 +703,18 @@ struct Schema : NIceDb::Schema { private: std::unordered_map<TString, TVector<PgColumn>> columnsStorage; }; + + struct ResourcePoolClassifiers : Table<20> { + struct Name : Column<1, NScheme::NTypeIds::Utf8> {}; + struct Rank : Column<2, NScheme::NTypeIds::Int64> {}; + struct Config : Column<3, NScheme::NTypeIds::JsonDocument> {}; + + using TKey = TableKey<Name>; + using TColumns = TableColumns< + Name, + Rank, + Config>; + }; }; bool MaybeSystemViewPath(const TVector<TString>& path); diff --git a/ydb/core/sys_view/resource_pool_classifiers/resource_pool_classifiers.cpp b/ydb/core/sys_view/resource_pool_classifiers/resource_pool_classifiers.cpp new file mode 100644 index 0000000000..a7852e1f92 --- /dev/null +++ b/ydb/core/sys_view/resource_pool_classifiers/resource_pool_classifiers.cpp @@ -0,0 +1,189 @@ +#include "resource_pool_classifiers.h" + +#include <ydb/core/kqp/gateway/behaviour/resource_pool_classifier/fetcher.h> +#include <ydb/core/kqp/gateway/behaviour/resource_pool_classifier/snapshot.h> +#include <ydb/core/kqp/workload_service/actors/actors.h> +#include <ydb/core/node_whiteboard/node_whiteboard.h> +#include <ydb/core/sys_view/common/events.h> +#include <ydb/core/sys_view/common/scan_actor_base_impl.h> +#include <ydb/core/sys_view/common/schema.h> +#include <ydb/services/metadata/service.h> + +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> + +#include <ydb/library/actors/core/hfunc.h> +#include <ydb/library/actors/core/interconnect.h> +#include <ydb/library/actors/interconnect/interconnect.h> + +#include <yql/essentials/types/binary_json/read.h> +#include <yql/essentials/types/binary_json/write.h> + +namespace NKikimr { +namespace NSysView { + +using namespace NActors; +using namespace NNodeWhiteboard; + +class TResourcePoolClassifiersScan : public TScanActorBase<TResourcePoolClassifiersScan> { +public: + using TBase = TScanActorBase<TResourcePoolClassifiersScan>; + + static constexpr auto ActorActivityType() { + return NKikimrServices::TActivity::KQP_SYSTEM_VIEW_SCAN; + } + + TResourcePoolClassifiersScan(const NActors::TActorId& ownerId, ui32 scanId, const TTableId& tableId, + const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns, + TIntrusiveConstPtr<NACLib::TUserToken> userToken, const TString& database, bool reverse) + : TBase(ownerId, scanId, tableId, tableRange, columns) + , UserToken(std::move(userToken)) + , Database(database) + , Reverse(reverse) + { + const auto& cellsFrom = TableRange.From.GetCells(); + if (cellsFrom.size() == 1 && !cellsFrom[0].IsNull()) { + From = TString{cellsFrom[0].Data(), cellsFrom[0].Size()}; + } + + const auto& cellsTo = TableRange.To.GetCells(); + if (cellsTo.size() == 1 && !cellsTo[0].IsNull()) { + To = TString{cellsTo[0].Data(), cellsTo[0].Size()}; + } + } + + STFUNC(StateScan) { + try { + switch (ev->GetTypeRewrite()) { + hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle); + hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle) + hFunc(NKqp::NWorkload::TEvFetchDatabaseResponse, Handle); + hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleAbortExecution); + cFunc(TEvents::TEvWakeup::EventType, HandleTimeout); + cFunc(TEvents::TEvPoison::EventType, PassAway); + default: + LOG_CRIT(*TlsActivationContext, NKikimrServices::SYSTEM_VIEWS, + "NSysView::TResourcePoolClassifiersScan: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); + } + } catch (...) { + LOG_CRIT(*TlsActivationContext, NKikimrServices::SYSTEM_VIEWS, + "NSysView::TResourcePoolClassifiersScan: with exception %s", CurrentExceptionMessage().c_str()); + ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, CurrentExceptionMessage()); + } + } + +private: + void ProceedToScan() override { + Become(&TResourcePoolClassifiersScan::StateScan); + if (AckReceived) { + StartScan(); + } + } + + void StartScan() { + if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) { + ReplyEmptyAndDie(); + } + Register(NKqp::NWorkload::CreateDatabaseFetcherActor(SelfId(), Database, UserToken, NACLib::EAccessRights::GenericFull)); + } + + void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr&) { + StartScan(); + } + + void Handle(NKqp::NWorkload::TEvFetchDatabaseResponse::TPtr& ev) { + auto& event = *ev->Get(); + if (event.Status != Ydb::StatusIds::SUCCESS) { + ReplyErrorAndDie(event.Status, event.Issues.ToOneLineString()); + return; + } + Database = event.DatabaseId; + Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvAskSnapshot(std::make_shared<NKqp::TResourcePoolClassifierSnapshotsFetcher>())); + } + + void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { + using TExtractor = std::function<TCell(const NKqp::TResourcePoolClassifierConfig&, TVector<TBuffer>&)>; + using TSchema = Schema::ResourcePoolClassifiers; + + struct TExtractorsMap : public THashMap<NTable::TTag, TExtractor> { + TExtractorsMap() { + insert({TSchema::Name::ColumnId, [] (const NKqp::TResourcePoolClassifierConfig& config, TVector<TBuffer>&) { + return TCell(config.GetName().data(), config.GetName().size()); + }}); + insert({TSchema::Rank::ColumnId, [] (const NKqp::TResourcePoolClassifierConfig& config, TVector<TBuffer>&) { + return TCell::Make<i64>(config.GetRank()); + }}); + insert({TSchema::Config::ColumnId, [] (const NKqp::TResourcePoolClassifierConfig& config, TVector<TBuffer>& holder) { + TStringStream str; + NJson::WriteJson(&str, &config.GetConfigJson(), NJson::TJsonWriterConfig{}); + const auto maybeBinaryJson = NBinaryJson::SerializeToBinaryJson(str.Str()); + if (std::holds_alternative<TString>(maybeBinaryJson)) { + ythrow yexception() << "Can't serialize binary json value: " << std::get<TString>(maybeBinaryJson); + } + holder.emplace_back(std::move(std::get<NBinaryJson::TBinaryJson>(maybeBinaryJson))); + return TCell(holder.back().Data(), holder.back().Size()); + }}); + } + }; + static TExtractorsMap extractors; + + const auto& snapshot = ev->Get()->GetSnapshotAs<NKqp::TResourcePoolClassifierSnapshot>(); + const auto& config = snapshot->GetResourcePoolClassifierConfigs(); + auto resourcePoolsIt = config.find(Database); + if (resourcePoolsIt == config.end()) { + ReplyEmptyAndDie(); + return; + } + + auto batch = MakeHolder<NKqp::TEvKqpCompute::TEvScanData>(ScanId); + batch->Finished = true; + // It's a mandatory condition to keep sorted PK here + for (const auto& [name, config] : std::map(resourcePoolsIt->second.begin(), resourcePoolsIt->second.end())) { + if (!IsInRange(name)) { + continue; + } + TVector<TCell> cells; + TVector<TBuffer> holder; + for (auto column : Columns) { + auto extractor = extractors.find(column.Tag); + if (extractor == extractors.end()) { + cells.push_back(TCell()); + } else { + cells.push_back(extractor->second(config, holder)); + } + } + TArrayRef<const TCell> ref(cells); + batch->Rows.emplace_back(TOwnedCellVec::Make(ref)); + } + if (Reverse) { + std::reverse(batch->Rows.begin(), batch->Rows.end()); + } + SendBatch(std::move(batch)); + } + + bool IsInRange(const TString& name) const { + if ((From && name < From) || (!TableRange.FromInclusive && From && name == From)) { + return false; + } + if ((To && To < name) || (!TableRange.ToInclusive && To && name == To)) { + return false; + } + return true; + } + +private: + TMaybe<TString> From; + TMaybe<TString> To; + const TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + TString Database; + const bool Reverse; +}; + +THolder<NActors::IActor> CreateResourcePoolClassifiersScan(const NActors::TActorId& ownerId, ui32 scanId, const TTableId& tableId, + const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns, + TIntrusiveConstPtr<NACLib::TUserToken> userToken, const TString& database, bool reverse) +{ + return MakeHolder<TResourcePoolClassifiersScan>(ownerId, scanId, tableId, tableRange, columns, std::move(userToken), database, reverse); +} + +} // NSysView +} // NKikimr diff --git a/ydb/core/sys_view/resource_pool_classifiers/resource_pool_classifiers.h b/ydb/core/sys_view/resource_pool_classifiers/resource_pool_classifiers.h new file mode 100644 index 0000000000..16100446aa --- /dev/null +++ b/ydb/core/sys_view/resource_pool_classifiers/resource_pool_classifiers.h @@ -0,0 +1,16 @@ +#pragma once + +#include <ydb/core/kqp/runtime/kqp_compute.h> + +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/actorid.h> + +namespace NKikimr { +namespace NSysView { + +THolder<NActors::IActor> CreateResourcePoolClassifiersScan(const NActors::TActorId& ownerId, ui32 scanId, const TTableId& tableId, + const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns, + TIntrusiveConstPtr<NACLib::TUserToken> userToken, const TString& database, bool reverse); + +} // NSysView +} // NKikimr diff --git a/ydb/core/sys_view/resource_pool_classifiers/ya.make b/ydb/core/sys_view/resource_pool_classifiers/ya.make new file mode 100644 index 0000000000..4188be7fb4 --- /dev/null +++ b/ydb/core/sys_view/resource_pool_classifiers/ya.make @@ -0,0 +1,17 @@ +LIBRARY() + +SRCS( + resource_pool_classifiers.h + resource_pool_classifiers.cpp +) + +PEERDIR( + ydb/library/actors/core + ydb/core/base + ydb/core/kqp/runtime + ydb/core/sys_view/common +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/sys_view/scan.cpp b/ydb/core/sys_view/scan.cpp index 3be9914ee9..de4322b251 100644 --- a/ydb/core/sys_view/scan.cpp +++ b/ydb/core/sys_view/scan.cpp @@ -8,19 +8,20 @@ #include <ydb/core/sys_view/auth/permissions.h> #include <ydb/core/sys_view/auth/users.h> #include <ydb/core/sys_view/common/schema.h> -#include <ydb/core/sys_view/partition_stats/partition_stats.h> #include <ydb/core/sys_view/nodes/nodes.h> -#include <ydb/core/sys_view/query_stats/query_stats.h> -#include <ydb/core/sys_view/query_stats/query_metrics.h> +#include <ydb/core/sys_view/partition_stats/partition_stats.h> +#include <ydb/core/sys_view/partition_stats/top_partitions.h> #include <ydb/core/sys_view/pg_tables/pg_tables.h> +#include <ydb/core/sys_view/query_stats/query_metrics.h> +#include <ydb/core/sys_view/query_stats/query_stats.h> +#include <ydb/core/sys_view/resource_pool_classifiers/resource_pool_classifiers.h> #include <ydb/core/sys_view/sessions/sessions.h> -#include <ydb/core/sys_view/storage/pdisks.h> -#include <ydb/core/sys_view/storage/vslots.h> #include <ydb/core/sys_view/storage/groups.h> +#include <ydb/core/sys_view/storage/pdisks.h> #include <ydb/core/sys_view/storage/storage_pools.h> #include <ydb/core/sys_view/storage/storage_stats.h> +#include <ydb/core/sys_view/storage/vslots.h> #include <ydb/core/sys_view/tablets/tablets.h> -#include <ydb/core/sys_view/partition_stats/top_partitions.h> #include <ydb/library/actors/core/actor_bootstrapped.h> #include <ydb/library/actors/core/hfunc.h> @@ -41,7 +42,9 @@ public: const TString& tablePath, TVector<TSerializedTableRange> ranges, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns, - TIntrusiveConstPtr<NACLib::TUserToken> userToken) + TIntrusiveConstPtr<NACLib::TUserToken> userToken, + const TString& database, + bool reverse) : TBase(&TSysViewRangesReader::ScanState) , OwnerId(ownerId) , ScanId(scanId) @@ -50,6 +53,8 @@ public: , Ranges(std::move(ranges)) , Columns(columns.begin(), columns.end()) , UserToken(std::move(userToken)) + , Database(database) + , Reverse(reverse) { } @@ -84,7 +89,7 @@ public: if (CurrentRange < Ranges.size()) { auto actor = CreateSystemViewScan( SelfId(), ScanId, TableId, TablePath, Ranges[CurrentRange].ToTableRange(), - Columns, UserToken); + Columns, UserToken, Database, Reverse); ScanActorId = Register(actor.Release()); CurrentRange += 1; } else { @@ -148,6 +153,8 @@ private: TVector<TSerializedTableRange> Ranges; TVector<NMiniKQL::TKqpComputeContextBase::TColumn> Columns; const TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + const TString Database; + const bool Reverse; ui64 CurrentRange = 0; TMaybe<TActorId> ScanActorId; @@ -160,12 +167,14 @@ THolder<NActors::IActor> CreateSystemViewScan( const TString& tablePath, TVector<TSerializedTableRange> ranges, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns, - TIntrusiveConstPtr<NACLib::TUserToken> userToken + TIntrusiveConstPtr<NACLib::TUserToken> userToken, + const TString& database, + bool reverse ) { if (ranges.size() == 1) { - return CreateSystemViewScan(ownerId, scanId, tableId, tablePath, ranges[0].ToTableRange(), columns, std::move(userToken)); + return CreateSystemViewScan(ownerId, scanId, tableId, tablePath, ranges[0].ToTableRange(), columns, std::move(userToken), database, reverse); } else { - return MakeHolder<TSysViewRangesReader>(ownerId, scanId, tableId, tablePath, ranges, columns, std::move(userToken)); + return MakeHolder<TSysViewRangesReader>(ownerId, scanId, tableId, tablePath, ranges, columns, std::move(userToken), database, reverse); } } @@ -176,7 +185,9 @@ THolder<NActors::IActor> CreateSystemViewScan( const TString& tablePath, const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns, - TIntrusiveConstPtr<NACLib::TUserToken> userToken + TIntrusiveConstPtr<NACLib::TUserToken> userToken, + const TString& database, + bool reverse ) { if (tableId.SysViewInfo == PartitionStatsName) { return CreatePartitionStatsScan(ownerId, scanId, tableId, tableRange, columns); @@ -248,6 +259,10 @@ THolder<NActors::IActor> CreateSystemViewScan( return CreatePgClassScan(ownerId, scanId, tableId, tablePath, tableRange, columns); } + if (tableId.SysViewInfo == ResourcePoolClassifiersName) { + return CreateResourcePoolClassifiersScan(ownerId, scanId, tableId, tableRange, columns, std::move(userToken), database, reverse); + } + { using namespace NAuth; if (tableId.SysViewInfo == UsersName) { diff --git a/ydb/core/sys_view/scan.h b/ydb/core/sys_view/scan.h index 4d329e09ec..4f400f51aa 100644 --- a/ydb/core/sys_view/scan.h +++ b/ydb/core/sys_view/scan.h @@ -9,11 +9,11 @@ namespace NSysView { THolder<NActors::IActor> CreateSystemViewScan(const NActors::TActorId& ownerId, ui32 scanId, const TTableId& tableId, const TString& tablePath, TVector<TSerializedTableRange> ranges, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns, - TIntrusiveConstPtr<NACLib::TUserToken> userToken); + TIntrusiveConstPtr<NACLib::TUserToken> userToken, const TString& database, bool reverse); THolder<NActors::IActor> CreateSystemViewScan(const NActors::TActorId& ownerId, ui32 scanId, const TTableId& tableId, const TString& tablePath, const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns, - TIntrusiveConstPtr<NACLib::TUserToken> userToken); + TIntrusiveConstPtr<NACLib::TUserToken> userToken, const TString& database, bool reverse); } // NSysView } // NKikimr diff --git a/ydb/core/sys_view/ut_kqp.cpp b/ydb/core/sys_view/ut_kqp.cpp index c35f501add..214a3e9bad 100644 --- a/ydb/core/sys_view/ut_kqp.cpp +++ b/ydb/core/sys_view/ut_kqp.cpp @@ -1762,7 +1762,7 @@ Y_UNIT_TEST_SUITE(SystemView) { UNIT_ASSERT_VALUES_EQUAL(entry.Type, ESchemeEntryType::Directory); auto children = result.GetChildren(); - UNIT_ASSERT_VALUES_EQUAL(children.size(), 29); + UNIT_ASSERT_VALUES_EQUAL(children.size(), 30); THashSet<TString> names; for (const auto& child : children) { @@ -1780,7 +1780,7 @@ Y_UNIT_TEST_SUITE(SystemView) { UNIT_ASSERT_VALUES_EQUAL(entry.Type, ESchemeEntryType::Directory); auto children = result.GetChildren(); - UNIT_ASSERT_VALUES_EQUAL(children.size(), 23); + UNIT_ASSERT_VALUES_EQUAL(children.size(), 24); THashSet<TString> names; for (const auto& child : children) { diff --git a/ydb/core/sys_view/ya.make b/ydb/core/sys_view/ya.make index 11257ba7a4..5f929a49a1 100644 --- a/ydb/core/sys_view/ya.make +++ b/ydb/core/sys_view/ya.make @@ -11,11 +11,12 @@ PEERDIR( ydb/core/sys_view/auth ydb/core/sys_view/common ydb/core/sys_view/nodes - ydb/core/sys_view/sessions ydb/core/sys_view/partition_stats ydb/core/sys_view/pg_tables ydb/core/sys_view/query_stats + ydb/core/sys_view/resource_pool_classifiers ydb/core/sys_view/service + ydb/core/sys_view/sessions ydb/core/sys_view/storage ydb/core/sys_view/tablets ) @@ -32,6 +33,7 @@ RECURSE( pg_tables processor query_stats + resource_pool_classifiers service storage tablets diff --git a/ydb/core/tablet_flat/flat_cxx_database.h b/ydb/core/tablet_flat/flat_cxx_database.h index e6ea695ba4..750b9adb5a 100644 --- a/ydb/core/tablet_flat/flat_cxx_database.h +++ b/ydb/core/tablet_flat/flat_cxx_database.h @@ -238,6 +238,7 @@ template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Date32> { typedef i32 Ty template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Datetime64> { typedef i64 Type; }; template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Timestamp64> { typedef i64 Type; }; template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Interval64> { typedef i64 Type; }; +template <> struct NSchemeTypeMapper<NScheme::NTypeIds::JsonDocument> { typedef TString Type; }; /// only for compatibility with old code template <NScheme::TTypeId ValType> |