diff options
author | gvit <gvit@ydb.tech> | 2023-10-18 19:49:39 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-10-18 21:14:02 +0300 |
commit | 0d1d00f64e1b2b8ed7cb57ae24fda0d8a979193e (patch) | |
tree | b1ea2aa5f38738abd7d16fd1411bd5d6f3202cc2 | |
parent | deb6568e35fddbfa50456d58799c91e4ce34dbfb (diff) | |
download | ydb-0d1d00f64e1b2b8ed7cb57ae24fda0d8a979193e.tar.gz |
refactor rpc alter table KIKIMR-18963
-rw-r--r-- | ydb/core/grpc_services/rpc_alter_table.cpp | 409 | ||||
-rw-r--r-- | ydb/core/grpc_services/table_settings.cpp | 17 | ||||
-rw-r--r-- | ydb/core/grpc_services/table_settings.h | 3 | ||||
-rw-r--r-- | ydb/core/ydb_convert/table_description.cpp | 317 | ||||
-rw-r--r-- | ydb/core/ydb_convert/table_description.h | 32 |
5 files changed, 373 insertions, 405 deletions
diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp index 374864e0aa6..160d7d959ed 100644 --- a/ydb/core/grpc_services/rpc_alter_table.cpp +++ b/ydb/core/grpc_services/rpc_alter_table.cpp @@ -52,27 +52,12 @@ static bool CheckAccess(const NACLib::TUserToken& userToken, const NSchemeCache: return true; } -static std::pair<StatusIds::StatusCode, TString> CheckAddIndexDesc(const Ydb::Table::TableIndex& desc) { - if (!desc.name()) { - return {StatusIds::BAD_REQUEST, "Index must have a name"}; - } - - if (!desc.index_columns_size()) { - return {StatusIds::BAD_REQUEST, "At least one column must be specified"}; - } - - if (!desc.data_columns().empty() && !AppData()->FeatureFlags.GetEnableDataColumnForIndexTable()) { - return {StatusIds::UNSUPPORTED, "Data column feature is not supported yet"}; - } - - return {StatusIds::SUCCESS, ""}; -} - using TEvAlterTableRequest = TGrpcRequestOperationCall<Ydb::Table::AlterTableRequest, Ydb::Table::AlterTableResponse>; class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTableRequest> { using TBase = TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTableRequest>; + using EOp = NKikimr::EAlterOperationKind; void PassAway() override { if (SSPipeClient) { @@ -82,65 +67,6 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab IActor::PassAway(); } - enum class EOp { - // columns, column families, storage, ttl - Common, - // add indices - AddIndex, - // drop indices - DropIndex, - // add/alter/drop attributes - Attribute, - // add changefeeds - AddChangefeed, - // drop changefeeds - DropChangefeed, - // rename index - RenameIndex, - }; - - THashSet<EOp> GetOps() const { - const auto& req = GetProtoRequest(); - THashSet<EOp> ops; - - if (req->add_columns_size() || req->drop_columns_size() || req->alter_columns_size() - || req->ttl_action_case() != Ydb::Table::AlterTableRequest::TTL_ACTION_NOT_SET - || req->tiering_action_case() != Ydb::Table::AlterTableRequest::TIERING_ACTION_NOT_SET - || req->has_alter_storage_settings() - || req->add_column_families_size() || req->alter_column_families_size() - || req->set_compaction_policy() || req->has_alter_partitioning_settings() - || req->set_key_bloom_filter() != Ydb::FeatureFlag::STATUS_UNSPECIFIED - || req->has_set_read_replicas_settings()) { - ops.emplace(EOp::Common); - } - - if (req->add_indexes_size()) { - ops.emplace(EOp::AddIndex); - } - - if (req->drop_indexes_size()) { - ops.emplace(EOp::DropIndex); - } - - if (req->add_changefeeds_size()) { - ops.emplace(EOp::AddChangefeed); - } - - if (req->drop_changefeeds_size()) { - ops.emplace(EOp::DropChangefeed); - } - - if (req->alter_attributes_size()) { - ops.emplace(EOp::Attribute); - } - - if (req->rename_indexes_size()) { - ops.emplace(EOp::RenameIndex); - } - - return ops; - } - public: TAlterTableRPC(IRequestOpCtx* msg, ui64 flags = NKqpProto::TKqpSchemeOperation::FLAG_UNSPECIFIED) : TBase(msg) @@ -156,10 +82,10 @@ public: } if (!Request_->GetSerializedToken().empty()) { - UserToken = MakeHolder<NACLib::TUserToken>(Request_->GetSerializedToken()); + UserToken = Request_->GetInternalToken(); } - auto ops = GetOps(); + auto ops = GetAlterOperationKinds(req); if (!ops) { return Reply(StatusIds::BAD_REQUEST, "Empty alter", NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx); @@ -170,6 +96,10 @@ public: } OpType = *ops.begin(); + + Ydb::StatusIds::StatusCode code; + TString error; + switch (OpType) { case EOp::Common: // Altering table settings will need table profiles @@ -179,62 +109,23 @@ public: return; case EOp::AddIndex: - if (req->add_indexes_size() == 1) { - const auto& index = req->add_indexes(0); - auto [status, issues] = CheckAddIndexDesc(index); - if (status == StatusIds::SUCCESS) { - PrepareAlterTableAddIndex(); - } else { - return Reply(status, issues, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx); - } - } else { - return Reply(StatusIds::UNSUPPORTED, "Only one index can be added by one operation", - NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx); + if (!BuildAlterTableAddIndexRequest(req, &IndexBuildSettings, Flags, code, error)) { + Reply(code, error, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx); + return; } - break; - case EOp::DropIndex: - if (req->drop_indexes_size() == 1) { - DropIndex(ctx); - } else { - return Reply(StatusIds::UNSUPPORTED, "Only one index can be removed by one operation", - NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx); - } - break; - - case EOp::AddChangefeed: - if (!AppData()->FeatureFlags.GetEnableChangefeeds()) { - return Reply(StatusIds::UNSUPPORTED, "Changefeeds are not supported yet", - NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx); - } - if (req->add_changefeeds_size() == 1) { - AddChangefeed(ctx); - } else { - return Reply(StatusIds::UNSUPPORTED, "Only one changefeed can be added by one operation", - NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx); - } - break; - - case EOp::DropChangefeed: - if (req->drop_changefeeds_size() == 1) { - DropChangefeed(ctx); - } else { - return Reply(StatusIds::UNSUPPORTED, "Only one changefeed can be removed by one operation", - NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx); - } + PrepareAlterTableAddIndex(); break; case EOp::Attribute: PrepareAlterUserAttrubutes(); break; + case EOp::AddChangefeed: + case EOp::DropIndex: + case EOp::DropChangefeed: case EOp::RenameIndex: - if (req->rename_indexes_size() == 1) { - RenameIndex(ctx); - } else { - return Reply(StatusIds::UNSUPPORTED, "Only one index can be renamed by one operation", - NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx); - } + AlterTable(ctx); break; } @@ -374,7 +265,8 @@ private: return AlterTableAddIndexOp(resp, ctx); case EOp::Attribute: Y_ABORT_UNLESS(!resp->ResultSet.empty()); - return AlterUserAttributes(resp->ResultSet.back().TableId.PathId, ctx); + ResolvedPathId = resp->ResultSet.back().TableId.PathId; + return AlterTable(ctx); default: TXLOG_E("Got unexpected cache response"); return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx); @@ -399,20 +291,7 @@ private: } void SendAddIndexOpToSS(const TActorContext& ctx) { - const auto& req = *GetProtoRequest(); - - NKikimrIndexBuilder::TIndexBuildSettings settings; - if (Flags & NKqpProto::TKqpSchemeOperation::FLAG_PG_MODE) { - settings.set_pg_mode(true); - } - if (Flags & NKqpProto::TKqpSchemeOperation::FLAG_IF_NOT_EXISTS) { - settings.set_if_not_exist(true); - } - settings.set_source_path(req.path()); - auto tableIndex = settings.mutable_index(); - tableIndex->CopyFrom(req.add_indexes(0)); - auto ev = new NSchemeShard::TEvIndexBuilder::TEvCreateRequest(TxId, DatabaseName, std::move(settings)); - + auto ev = new NSchemeShard::TEvIndexBuilder::TEvCreateRequest(TxId, DatabaseName, std::move(IndexBuildSettings)); NTabletPipe::SendData(ctx, SSPipeClient, ev); } @@ -450,192 +329,13 @@ private: } } - void DropIndex(const TActorContext &ctx) { - const auto req = GetProtoRequest(); - std::pair<TString, TString> pathPair; - try { - pathPair = SplitPath(req->path()); - } catch (const std::exception&) { - return ReplyWithStatus(StatusIds::BAD_REQUEST, ctx); - } - - const auto& workingDir = pathPair.first; - const auto& name = pathPair.second; - - std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction(); - NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record; - NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme(); - modifyScheme->SetWorkingDir(workingDir); - modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropIndex); - - for (const auto& drop : req->drop_indexes()) { - auto desc = modifyScheme->MutableDropIndex(); - desc->SetIndexName(drop); - desc->SetTableName(name); - } - - ctx.Send(MakeTxProxyID(), proposeRequest.release()); - } - - void AddChangefeed(const TActorContext &ctx) { - const auto req = GetProtoRequest(); - std::pair<TString, TString> pathPair; - try { - pathPair = SplitPath(req->path()); - } catch (const std::exception&) { - return ReplyWithStatus(StatusIds::BAD_REQUEST, ctx); - } - - const auto& workingDir = pathPair.first; - const auto& name = pathPair.second; - - std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction(); - NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record; - NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme(); - modifyScheme->SetWorkingDir(workingDir); - modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStream); - - for (const auto& add : req->add_changefeeds()) { - auto op = modifyScheme->MutableCreateCdcStream(); - op->SetTableName(name); - - if (add.has_retention_period()) { - op->SetRetentionPeriodSeconds(add.retention_period().seconds()); - } - - if (add.has_topic_partitioning_settings()) { - i64 minActivePartitions = add.topic_partitioning_settings().min_active_partitions(); - if (minActivePartitions < 0) { - NYql::TIssues issues; - issues.AddIssue(NYql::TIssue("Topic partitions count must be positive")); - return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx); - } else if (minActivePartitions == 0) { - minActivePartitions = 1; - } - op->SetTopicPartitions(minActivePartitions); - } - - StatusIds::StatusCode code; - TString error; - if (!FillChangefeedDescription(*op->MutableStreamDescription(), add, code, error)) { - NYql::TIssues issues; - issues.AddIssue(NYql::TIssue(error)); - return Reply(code, issues, ctx); - } - } - - ctx.Send(MakeTxProxyID(), proposeRequest.release()); - } - - void DropChangefeed(const TActorContext &ctx) { - const auto req = GetProtoRequest(); - std::pair<TString, TString> pathPair; - try { - pathPair = SplitPath(req->path()); - } catch (const std::exception&) { - return ReplyWithStatus(StatusIds::BAD_REQUEST, ctx); - } - - const auto& workingDir = pathPair.first; - const auto& name = pathPair.second; - - std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction(); - NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record; - NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme(); - modifyScheme->SetWorkingDir(workingDir); - modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStream); - - for (const auto& drop : req->drop_changefeeds()) { - auto op = modifyScheme->MutableDropCdcStream(); - op->SetStreamName(drop); - op->SetTableName(name); - } - - ctx.Send(MakeTxProxyID(), proposeRequest.release()); - } - void AlterTable(const TActorContext &ctx) { const auto req = GetProtoRequest(); - std::pair<TString, TString> pathPair; - try { - pathPair = SplitPath(req->path()); - } catch (const std::exception&) { - return ReplyWithStatus(StatusIds::BAD_REQUEST, ctx); - } - - const auto& workingDir = pathPair.first; - const auto& name = pathPair.second; - std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction(); - NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record; - NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme(); - modifyScheme->SetWorkingDir(workingDir); - modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable); - - auto desc = modifyScheme->MutableAlterTable(); - desc->SetName(name); - - for (const auto& drop : req->drop_columns()) { - desc->AddDropColumns()->SetName(drop); - } - - StatusIds::StatusCode code = StatusIds::SUCCESS; + auto modifyScheme = proposeRequest->Record.MutableTransaction()->MutableModifyScheme(); + Ydb::StatusIds::StatusCode code; TString error; - - if (!FillColumnDescription(*desc, req->add_columns(), code, error)) { - NYql::TIssues issues; - issues.AddIssue(NYql::TIssue(error)); - return Reply(code, issues, ctx); - } - - for (const auto& alter : req->alter_columns()) { - auto column = desc->AddColumns(); - column->SetName(alter.name()); - if (!alter.family().empty()) { - column->SetFamilyName(alter.family()); - } - } - - bool hadPartitionConfig = desc->HasPartitionConfig(); - TColumnFamilyManager families(desc->MutablePartitionConfig()); - - // Apply storage settings to the default column family - if (req->has_alter_storage_settings()) { - Ydb::StatusIds::StatusCode code; - TString error; - if (!families.ApplyStorageSettings(req->alter_storage_settings(), &code, &error)) { - NYql::TIssues issues; - issues.AddIssue(NYql::TIssue(error)); - return Reply(code, issues, ctx); - } - } - - for (const auto& familySettings : req->add_column_families()) { - Ydb::StatusIds::StatusCode code; - TString error; - if (!families.ApplyFamilySettings(familySettings, &code, &error)) { - NYql::TIssues issues; - issues.AddIssue(NYql::TIssue(error)); - return Reply(code, issues, ctx); - } - } - - for (const auto& familySettings : req->alter_column_families()) { - Ydb::StatusIds::StatusCode code; - TString error; - if (!families.ApplyFamilySettings(familySettings, &code, &error)) { - NYql::TIssues issues; - issues.AddIssue(NYql::TIssue(error)); - return Reply(code, issues, ctx); - } - } - - // Avoid altering partition config unless we changed something - if (!families.Modified && !hadPartitionConfig) { - desc->ClearPartitionConfig(); - } - - if (!FillAlterTableSettingsDesc(*desc, *req, Profiles, code, error, AppData())) { + if (!BuildAlterTableModifyScheme(req, modifyScheme, Profiles, ResolvedPathId, code, error)) { NYql::TIssues issues; issues.AddIssue(NYql::TIssue(error)); return Reply(code, issues, ctx); @@ -644,69 +344,6 @@ private: ctx.Send(MakeTxProxyID(), proposeRequest.release()); } - void AlterUserAttributes(const TPathId& pathId, const TActorContext &ctx) { - const auto req = GetProtoRequest(); - - std::pair<TString, TString> pathPair; - try { - pathPair = SplitPath(req->path()); - } catch (const std::exception&) { - return ReplyWithStatus(StatusIds::BAD_REQUEST, ctx); - } - - const auto& workingDir = pathPair.first; - const auto& name = pathPair.second; - - std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction(); - auto& record = proposeRequest->Record; - auto& modifyScheme = *record.MutableTransaction()->MutableModifyScheme(); - - modifyScheme.SetWorkingDir(workingDir); - modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterUserAttributes); - modifyScheme.AddApplyIf()->SetPathId(pathId.LocalPathId); - - auto& alter = *modifyScheme.MutableAlterUserAttributes(); - alter.SetPathName(name); - - for (auto [key, value] : req->alter_attributes()) { - auto& attr = *alter.AddUserAttributes(); - attr.SetKey(key); - if (value) { - attr.SetValue(value); - } - } - - ctx.Send(MakeTxProxyID(), proposeRequest.release()); - } - - void RenameIndex(const TActorContext &ctx) { - const auto req = GetProtoRequest(); - - std::pair<TString, TString> pathPair; - try { - pathPair = SplitPath(req->path()); - } catch (const std::exception&) { - return ReplyWithStatus(StatusIds::BAD_REQUEST, ctx); - } - - const auto& workingDir = pathPair.first; - - std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction(); - auto& record = proposeRequest->Record; - auto& modifyScheme = *record.MutableTransaction()->MutableModifyScheme(); - - modifyScheme.SetWorkingDir(workingDir); - modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex); - - auto& alter = *modifyScheme.MutableMoveIndex(); - alter.SetTablePath(req->path()); - alter.SetSrcPath(req->rename_indexes(0).source_name()); - alter.SetDstPath(req->rename_indexes(0).destination_name()); - alter.SetAllowOverwrite(req->rename_indexes(0).replace_destination()); - - ctx.Send(MakeTxProxyID(), proposeRequest.release()); - } - void ReplyWithStatus(StatusIds::StatusCode status, const TActorContext &ctx) { Request_->ReplyWithYdbStatus(status); @@ -719,9 +356,11 @@ private: TIntrusivePtr<NTxProxy::TTxProxyMon> TxProxyMon; TString LogPrefix; TActorId SSPipeClient; - THolder<const NACLib::TUserToken> UserToken; + TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + TPathId ResolvedPathId; TTableProfiles Profiles; EOp OpType; + NKikimrIndexBuilder::TIndexBuildSettings IndexBuildSettings; const ui64 Flags; }; diff --git a/ydb/core/grpc_services/table_settings.cpp b/ydb/core/grpc_services/table_settings.cpp index c84ac1fa495..4f3cccbe58a 100644 --- a/ydb/core/grpc_services/table_settings.cpp +++ b/ydb/core/grpc_services/table_settings.cpp @@ -28,23 +28,6 @@ bool FillCreateTableSettingsDesc(NKikimrSchemeOp::TTableDescription& out, return NKikimr::FillCreateTableSettingsDesc(out, in, code, error, warnings, tableProfileSet); } -bool FillAlterTableSettingsDesc(NKikimrSchemeOp::TTableDescription& out, - const Ydb::Table::AlterTableRequest& in, const TTableProfiles& profiles, - Ydb::StatusIds::StatusCode& code, TString& error, const TAppData* appData) { - - bool changed = false; - auto &partitionConfig = *out.MutablePartitionConfig(); - - if (in.set_compaction_policy()) { - if (!profiles.ApplyCompactionPolicy(in.set_compaction_policy(), partitionConfig, code, error, appData)) { - return false; - } - - changed = true; - } - - return NKikimr::FillAlterTableSettingsDesc(out, in, code, error, changed); -} } // namespace NGRpcService } // namespace NKikimr diff --git a/ydb/core/grpc_services/table_settings.h b/ydb/core/grpc_services/table_settings.h index 55654ea1818..88cdfb5769f 100644 --- a/ydb/core/grpc_services/table_settings.h +++ b/ydb/core/grpc_services/table_settings.h @@ -10,9 +10,6 @@ bool FillCreateTableSettingsDesc(NKikimrSchemeOp::TTableDescription& out, const Ydb::Table::CreateTableRequest& in, const TTableProfiles& profiles, Ydb::StatusIds::StatusCode& code, TString& error, TList<TString>& warnings); -bool FillAlterTableSettingsDesc(NKikimrSchemeOp::TTableDescription& out, - const Ydb::Table::AlterTableRequest& in, const TTableProfiles& profiles, - Ydb::StatusIds::StatusCode& code, TString& error, const TAppData* appData); } // namespace NGRpcService } // namespace NKikimr diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index 08d711ac2a9..bd020c87039 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -3,6 +3,7 @@ #include "table_settings.h" #include "ydb_convert.h" +#include <ydb/core/base/path.h> #include <ydb/core/base/appdata.h> #include <ydb/core/engine/mkql_proto.h> #include <ydb/library/ydb_issue/proto/issue_id.pb.h> @@ -33,6 +34,322 @@ static void FillStoragePool(TStoragePoolHolder* out, TAddStoragePoolFunc<TStorag std::invoke(func, out)->set_media(in.GetPreferredPoolKind()); } +THashSet<EAlterOperationKind> GetAlterOperationKinds(const Ydb::Table::AlterTableRequest* req) { + THashSet<EAlterOperationKind> ops; + + if (req->add_columns_size() || req->drop_columns_size() || + req->alter_columns_size() || + req->ttl_action_case() != + Ydb::Table::AlterTableRequest::TTL_ACTION_NOT_SET || + req->tiering_action_case() != + Ydb::Table::AlterTableRequest::TIERING_ACTION_NOT_SET || + req->has_alter_storage_settings() || req->add_column_families_size() || + req->alter_column_families_size() || req->set_compaction_policy() || + req->has_alter_partitioning_settings() || + req->set_key_bloom_filter() != Ydb::FeatureFlag::STATUS_UNSPECIFIED || + req->has_set_read_replicas_settings()) + { + ops.emplace(EAlterOperationKind::Common); + } + + if (req->add_indexes_size()) { + ops.emplace(EAlterOperationKind::AddIndex); + } + + if (req->drop_indexes_size()) { + ops.emplace(EAlterOperationKind::DropIndex); + } + + if (req->add_changefeeds_size()) { + ops.emplace(EAlterOperationKind::AddChangefeed); + } + + if (req->drop_changefeeds_size()) { + ops.emplace(EAlterOperationKind::DropChangefeed); + } + + if (req->alter_attributes_size()) { + ops.emplace(EAlterOperationKind::Attribute); + } + + if (req->rename_indexes_size()) { + ops.emplace(EAlterOperationKind::RenameIndex); + } + + return ops; +} + +namespace { + +std::pair<TString, TString> SplitPathIntoWorkingDirAndName(const TString& path) { + auto splitPos = path.find_last_of('/'); + if (splitPos == path.npos || splitPos + 1 == path.size()) { + ythrow yexception() << "wrong path format '" << path << "'" ; + } + return {path.substr(0, splitPos), path.substr(splitPos + 1)}; +} + +} + + +bool FillAlterTableSettingsDesc(NKikimrSchemeOp::TTableDescription& out, + const Ydb::Table::AlterTableRequest& in, const TTableProfiles& profiles, + Ydb::StatusIds::StatusCode& code, TString& error, const TAppData* appData) { + + bool changed = false; + auto &partitionConfig = *out.MutablePartitionConfig(); + + if (in.set_compaction_policy()) { + if (!profiles.ApplyCompactionPolicy(in.set_compaction_policy(), partitionConfig, code, error, appData)) { + return false; + } + + changed = true; + } + + return NKikimr::FillAlterTableSettingsDesc(out, in, code, error, changed); +} + +bool BuildAlterTableAddIndexRequest(const Ydb::Table::AlterTableRequest* req, NKikimrIndexBuilder::TIndexBuildSettings* settings, + ui64 flags, + Ydb::StatusIds::StatusCode& code, TString& error) +{ + const auto ops = GetAlterOperationKinds(req); + if (ops.size() != 1 || *ops.begin() != EAlterOperationKind::AddIndex) { + code = Ydb::StatusIds::INTERNAL_ERROR; + error = "Unexpected build alter table add index call."; + return false; + } + + if (req->add_indexes_size() != 1) { + code = Ydb::StatusIds::UNSUPPORTED; + error = "Only one index can be added by one operation"; + return false; + } + + const auto desc = req->add_indexes(0); + + if (!desc.name()) { + code = Ydb::StatusIds::BAD_REQUEST; + error = "Index must have a name"; + return false; + } + + if (!desc.index_columns_size()) { + code = Ydb::StatusIds::BAD_REQUEST; + error = "At least one column must be specified"; + return false; + } + + if (!desc.data_columns().empty() && !AppData()->FeatureFlags.GetEnableDataColumnForIndexTable()) { + code = Ydb::StatusIds::UNSUPPORTED; + error = "Data column feature is not supported yet"; + return false; + } + + if (flags & NKqpProto::TKqpSchemeOperation::FLAG_PG_MODE) { + settings->set_pg_mode(true); + } + + if (flags & NKqpProto::TKqpSchemeOperation::FLAG_IF_NOT_EXISTS) { + settings->set_if_not_exist(true); + } + + settings->set_source_path(req->path()); + auto tableIndex = settings->mutable_index(); + tableIndex->CopyFrom(req->add_indexes(0)); + + return true; +} + +bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme, const TTableProfiles& profiles, + const TPathId& resolvedPathId, + Ydb::StatusIds::StatusCode& code, TString& error) +{ + std::pair<TString, TString> pathPair; + const auto ops = GetAlterOperationKinds(req); + if (ops.empty()) { + code = Ydb::StatusIds::BAD_REQUEST; + error = "Empty alter"; + return false; + } + + if (ops.size() > 1) { + code = Ydb::StatusIds::UNSUPPORTED; + error = "Mixed alter is unsupported"; + return false; + } + + const auto OpType = *ops.begin(); + + try { + pathPair = SplitPathIntoWorkingDirAndName(req->path()); + } catch (const std::exception&) { + code = Ydb::StatusIds::BAD_REQUEST; + return false; + } + + if (!AppData()->FeatureFlags.GetEnableChangefeeds() && OpType == EAlterOperationKind::AddChangefeed) { + code = Ydb::StatusIds::UNSUPPORTED; + error = "Changefeeds are not supported yet"; + return false; + } + + if (req->rename_indexes_size() != 1 && OpType == EAlterOperationKind::RenameIndex) { + code = Ydb::StatusIds::UNSUPPORTED; + error = "Only one index can be renamed by one operation"; + return false; + } + + if (req->drop_changefeeds_size() != 1 && OpType == EAlterOperationKind::DropChangefeed) { + code = Ydb::StatusIds::UNSUPPORTED; + error = "Only one changefeed can be removed by one operation"; + return false; + } + + if (req->add_changefeeds_size() != 1 && OpType == EAlterOperationKind::AddChangefeed) { + code = Ydb::StatusIds::UNSUPPORTED; + error = "Only one changefeed can be added by one operation"; + return false; + } + + if (req->drop_indexes_size() != 1 && OpType == EAlterOperationKind::DropIndex) { + code = Ydb::StatusIds::UNSUPPORTED; + error = "Only one index can be removed by one operation"; + return false; + } + + const auto& workingDir = pathPair.first; + const auto& name = pathPair.second; + modifyScheme->SetWorkingDir(workingDir); + + for(const auto& rename: req->rename_indexes()) { + modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex); + auto& alter = *modifyScheme->MutableMoveIndex(); + alter.SetTablePath(req->path()); + alter.SetSrcPath(rename.source_name()); + alter.SetDstPath(rename.destination_name()); + alter.SetAllowOverwrite(rename.replace_destination()); + } + + for (const auto& drop : req->drop_changefeeds()) { + modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStream); + auto op = modifyScheme->MutableDropCdcStream(); + op->SetStreamName(drop); + op->SetTableName(name); + } + + for (const auto &add : req->add_changefeeds()) { + modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStream); + auto op = modifyScheme->MutableCreateCdcStream(); + op->SetTableName(name); + + if (add.has_retention_period()) { + op->SetRetentionPeriodSeconds(add.retention_period().seconds()); + } + + if (add.has_topic_partitioning_settings()) { + i64 minActivePartitions = + add.topic_partitioning_settings().min_active_partitions(); + if (minActivePartitions < 0) { + code = Ydb::StatusIds::BAD_REQUEST; + error = "Topic partitions count must be positive"; + return false; + } else if (minActivePartitions == 0) { + minActivePartitions = 1; + } + op->SetTopicPartitions(minActivePartitions); + } + + if (!FillChangefeedDescription(*op->MutableStreamDescription(), add, code, error)) { + return false; + } + } + + for (const auto& drop : req->drop_indexes()) { + modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropIndex); + auto desc = modifyScheme->MutableDropIndex(); + desc->SetIndexName(drop); + desc->SetTableName(name); + } + + if (OpType == EAlterOperationKind::Common) { + modifyScheme->SetOperationType( + NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable); + + auto desc = modifyScheme->MutableAlterTable(); + desc->SetName(name); + + for (const auto &drop : req->drop_columns()) { + desc->AddDropColumns()->SetName(drop); + } + + if (!FillColumnDescription(*desc, req->add_columns(), code, error)) { + return false; + } + + for (const auto &alter : req->alter_columns()) { + auto column = desc->AddColumns(); + column->SetName(alter.name()); + if (!alter.family().empty()) { + column->SetFamilyName(alter.family()); + } + } + + bool hadPartitionConfig = desc->HasPartitionConfig(); + TColumnFamilyManager families(desc->MutablePartitionConfig()); + + // Apply storage settings to the default column family + if (req->has_alter_storage_settings()) { + if (!families.ApplyStorageSettings(req->alter_storage_settings(), &code, + &error)) { + return false; + } + } + + for (const auto &familySettings : req->add_column_families()) { + if (!families.ApplyFamilySettings(familySettings, &code, &error)) { + return false; + } + } + + for (const auto &familySettings : req->alter_column_families()) { + if (!families.ApplyFamilySettings(familySettings, &code, &error)) { + return false; + } + } + + // Avoid altering partition config unless we changed something + if (!families.Modified && !hadPartitionConfig) { + desc->ClearPartitionConfig(); + } + + if (!FillAlterTableSettingsDesc(*desc, *req, profiles, code, error, + AppData())) { + return false; + } + } + + if (OpType == EAlterOperationKind::Attribute) { + modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterUserAttributes); + modifyScheme->AddApplyIf()->SetPathId(resolvedPathId.LocalPathId); + + auto& alter = *modifyScheme->MutableAlterUserAttributes(); + alter.SetPathName(name); + + for (auto [key, value] : req->alter_attributes()) { + auto& attr = *alter.AddUserAttributes(); + attr.SetKey(key); + if (value) { + attr.SetValue(value); + } + } + } + + return true; +} + + template <typename TColumn> static Ydb::Type* AddColumn(Ydb::Table::ColumnMeta* newColumn, const TColumn& column) { newColumn->set_name(column.GetName()); diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index d0e140a5f55..346e877cbfc 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -9,6 +9,38 @@ namespace NKikimr { +enum class EAlterOperationKind { + // columns, column families, storage, ttl + Common, + // add indices + AddIndex, + // drop indices + DropIndex, + // add/alter/drop attributes + Attribute, + // add changefeeds + AddChangefeed, + // drop changefeeds + DropChangefeed, + // rename index + RenameIndex, +}; + + + +THashSet<EAlterOperationKind> GetAlterOperationKinds(const Ydb::Table::AlterTableRequest* req); +bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme, + const TTableProfiles& profiles, const TPathId& resolvedPathId, + Ydb::StatusIds::StatusCode& status, TString& error); + +bool FillAlterTableSettingsDesc(NKikimrSchemeOp::TTableDescription& out, + const Ydb::Table::AlterTableRequest& in, const TTableProfiles& profiles, + Ydb::StatusIds::StatusCode& code, TString& error, const TAppData* appData); + +bool BuildAlterTableAddIndexRequest(const Ydb::Table::AlterTableRequest* req, NKikimrIndexBuilder::TIndexBuildSettings* settings, + ui64 flags, + Ydb::StatusIds::StatusCode& status, TString& error); + // out void FillColumnDescription(Ydb::Table::DescribeTableResult& out, NKikimrMiniKQL::TType& splitKeyType, const NKikimrSchemeOp::TTableDescription& in); |