aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-10-18 19:49:39 +0300
committergvit <gvit@ydb.tech>2023-10-18 21:14:02 +0300
commit0d1d00f64e1b2b8ed7cb57ae24fda0d8a979193e (patch)
treeb1ea2aa5f38738abd7d16fd1411bd5d6f3202cc2
parentdeb6568e35fddbfa50456d58799c91e4ce34dbfb (diff)
downloadydb-0d1d00f64e1b2b8ed7cb57ae24fda0d8a979193e.tar.gz
refactor rpc alter table KIKIMR-18963
-rw-r--r--ydb/core/grpc_services/rpc_alter_table.cpp409
-rw-r--r--ydb/core/grpc_services/table_settings.cpp17
-rw-r--r--ydb/core/grpc_services/table_settings.h3
-rw-r--r--ydb/core/ydb_convert/table_description.cpp317
-rw-r--r--ydb/core/ydb_convert/table_description.h32
5 files changed, 373 insertions, 405 deletions
diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp
index 374864e0aa6..160d7d959ed 100644
--- a/ydb/core/grpc_services/rpc_alter_table.cpp
+++ b/ydb/core/grpc_services/rpc_alter_table.cpp
@@ -52,27 +52,12 @@ static bool CheckAccess(const NACLib::TUserToken& userToken, const NSchemeCache:
return true;
}
-static std::pair<StatusIds::StatusCode, TString> CheckAddIndexDesc(const Ydb::Table::TableIndex& desc) {
- if (!desc.name()) {
- return {StatusIds::BAD_REQUEST, "Index must have a name"};
- }
-
- if (!desc.index_columns_size()) {
- return {StatusIds::BAD_REQUEST, "At least one column must be specified"};
- }
-
- if (!desc.data_columns().empty() && !AppData()->FeatureFlags.GetEnableDataColumnForIndexTable()) {
- return {StatusIds::UNSUPPORTED, "Data column feature is not supported yet"};
- }
-
- return {StatusIds::SUCCESS, ""};
-}
-
using TEvAlterTableRequest = TGrpcRequestOperationCall<Ydb::Table::AlterTableRequest,
Ydb::Table::AlterTableResponse>;
class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTableRequest> {
using TBase = TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTableRequest>;
+ using EOp = NKikimr::EAlterOperationKind;
void PassAway() override {
if (SSPipeClient) {
@@ -82,65 +67,6 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
IActor::PassAway();
}
- enum class EOp {
- // columns, column families, storage, ttl
- Common,
- // add indices
- AddIndex,
- // drop indices
- DropIndex,
- // add/alter/drop attributes
- Attribute,
- // add changefeeds
- AddChangefeed,
- // drop changefeeds
- DropChangefeed,
- // rename index
- RenameIndex,
- };
-
- THashSet<EOp> GetOps() const {
- const auto& req = GetProtoRequest();
- THashSet<EOp> ops;
-
- if (req->add_columns_size() || req->drop_columns_size() || req->alter_columns_size()
- || req->ttl_action_case() != Ydb::Table::AlterTableRequest::TTL_ACTION_NOT_SET
- || req->tiering_action_case() != Ydb::Table::AlterTableRequest::TIERING_ACTION_NOT_SET
- || req->has_alter_storage_settings()
- || req->add_column_families_size() || req->alter_column_families_size()
- || req->set_compaction_policy() || req->has_alter_partitioning_settings()
- || req->set_key_bloom_filter() != Ydb::FeatureFlag::STATUS_UNSPECIFIED
- || req->has_set_read_replicas_settings()) {
- ops.emplace(EOp::Common);
- }
-
- if (req->add_indexes_size()) {
- ops.emplace(EOp::AddIndex);
- }
-
- if (req->drop_indexes_size()) {
- ops.emplace(EOp::DropIndex);
- }
-
- if (req->add_changefeeds_size()) {
- ops.emplace(EOp::AddChangefeed);
- }
-
- if (req->drop_changefeeds_size()) {
- ops.emplace(EOp::DropChangefeed);
- }
-
- if (req->alter_attributes_size()) {
- ops.emplace(EOp::Attribute);
- }
-
- if (req->rename_indexes_size()) {
- ops.emplace(EOp::RenameIndex);
- }
-
- return ops;
- }
-
public:
TAlterTableRPC(IRequestOpCtx* msg, ui64 flags = NKqpProto::TKqpSchemeOperation::FLAG_UNSPECIFIED)
: TBase(msg)
@@ -156,10 +82,10 @@ public:
}
if (!Request_->GetSerializedToken().empty()) {
- UserToken = MakeHolder<NACLib::TUserToken>(Request_->GetSerializedToken());
+ UserToken = Request_->GetInternalToken();
}
- auto ops = GetOps();
+ auto ops = GetAlterOperationKinds(req);
if (!ops) {
return Reply(StatusIds::BAD_REQUEST, "Empty alter",
NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
@@ -170,6 +96,10 @@ public:
}
OpType = *ops.begin();
+
+ Ydb::StatusIds::StatusCode code;
+ TString error;
+
switch (OpType) {
case EOp::Common:
// Altering table settings will need table profiles
@@ -179,62 +109,23 @@ public:
return;
case EOp::AddIndex:
- if (req->add_indexes_size() == 1) {
- const auto& index = req->add_indexes(0);
- auto [status, issues] = CheckAddIndexDesc(index);
- if (status == StatusIds::SUCCESS) {
- PrepareAlterTableAddIndex();
- } else {
- return Reply(status, issues, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
- }
- } else {
- return Reply(StatusIds::UNSUPPORTED, "Only one index can be added by one operation",
- NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
+ if (!BuildAlterTableAddIndexRequest(req, &IndexBuildSettings, Flags, code, error)) {
+ Reply(code, error, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
+ return;
}
- break;
- case EOp::DropIndex:
- if (req->drop_indexes_size() == 1) {
- DropIndex(ctx);
- } else {
- return Reply(StatusIds::UNSUPPORTED, "Only one index can be removed by one operation",
- NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
- }
- break;
-
- case EOp::AddChangefeed:
- if (!AppData()->FeatureFlags.GetEnableChangefeeds()) {
- return Reply(StatusIds::UNSUPPORTED, "Changefeeds are not supported yet",
- NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
- }
- if (req->add_changefeeds_size() == 1) {
- AddChangefeed(ctx);
- } else {
- return Reply(StatusIds::UNSUPPORTED, "Only one changefeed can be added by one operation",
- NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
- }
- break;
-
- case EOp::DropChangefeed:
- if (req->drop_changefeeds_size() == 1) {
- DropChangefeed(ctx);
- } else {
- return Reply(StatusIds::UNSUPPORTED, "Only one changefeed can be removed by one operation",
- NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
- }
+ PrepareAlterTableAddIndex();
break;
case EOp::Attribute:
PrepareAlterUserAttrubutes();
break;
+ case EOp::AddChangefeed:
+ case EOp::DropIndex:
+ case EOp::DropChangefeed:
case EOp::RenameIndex:
- if (req->rename_indexes_size() == 1) {
- RenameIndex(ctx);
- } else {
- return Reply(StatusIds::UNSUPPORTED, "Only one index can be renamed by one operation",
- NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
- }
+ AlterTable(ctx);
break;
}
@@ -374,7 +265,8 @@ private:
return AlterTableAddIndexOp(resp, ctx);
case EOp::Attribute:
Y_ABORT_UNLESS(!resp->ResultSet.empty());
- return AlterUserAttributes(resp->ResultSet.back().TableId.PathId, ctx);
+ ResolvedPathId = resp->ResultSet.back().TableId.PathId;
+ return AlterTable(ctx);
default:
TXLOG_E("Got unexpected cache response");
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
@@ -399,20 +291,7 @@ private:
}
void SendAddIndexOpToSS(const TActorContext& ctx) {
- const auto& req = *GetProtoRequest();
-
- NKikimrIndexBuilder::TIndexBuildSettings settings;
- if (Flags & NKqpProto::TKqpSchemeOperation::FLAG_PG_MODE) {
- settings.set_pg_mode(true);
- }
- if (Flags & NKqpProto::TKqpSchemeOperation::FLAG_IF_NOT_EXISTS) {
- settings.set_if_not_exist(true);
- }
- settings.set_source_path(req.path());
- auto tableIndex = settings.mutable_index();
- tableIndex->CopyFrom(req.add_indexes(0));
- auto ev = new NSchemeShard::TEvIndexBuilder::TEvCreateRequest(TxId, DatabaseName, std::move(settings));
-
+ auto ev = new NSchemeShard::TEvIndexBuilder::TEvCreateRequest(TxId, DatabaseName, std::move(IndexBuildSettings));
NTabletPipe::SendData(ctx, SSPipeClient, ev);
}
@@ -450,192 +329,13 @@ private:
}
}
- void DropIndex(const TActorContext &ctx) {
- const auto req = GetProtoRequest();
- std::pair<TString, TString> pathPair;
- try {
- pathPair = SplitPath(req->path());
- } catch (const std::exception&) {
- return ReplyWithStatus(StatusIds::BAD_REQUEST, ctx);
- }
-
- const auto& workingDir = pathPair.first;
- const auto& name = pathPair.second;
-
- std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction();
- NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
- NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
- modifyScheme->SetWorkingDir(workingDir);
- modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropIndex);
-
- for (const auto& drop : req->drop_indexes()) {
- auto desc = modifyScheme->MutableDropIndex();
- desc->SetIndexName(drop);
- desc->SetTableName(name);
- }
-
- ctx.Send(MakeTxProxyID(), proposeRequest.release());
- }
-
- void AddChangefeed(const TActorContext &ctx) {
- const auto req = GetProtoRequest();
- std::pair<TString, TString> pathPair;
- try {
- pathPair = SplitPath(req->path());
- } catch (const std::exception&) {
- return ReplyWithStatus(StatusIds::BAD_REQUEST, ctx);
- }
-
- const auto& workingDir = pathPair.first;
- const auto& name = pathPair.second;
-
- std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction();
- NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
- NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
- modifyScheme->SetWorkingDir(workingDir);
- modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStream);
-
- for (const auto& add : req->add_changefeeds()) {
- auto op = modifyScheme->MutableCreateCdcStream();
- op->SetTableName(name);
-
- if (add.has_retention_period()) {
- op->SetRetentionPeriodSeconds(add.retention_period().seconds());
- }
-
- if (add.has_topic_partitioning_settings()) {
- i64 minActivePartitions = add.topic_partitioning_settings().min_active_partitions();
- if (minActivePartitions < 0) {
- NYql::TIssues issues;
- issues.AddIssue(NYql::TIssue("Topic partitions count must be positive"));
- return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
- } else if (minActivePartitions == 0) {
- minActivePartitions = 1;
- }
- op->SetTopicPartitions(minActivePartitions);
- }
-
- StatusIds::StatusCode code;
- TString error;
- if (!FillChangefeedDescription(*op->MutableStreamDescription(), add, code, error)) {
- NYql::TIssues issues;
- issues.AddIssue(NYql::TIssue(error));
- return Reply(code, issues, ctx);
- }
- }
-
- ctx.Send(MakeTxProxyID(), proposeRequest.release());
- }
-
- void DropChangefeed(const TActorContext &ctx) {
- const auto req = GetProtoRequest();
- std::pair<TString, TString> pathPair;
- try {
- pathPair = SplitPath(req->path());
- } catch (const std::exception&) {
- return ReplyWithStatus(StatusIds::BAD_REQUEST, ctx);
- }
-
- const auto& workingDir = pathPair.first;
- const auto& name = pathPair.second;
-
- std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction();
- NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
- NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
- modifyScheme->SetWorkingDir(workingDir);
- modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStream);
-
- for (const auto& drop : req->drop_changefeeds()) {
- auto op = modifyScheme->MutableDropCdcStream();
- op->SetStreamName(drop);
- op->SetTableName(name);
- }
-
- ctx.Send(MakeTxProxyID(), proposeRequest.release());
- }
-
void AlterTable(const TActorContext &ctx) {
const auto req = GetProtoRequest();
- std::pair<TString, TString> pathPair;
- try {
- pathPair = SplitPath(req->path());
- } catch (const std::exception&) {
- return ReplyWithStatus(StatusIds::BAD_REQUEST, ctx);
- }
-
- const auto& workingDir = pathPair.first;
- const auto& name = pathPair.second;
-
std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction();
- NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
- NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
- modifyScheme->SetWorkingDir(workingDir);
- modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable);
-
- auto desc = modifyScheme->MutableAlterTable();
- desc->SetName(name);
-
- for (const auto& drop : req->drop_columns()) {
- desc->AddDropColumns()->SetName(drop);
- }
-
- StatusIds::StatusCode code = StatusIds::SUCCESS;
+ auto modifyScheme = proposeRequest->Record.MutableTransaction()->MutableModifyScheme();
+ Ydb::StatusIds::StatusCode code;
TString error;
-
- if (!FillColumnDescription(*desc, req->add_columns(), code, error)) {
- NYql::TIssues issues;
- issues.AddIssue(NYql::TIssue(error));
- return Reply(code, issues, ctx);
- }
-
- for (const auto& alter : req->alter_columns()) {
- auto column = desc->AddColumns();
- column->SetName(alter.name());
- if (!alter.family().empty()) {
- column->SetFamilyName(alter.family());
- }
- }
-
- bool hadPartitionConfig = desc->HasPartitionConfig();
- TColumnFamilyManager families(desc->MutablePartitionConfig());
-
- // Apply storage settings to the default column family
- if (req->has_alter_storage_settings()) {
- Ydb::StatusIds::StatusCode code;
- TString error;
- if (!families.ApplyStorageSettings(req->alter_storage_settings(), &code, &error)) {
- NYql::TIssues issues;
- issues.AddIssue(NYql::TIssue(error));
- return Reply(code, issues, ctx);
- }
- }
-
- for (const auto& familySettings : req->add_column_families()) {
- Ydb::StatusIds::StatusCode code;
- TString error;
- if (!families.ApplyFamilySettings(familySettings, &code, &error)) {
- NYql::TIssues issues;
- issues.AddIssue(NYql::TIssue(error));
- return Reply(code, issues, ctx);
- }
- }
-
- for (const auto& familySettings : req->alter_column_families()) {
- Ydb::StatusIds::StatusCode code;
- TString error;
- if (!families.ApplyFamilySettings(familySettings, &code, &error)) {
- NYql::TIssues issues;
- issues.AddIssue(NYql::TIssue(error));
- return Reply(code, issues, ctx);
- }
- }
-
- // Avoid altering partition config unless we changed something
- if (!families.Modified && !hadPartitionConfig) {
- desc->ClearPartitionConfig();
- }
-
- if (!FillAlterTableSettingsDesc(*desc, *req, Profiles, code, error, AppData())) {
+ if (!BuildAlterTableModifyScheme(req, modifyScheme, Profiles, ResolvedPathId, code, error)) {
NYql::TIssues issues;
issues.AddIssue(NYql::TIssue(error));
return Reply(code, issues, ctx);
@@ -644,69 +344,6 @@ private:
ctx.Send(MakeTxProxyID(), proposeRequest.release());
}
- void AlterUserAttributes(const TPathId& pathId, const TActorContext &ctx) {
- const auto req = GetProtoRequest();
-
- std::pair<TString, TString> pathPair;
- try {
- pathPair = SplitPath(req->path());
- } catch (const std::exception&) {
- return ReplyWithStatus(StatusIds::BAD_REQUEST, ctx);
- }
-
- const auto& workingDir = pathPair.first;
- const auto& name = pathPair.second;
-
- std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction();
- auto& record = proposeRequest->Record;
- auto& modifyScheme = *record.MutableTransaction()->MutableModifyScheme();
-
- modifyScheme.SetWorkingDir(workingDir);
- modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterUserAttributes);
- modifyScheme.AddApplyIf()->SetPathId(pathId.LocalPathId);
-
- auto& alter = *modifyScheme.MutableAlterUserAttributes();
- alter.SetPathName(name);
-
- for (auto [key, value] : req->alter_attributes()) {
- auto& attr = *alter.AddUserAttributes();
- attr.SetKey(key);
- if (value) {
- attr.SetValue(value);
- }
- }
-
- ctx.Send(MakeTxProxyID(), proposeRequest.release());
- }
-
- void RenameIndex(const TActorContext &ctx) {
- const auto req = GetProtoRequest();
-
- std::pair<TString, TString> pathPair;
- try {
- pathPair = SplitPath(req->path());
- } catch (const std::exception&) {
- return ReplyWithStatus(StatusIds::BAD_REQUEST, ctx);
- }
-
- const auto& workingDir = pathPair.first;
-
- std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction();
- auto& record = proposeRequest->Record;
- auto& modifyScheme = *record.MutableTransaction()->MutableModifyScheme();
-
- modifyScheme.SetWorkingDir(workingDir);
- modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex);
-
- auto& alter = *modifyScheme.MutableMoveIndex();
- alter.SetTablePath(req->path());
- alter.SetSrcPath(req->rename_indexes(0).source_name());
- alter.SetDstPath(req->rename_indexes(0).destination_name());
- alter.SetAllowOverwrite(req->rename_indexes(0).replace_destination());
-
- ctx.Send(MakeTxProxyID(), proposeRequest.release());
- }
-
void ReplyWithStatus(StatusIds::StatusCode status,
const TActorContext &ctx) {
Request_->ReplyWithYdbStatus(status);
@@ -719,9 +356,11 @@ private:
TIntrusivePtr<NTxProxy::TTxProxyMon> TxProxyMon;
TString LogPrefix;
TActorId SSPipeClient;
- THolder<const NACLib::TUserToken> UserToken;
+ TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
+ TPathId ResolvedPathId;
TTableProfiles Profiles;
EOp OpType;
+ NKikimrIndexBuilder::TIndexBuildSettings IndexBuildSettings;
const ui64 Flags;
};
diff --git a/ydb/core/grpc_services/table_settings.cpp b/ydb/core/grpc_services/table_settings.cpp
index c84ac1fa495..4f3cccbe58a 100644
--- a/ydb/core/grpc_services/table_settings.cpp
+++ b/ydb/core/grpc_services/table_settings.cpp
@@ -28,23 +28,6 @@ bool FillCreateTableSettingsDesc(NKikimrSchemeOp::TTableDescription& out,
return NKikimr::FillCreateTableSettingsDesc(out, in, code, error, warnings, tableProfileSet);
}
-bool FillAlterTableSettingsDesc(NKikimrSchemeOp::TTableDescription& out,
- const Ydb::Table::AlterTableRequest& in, const TTableProfiles& profiles,
- Ydb::StatusIds::StatusCode& code, TString& error, const TAppData* appData) {
-
- bool changed = false;
- auto &partitionConfig = *out.MutablePartitionConfig();
-
- if (in.set_compaction_policy()) {
- if (!profiles.ApplyCompactionPolicy(in.set_compaction_policy(), partitionConfig, code, error, appData)) {
- return false;
- }
-
- changed = true;
- }
-
- return NKikimr::FillAlterTableSettingsDesc(out, in, code, error, changed);
-}
} // namespace NGRpcService
} // namespace NKikimr
diff --git a/ydb/core/grpc_services/table_settings.h b/ydb/core/grpc_services/table_settings.h
index 55654ea1818..88cdfb5769f 100644
--- a/ydb/core/grpc_services/table_settings.h
+++ b/ydb/core/grpc_services/table_settings.h
@@ -10,9 +10,6 @@ bool FillCreateTableSettingsDesc(NKikimrSchemeOp::TTableDescription& out,
const Ydb::Table::CreateTableRequest& in, const TTableProfiles& profiles,
Ydb::StatusIds::StatusCode& code, TString& error, TList<TString>& warnings);
-bool FillAlterTableSettingsDesc(NKikimrSchemeOp::TTableDescription& out,
- const Ydb::Table::AlterTableRequest& in, const TTableProfiles& profiles,
- Ydb::StatusIds::StatusCode& code, TString& error, const TAppData* appData);
} // namespace NGRpcService
} // namespace NKikimr
diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp
index 08d711ac2a9..bd020c87039 100644
--- a/ydb/core/ydb_convert/table_description.cpp
+++ b/ydb/core/ydb_convert/table_description.cpp
@@ -3,6 +3,7 @@
#include "table_settings.h"
#include "ydb_convert.h"
+#include <ydb/core/base/path.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/engine/mkql_proto.h>
#include <ydb/library/ydb_issue/proto/issue_id.pb.h>
@@ -33,6 +34,322 @@ static void FillStoragePool(TStoragePoolHolder* out, TAddStoragePoolFunc<TStorag
std::invoke(func, out)->set_media(in.GetPreferredPoolKind());
}
+THashSet<EAlterOperationKind> GetAlterOperationKinds(const Ydb::Table::AlterTableRequest* req) {
+ THashSet<EAlterOperationKind> ops;
+
+ if (req->add_columns_size() || req->drop_columns_size() ||
+ req->alter_columns_size() ||
+ req->ttl_action_case() !=
+ Ydb::Table::AlterTableRequest::TTL_ACTION_NOT_SET ||
+ req->tiering_action_case() !=
+ Ydb::Table::AlterTableRequest::TIERING_ACTION_NOT_SET ||
+ req->has_alter_storage_settings() || req->add_column_families_size() ||
+ req->alter_column_families_size() || req->set_compaction_policy() ||
+ req->has_alter_partitioning_settings() ||
+ req->set_key_bloom_filter() != Ydb::FeatureFlag::STATUS_UNSPECIFIED ||
+ req->has_set_read_replicas_settings())
+ {
+ ops.emplace(EAlterOperationKind::Common);
+ }
+
+ if (req->add_indexes_size()) {
+ ops.emplace(EAlterOperationKind::AddIndex);
+ }
+
+ if (req->drop_indexes_size()) {
+ ops.emplace(EAlterOperationKind::DropIndex);
+ }
+
+ if (req->add_changefeeds_size()) {
+ ops.emplace(EAlterOperationKind::AddChangefeed);
+ }
+
+ if (req->drop_changefeeds_size()) {
+ ops.emplace(EAlterOperationKind::DropChangefeed);
+ }
+
+ if (req->alter_attributes_size()) {
+ ops.emplace(EAlterOperationKind::Attribute);
+ }
+
+ if (req->rename_indexes_size()) {
+ ops.emplace(EAlterOperationKind::RenameIndex);
+ }
+
+ return ops;
+}
+
+namespace {
+
+std::pair<TString, TString> SplitPathIntoWorkingDirAndName(const TString& path) {
+ auto splitPos = path.find_last_of('/');
+ if (splitPos == path.npos || splitPos + 1 == path.size()) {
+ ythrow yexception() << "wrong path format '" << path << "'" ;
+ }
+ return {path.substr(0, splitPos), path.substr(splitPos + 1)};
+}
+
+}
+
+
+bool FillAlterTableSettingsDesc(NKikimrSchemeOp::TTableDescription& out,
+ const Ydb::Table::AlterTableRequest& in, const TTableProfiles& profiles,
+ Ydb::StatusIds::StatusCode& code, TString& error, const TAppData* appData) {
+
+ bool changed = false;
+ auto &partitionConfig = *out.MutablePartitionConfig();
+
+ if (in.set_compaction_policy()) {
+ if (!profiles.ApplyCompactionPolicy(in.set_compaction_policy(), partitionConfig, code, error, appData)) {
+ return false;
+ }
+
+ changed = true;
+ }
+
+ return NKikimr::FillAlterTableSettingsDesc(out, in, code, error, changed);
+}
+
+bool BuildAlterTableAddIndexRequest(const Ydb::Table::AlterTableRequest* req, NKikimrIndexBuilder::TIndexBuildSettings* settings,
+ ui64 flags,
+ Ydb::StatusIds::StatusCode& code, TString& error)
+{
+ const auto ops = GetAlterOperationKinds(req);
+ if (ops.size() != 1 || *ops.begin() != EAlterOperationKind::AddIndex) {
+ code = Ydb::StatusIds::INTERNAL_ERROR;
+ error = "Unexpected build alter table add index call.";
+ return false;
+ }
+
+ if (req->add_indexes_size() != 1) {
+ code = Ydb::StatusIds::UNSUPPORTED;
+ error = "Only one index can be added by one operation";
+ return false;
+ }
+
+ const auto desc = req->add_indexes(0);
+
+ if (!desc.name()) {
+ code = Ydb::StatusIds::BAD_REQUEST;
+ error = "Index must have a name";
+ return false;
+ }
+
+ if (!desc.index_columns_size()) {
+ code = Ydb::StatusIds::BAD_REQUEST;
+ error = "At least one column must be specified";
+ return false;
+ }
+
+ if (!desc.data_columns().empty() && !AppData()->FeatureFlags.GetEnableDataColumnForIndexTable()) {
+ code = Ydb::StatusIds::UNSUPPORTED;
+ error = "Data column feature is not supported yet";
+ return false;
+ }
+
+ if (flags & NKqpProto::TKqpSchemeOperation::FLAG_PG_MODE) {
+ settings->set_pg_mode(true);
+ }
+
+ if (flags & NKqpProto::TKqpSchemeOperation::FLAG_IF_NOT_EXISTS) {
+ settings->set_if_not_exist(true);
+ }
+
+ settings->set_source_path(req->path());
+ auto tableIndex = settings->mutable_index();
+ tableIndex->CopyFrom(req->add_indexes(0));
+
+ return true;
+}
+
+bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme, const TTableProfiles& profiles,
+ const TPathId& resolvedPathId,
+ Ydb::StatusIds::StatusCode& code, TString& error)
+{
+ std::pair<TString, TString> pathPair;
+ const auto ops = GetAlterOperationKinds(req);
+ if (ops.empty()) {
+ code = Ydb::StatusIds::BAD_REQUEST;
+ error = "Empty alter";
+ return false;
+ }
+
+ if (ops.size() > 1) {
+ code = Ydb::StatusIds::UNSUPPORTED;
+ error = "Mixed alter is unsupported";
+ return false;
+ }
+
+ const auto OpType = *ops.begin();
+
+ try {
+ pathPair = SplitPathIntoWorkingDirAndName(req->path());
+ } catch (const std::exception&) {
+ code = Ydb::StatusIds::BAD_REQUEST;
+ return false;
+ }
+
+ if (!AppData()->FeatureFlags.GetEnableChangefeeds() && OpType == EAlterOperationKind::AddChangefeed) {
+ code = Ydb::StatusIds::UNSUPPORTED;
+ error = "Changefeeds are not supported yet";
+ return false;
+ }
+
+ if (req->rename_indexes_size() != 1 && OpType == EAlterOperationKind::RenameIndex) {
+ code = Ydb::StatusIds::UNSUPPORTED;
+ error = "Only one index can be renamed by one operation";
+ return false;
+ }
+
+ if (req->drop_changefeeds_size() != 1 && OpType == EAlterOperationKind::DropChangefeed) {
+ code = Ydb::StatusIds::UNSUPPORTED;
+ error = "Only one changefeed can be removed by one operation";
+ return false;
+ }
+
+ if (req->add_changefeeds_size() != 1 && OpType == EAlterOperationKind::AddChangefeed) {
+ code = Ydb::StatusIds::UNSUPPORTED;
+ error = "Only one changefeed can be added by one operation";
+ return false;
+ }
+
+ if (req->drop_indexes_size() != 1 && OpType == EAlterOperationKind::DropIndex) {
+ code = Ydb::StatusIds::UNSUPPORTED;
+ error = "Only one index can be removed by one operation";
+ return false;
+ }
+
+ const auto& workingDir = pathPair.first;
+ const auto& name = pathPair.second;
+ modifyScheme->SetWorkingDir(workingDir);
+
+ for(const auto& rename: req->rename_indexes()) {
+ modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex);
+ auto& alter = *modifyScheme->MutableMoveIndex();
+ alter.SetTablePath(req->path());
+ alter.SetSrcPath(rename.source_name());
+ alter.SetDstPath(rename.destination_name());
+ alter.SetAllowOverwrite(rename.replace_destination());
+ }
+
+ for (const auto& drop : req->drop_changefeeds()) {
+ modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStream);
+ auto op = modifyScheme->MutableDropCdcStream();
+ op->SetStreamName(drop);
+ op->SetTableName(name);
+ }
+
+ for (const auto &add : req->add_changefeeds()) {
+ modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStream);
+ auto op = modifyScheme->MutableCreateCdcStream();
+ op->SetTableName(name);
+
+ if (add.has_retention_period()) {
+ op->SetRetentionPeriodSeconds(add.retention_period().seconds());
+ }
+
+ if (add.has_topic_partitioning_settings()) {
+ i64 minActivePartitions =
+ add.topic_partitioning_settings().min_active_partitions();
+ if (minActivePartitions < 0) {
+ code = Ydb::StatusIds::BAD_REQUEST;
+ error = "Topic partitions count must be positive";
+ return false;
+ } else if (minActivePartitions == 0) {
+ minActivePartitions = 1;
+ }
+ op->SetTopicPartitions(minActivePartitions);
+ }
+
+ if (!FillChangefeedDescription(*op->MutableStreamDescription(), add, code, error)) {
+ return false;
+ }
+ }
+
+ for (const auto& drop : req->drop_indexes()) {
+ modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropIndex);
+ auto desc = modifyScheme->MutableDropIndex();
+ desc->SetIndexName(drop);
+ desc->SetTableName(name);
+ }
+
+ if (OpType == EAlterOperationKind::Common) {
+ modifyScheme->SetOperationType(
+ NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable);
+
+ auto desc = modifyScheme->MutableAlterTable();
+ desc->SetName(name);
+
+ for (const auto &drop : req->drop_columns()) {
+ desc->AddDropColumns()->SetName(drop);
+ }
+
+ if (!FillColumnDescription(*desc, req->add_columns(), code, error)) {
+ return false;
+ }
+
+ for (const auto &alter : req->alter_columns()) {
+ auto column = desc->AddColumns();
+ column->SetName(alter.name());
+ if (!alter.family().empty()) {
+ column->SetFamilyName(alter.family());
+ }
+ }
+
+ bool hadPartitionConfig = desc->HasPartitionConfig();
+ TColumnFamilyManager families(desc->MutablePartitionConfig());
+
+ // Apply storage settings to the default column family
+ if (req->has_alter_storage_settings()) {
+ if (!families.ApplyStorageSettings(req->alter_storage_settings(), &code,
+ &error)) {
+ return false;
+ }
+ }
+
+ for (const auto &familySettings : req->add_column_families()) {
+ if (!families.ApplyFamilySettings(familySettings, &code, &error)) {
+ return false;
+ }
+ }
+
+ for (const auto &familySettings : req->alter_column_families()) {
+ if (!families.ApplyFamilySettings(familySettings, &code, &error)) {
+ return false;
+ }
+ }
+
+ // Avoid altering partition config unless we changed something
+ if (!families.Modified && !hadPartitionConfig) {
+ desc->ClearPartitionConfig();
+ }
+
+ if (!FillAlterTableSettingsDesc(*desc, *req, profiles, code, error,
+ AppData())) {
+ return false;
+ }
+ }
+
+ if (OpType == EAlterOperationKind::Attribute) {
+ modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterUserAttributes);
+ modifyScheme->AddApplyIf()->SetPathId(resolvedPathId.LocalPathId);
+
+ auto& alter = *modifyScheme->MutableAlterUserAttributes();
+ alter.SetPathName(name);
+
+ for (auto [key, value] : req->alter_attributes()) {
+ auto& attr = *alter.AddUserAttributes();
+ attr.SetKey(key);
+ if (value) {
+ attr.SetValue(value);
+ }
+ }
+ }
+
+ return true;
+}
+
+
template <typename TColumn>
static Ydb::Type* AddColumn(Ydb::Table::ColumnMeta* newColumn, const TColumn& column) {
newColumn->set_name(column.GetName());
diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h
index d0e140a5f55..346e877cbfc 100644
--- a/ydb/core/ydb_convert/table_description.h
+++ b/ydb/core/ydb_convert/table_description.h
@@ -9,6 +9,38 @@
namespace NKikimr {
+enum class EAlterOperationKind {
+ // columns, column families, storage, ttl
+ Common,
+ // add indices
+ AddIndex,
+ // drop indices
+ DropIndex,
+ // add/alter/drop attributes
+ Attribute,
+ // add changefeeds
+ AddChangefeed,
+ // drop changefeeds
+ DropChangefeed,
+ // rename index
+ RenameIndex,
+};
+
+
+
+THashSet<EAlterOperationKind> GetAlterOperationKinds(const Ydb::Table::AlterTableRequest* req);
+bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme,
+ const TTableProfiles& profiles, const TPathId& resolvedPathId,
+ Ydb::StatusIds::StatusCode& status, TString& error);
+
+bool FillAlterTableSettingsDesc(NKikimrSchemeOp::TTableDescription& out,
+ const Ydb::Table::AlterTableRequest& in, const TTableProfiles& profiles,
+ Ydb::StatusIds::StatusCode& code, TString& error, const TAppData* appData);
+
+bool BuildAlterTableAddIndexRequest(const Ydb::Table::AlterTableRequest* req, NKikimrIndexBuilder::TIndexBuildSettings* settings,
+ ui64 flags,
+ Ydb::StatusIds::StatusCode& status, TString& error);
+
// out
void FillColumnDescription(Ydb::Table::DescribeTableResult& out,
NKikimrMiniKQL::TType& splitKeyType, const NKikimrSchemeOp::TTableDescription& in);