aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2022-06-20 21:34:13 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2022-06-20 21:34:13 +0300
commit1452b5fc9c83a08cc4a35d674049ef82d9b6dc1a (patch)
treed77a6ecfca7b63304478d1aac2d926b218adb2a0
parent33c65a59cda3a972e9d670f1bc81b864fc263acf (diff)
downloadydb-1452b5fc9c83a08cc4a35d674049ef82d9b6dc1a.tar.gz
Index move support for SchemeShard and DataShard. KIKIMR-13799
ref:b58e9b8ef40fee5c36b03a0540f95333a60d25cc
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp2
-rw-r--r--ydb/core/kqp/ut/kqp_scheme_ut.cpp178
-rw-r--r--ydb/core/protos/counters_schemeshard.proto2
-rw-r--r--ydb/core/protos/flat_scheme_op.proto10
-rw-r--r--ydb/core/protos/tx_datashard.proto20
-rw-r--r--ydb/core/testlib/test_client.cpp17
-rw-r--r--ydb/core/testlib/test_client.h2
-rw-r--r--ydb/core/tx/datashard/CMakeLists.txt1
-rw-r--r--ydb/core/tx/datashard/check_scheme_tx_unit.cpp18
-rw-r--r--ydb/core/tx/datashard/datashard.cpp77
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.h2
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h2
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.h1
-rw-r--r--ydb/core/tx/datashard/execution_unit.cpp2
-rw-r--r--ydb/core/tx/datashard/execution_unit_ctors.h1
-rw-r--r--ydb/core/tx/datashard/execution_unit_kind.h1
-rw-r--r--ydb/core/tx/datashard/move_index_unit.cpp87
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.txt1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation.cpp6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.cpp28
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.h4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp650
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp24
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp31
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_part.h7
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_element.h8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_tx_infly.h6
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/helpers.cpp27
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/helpers.h6
-rw-r--r--ydb/core/tx/schemeshard/ut_move.cpp257
-rw-r--r--ydb/core/tx/schemeshard/ut_move_reboots.cpp109
-rw-r--r--ydb/core/tx/tx_proxy/schemereq.cpp13
37 files changed, 1561 insertions, 60 deletions
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 4840e8e264..8b1d4426d3 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -380,7 +380,7 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) {
// Server->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG);
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_YQL, NActors::NLog::PRI_DEBUG);
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_YQL, NActors::NLog::PRI_INFO);
- // Server->GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_DEBUG);
+ // Server->GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_TRACE);
// Server->GetRuntime()->SetLogPriority(NKikimrServices::TX_COORDINATOR, NActors::NLog::PRI_DEBUG);
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);
// Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_TASKS_RUNNER, NActors::NLog::PRI_DEBUG);
diff --git a/ydb/core/kqp/ut/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/kqp_scheme_ut.cpp
index 875aaa08ba..a610e73539 100644
--- a/ydb/core/kqp/ut/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_scheme_ut.cpp
@@ -411,6 +411,184 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
SchemaVersionMissmatchWithIndexTest<UseNewEngine>(true);
}
+ template <bool UseNewEngine>
+ void TouchIndexAfterMoveIndex(bool write, bool replace) {
+ TKikimrRunner kikimr;
+
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ TString query1;
+ TString query2;
+ if (write) {
+ query1 = Q_(R"(
+ UPSERT INTO [/Root/KeyValue] (Key, Value) VALUES (10u, "New");
+ )");
+ query2 = query1;
+ } else {
+ query1 = Q1_(R"(
+ SELECT * FROM `/Root/KeyValue` VIEW `value_index` WHERE Value = "New";
+ )");
+ query2 = Q1_(R"(
+ SELECT * FROM `/Root/KeyValue` VIEW `moved_value_index` WHERE Value = "New";
+ )");
+
+ }
+
+ NYdb::NTable::TExecDataQuerySettings execSettings;
+ execSettings.KeepInQueryCache(true);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+
+ {
+ TString create_index_query = Q1_(R"(
+ ALTER TABLE `/Root/KeyValue` ADD INDEX value_index GLOBAL SYNC ON (`Value`);
+ )");
+ auto result = session.ExecuteSchemeQuery(create_index_query).ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ }
+
+ if (replace) {
+ TString create_index_query = Q1_(R"(
+ ALTER TABLE `/Root/KeyValue` ADD INDEX moved_value_index GLOBAL SYNC ON (`Value`);
+ )");
+ auto result = session.ExecuteSchemeQuery(create_index_query).ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(query1, TTxControl::BeginTx(),
+ execSettings).ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
+ UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false);
+ }
+
+ {
+ kikimr.GetTestServer().GetRuntime()->GetAppData().AdministrationAllowedSIDs.push_back("root@builtin");
+ auto reply = kikimr.GetTestClient().MoveIndex("/Root/KeyValue", "value_index", "moved_value_index", "root@builtin");
+ const NKikimrClient::TResponse &response = reply->Record;
+ UNIT_ASSERT_VALUES_EQUAL((NMsgBusProxy::EResponseStatus)response.GetStatus(), NMsgBusProxy::MSTATUS_OK);
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(query2,
+ TTxControl::BeginTx(), execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ if (write) {
+ auto commit = result.GetTransaction()->Commit().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(commit.GetStatus(), EStatus::SUCCESS, commit.GetIssues().ToString());
+ }
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(query2,
+ TTxControl::BeginTx(), execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto commit = result.GetTransaction()->Commit().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(commit.GetStatus(), EStatus::SUCCESS, commit.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST_NEW_ENGINE(TouchIndexAfterMoveIndexRead) {
+ TouchIndexAfterMoveIndex<UseNewEngine>(false, false);
+ }
+
+ Y_UNIT_TEST_NEW_ENGINE(TouchIndexAfterMoveIndexWrite) {
+ TouchIndexAfterMoveIndex<UseNewEngine>(true, false);
+ }
+
+ Y_UNIT_TEST_NEW_ENGINE(TouchIndexAfterMoveIndexReadReplace) {
+ TouchIndexAfterMoveIndex<UseNewEngine>(false, true);
+ }
+
+ Y_UNIT_TEST_NEW_ENGINE(TouchIndexAfterMoveIndexWriteReplace) {
+ TouchIndexAfterMoveIndex<UseNewEngine>(true, true);
+ }
+
+ template <bool UseNewEngine>
+ void TouchIndexAfterMoveTable(bool write) {
+ TKikimrRunner kikimr;
+
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ TString query1;
+ TString query2;
+ if (write) {
+ query1 = Q_(R"(
+ UPSERT INTO [/Root/KeyValue] (Key, Value) VALUES (10u, "New");
+ )");
+ query2 = Q_(R"(
+ UPSERT INTO [/Root/KeyValueMoved] (Key, Value) VALUES (10u, "New");
+ )");
+ } else {
+ query1 = Q1_(R"(
+ SELECT * FROM `/Root/KeyValue` VIEW `value_index` WHERE Value = "New";
+ )");
+ query2 = Q1_(R"(
+ SELECT * FROM `/Root/KeyValueMoved` VIEW `value_index` WHERE Value = "New";
+ )");
+
+ }
+
+ NYdb::NTable::TExecDataQuerySettings execSettings;
+ execSettings.KeepInQueryCache(true);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+
+ {
+ TString create_index_query = Q1_(R"(
+ ALTER TABLE `/Root/KeyValue` ADD INDEX value_index GLOBAL SYNC ON (`Value`);
+ )");
+ auto result = session.ExecuteSchemeQuery(create_index_query).ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(query1, TTxControl::BeginTx(),
+ execSettings).ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
+ UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false);
+ }
+
+ {
+ auto query = TStringBuilder() << R"(
+ --!syntax_v1
+ ALTER TABLE `/Root/KeyValue` RENAME TO `/Root/KeyValueMoved`;
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query << ";").GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(query2,
+ TTxControl::BeginTx(), execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto commit = result.GetTransaction()->Commit().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(commit.GetStatus(), EStatus::SUCCESS, commit.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST_NEW_ENGINE(TouchIndexAfterMoveTableRead) {
+ TouchIndexAfterMoveTable<UseNewEngine>(false);
+ }
+
+ Y_UNIT_TEST_NEW_ENGINE(TouchIndexAfterMoveTableWrite) {
+ TouchIndexAfterMoveTable<UseNewEngine>(true);
+ }
+
void CheckInvalidationAfterDropCreateTable(bool withCompatSchema) {
TKikimrRunner kikimr;
diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto
index 9b0f887494..da67bd4a1c 100644
--- a/ydb/core/protos/counters_schemeshard.proto
+++ b/ydb/core/protos/counters_schemeshard.proto
@@ -166,6 +166,7 @@ enum ESimpleCounters {
COUNTER_IN_FLIGHT_OPS_TxDropBlobDepot = 135 [(CounterOpts) = {Name: "InFlightOps/DropBlobDepot"}];
COUNTER_STATS_QUEUE_SIZE = 136 [(CounterOpts) = {Name: "StatsQueueSize"}];
+ COUNTER_IN_FLIGHT_OPS_TxUpdateMainTableOnIndexMove = 137 [(CounterOpts) = {Name: "InFlightOps/UpdateMainTableOnIndexMove"}];
}
enum ECumulativeCounters {
@@ -266,6 +267,7 @@ enum ECumulativeCounters {
COUNTER_FINISHED_OPS_TxDropBlobDepot = 80 [(CounterOpts) = {Name: "FinishedOps/DropBlobDepot"}];
COUNTER_STATS_WRITTEN = 81 [(CounterOpts) = {Name: "StatsWritten"}];
+ COUNTER_FINISHED_OPS_TxUpdateMainTableOnIndexMove = 82 [(CounterOpts) = {Name: "FinishedOps/UpdateMainTableOnIndexMove"}];
}
enum EPercentileCounters {
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index c4e595246c..8f6a0502c0 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -1074,6 +1074,12 @@ message TMove { // private description of the operation
optional string DstPath = 2;
}
+message TMoveIndex {
+ optional string TablePath = 1;
+ optional string SrcPath = 2;
+ optional string DstPath = 3;
+}
+
message TSequenceDescription {
optional string Name = 1; // mandatory
optional NKikimrProto.TPathID PathId = 2; // sequence path id, assigned by schemeshard
@@ -1221,6 +1227,9 @@ enum EOperationType {
ESchemeOpCreateBlobDepot = 79;
ESchemeOpAlterBlobDepot = 80;
ESchemeOpDropBlobDepot = 81;
+
+ // Move index
+ ESchemeOpMoveIndex = 82;
}
message TApplyIf {
@@ -1324,6 +1333,7 @@ message TModifyScheme {
optional TSequenceDescription Sequence = 51;
optional TReplicationDescription Replication = 52;
optional TBlobDepotDescription BlobDepot = 54;
+ optional TMoveIndex MoveIndex = 55;
}
// "Script", used by client to parse text files with multiple DDL commands
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index d2c342e82e..598d68a8fd 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -378,6 +378,13 @@ message TDropCdcStreamNotice {
message TAsyncIndexInfo {
}
+message TRemapIndexPathId {
+ optional NKikimrProto.TPathID SrcPathId = 1;
+ optional NKikimrProto.TPathID DstPathId = 2;
+ optional NKikimrProto.TPathID ReplacedPathId = 3;
+ optional string DstName = 4;
+}
+
message TMoveTable {
optional NKikimrProto.TPathID PathId = 1;
optional uint64 TableSchemaVersion = 2;
@@ -385,14 +392,16 @@ message TMoveTable {
optional NKikimrProto.TPathID DstPathId = 3;
optional string DstPath = 4;
- message TRemapIndexPathId {
- optional NKikimrProto.TPathID PathId = 1;
- optional NKikimrProto.TPathID DstPathId = 2;
- }
-
repeated TRemapIndexPathId ReMapIndexes = 5;
}
+message TMoveIndex {
+ optional NKikimrProto.TPathID PathId = 1;
+ optional uint64 TableSchemaVersion = 2;
+
+ optional TRemapIndexPathId ReMapIndex = 3;
+}
+
message TFlatSchemeTransaction {
optional NKikimrSchemeOp.TTableDescription CreateTable = 1;
optional NKikimrSchemeOp.TTableDescription DropTable = 2;
@@ -421,6 +430,7 @@ message TFlatSchemeTransaction {
optional TCreateCdcStreamNotice CreateCdcStreamNotice = 18;
optional TAlterCdcStreamNotice AlterCdcStreamNotice = 19;
optional TDropCdcStreamNotice DropCdcStreamNotice = 20;
+ optional TMoveIndex MoveIndex = 21;
}
message TDistributedEraseTransaction {
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 309c4977e2..8164409a78 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -1526,6 +1526,23 @@ namespace Tests {
return dynamic_cast<NMsgBusProxy::TBusResponse *>(reply.Release());
}
+ TAutoPtr<NMsgBusProxy::TBusResponse> TClient::MoveIndex(const TString& table, const TString& src, const TString& dst, const TString& userToken) {
+ TAutoPtr<NMsgBusProxy::TBusSchemeOperation> request(new NMsgBusProxy::TBusSchemeOperation());
+ auto *op = request->Record.MutableTransaction()->MutableModifyScheme();
+ op->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex);
+ auto descr = op->MutableMoveIndex();
+ descr->SetTablePath(table);
+ descr->SetSrcPath(src);
+ descr->SetDstPath(dst);
+ TAutoPtr<NBus::TBusMessage> reply;
+ if (userToken) {
+ request->Record.SetSecurityToken(userToken);
+ }
+ NBus::EMessageStatus status = SendAndWaitCompletion(request.Release(), reply);
+ UNIT_ASSERT_VALUES_EQUAL(status, NBus::MESSAGE_OK);
+ return dynamic_cast<NMsgBusProxy::TBusResponse *>(reply.Release());
+ }
+
TAutoPtr<NMsgBusProxy::TBusResponse> TClient::AlterTable(const TString& parent, const TString& alter, const TString& userToken) {
NKikimrSchemeOp::TTableDescription table;
bool parseOk = ::google::protobuf::TextFormat::ParseFromString(alter, &table);
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index 0ecd068a23..e2143be72a 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -390,6 +390,8 @@ namespace Tests {
TAutoPtr<NMsgBusProxy::TBusResponse> AlterTable(const TString& parent, const NKikimrSchemeOp::TTableDescription& update, const TString& userToken);
TAutoPtr<NMsgBusProxy::TBusResponse> AlterTable(const TString& parent, const TString& alter, const TString& userToken);
+ TAutoPtr<NMsgBusProxy::TBusResponse> MoveIndex(const TString& table, const TString& src, const TString& dst, const TString& userToken);
+
NMsgBusProxy::EResponseStatus CreateOlapStore(const TString& parent, const TString& scheme);
NMsgBusProxy::EResponseStatus CreateOlapStore(const TString& parent, const NKikimrSchemeOp::TColumnStoreDescription& store);
NMsgBusProxy::EResponseStatus CreateOlapTable(const TString& parent, const TString& scheme);
diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt
index cb451d7fed..8d3ff703fb 100644
--- a/ydb/core/tx/datashard/CMakeLists.txt
+++ b/ydb/core/tx/datashard/CMakeLists.txt
@@ -175,6 +175,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/load_tx_details_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/make_scan_snapshot_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/make_snapshot_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/move_index_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/move_table_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/operation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/plan_queue_unit.cpp
diff --git a/ydb/core/tx/datashard/check_scheme_tx_unit.cpp b/ydb/core/tx/datashard/check_scheme_tx_unit.cpp
index efcd47553d..c42c0f5c9b 100644
--- a/ydb/core/tx/datashard/check_scheme_tx_unit.cpp
+++ b/ydb/core/tx/datashard/check_scheme_tx_unit.cpp
@@ -34,6 +34,7 @@ private:
bool CheckFinalizeBuildIndex(TActiveTransaction *activeTx);
bool CheckDropIndexNotice(TActiveTransaction *activeTx);
bool CheckMoveTable(TActiveTransaction *activeTx);
+ bool CheckMoveIndex(TActiveTransaction *activeTx);
bool CheckCreateCdcStream(TActiveTransaction *activeTx);
bool CheckAlterCdcStream(TActiveTransaction *activeTx);
bool CheckDropCdcStream(TActiveTransaction *activeTx);
@@ -354,6 +355,9 @@ bool TCheckSchemeTxUnit::CheckSchemeTx(TActiveTransaction *activeTx)
case TSchemaOperation::ETypeMoveTable:
res = CheckMoveTable(activeTx);
break;
+ case TSchemaOperation::ETypeMoveIndex:
+ res = CheckMoveIndex(activeTx);
+ break;
case TSchemaOperation::ETypeCreateCdcStream:
res = CheckCreateCdcStream(activeTx);
break;
@@ -672,6 +676,20 @@ bool TCheckSchemeTxUnit::CheckMoveTable(TActiveTransaction *activeTx) {
return CheckSchemaVersion(activeTx, mv);
}
+bool TCheckSchemeTxUnit::CheckMoveIndex(TActiveTransaction *activeTx) {
+ if (HasDuplicate(activeTx, "MoveIndex", &TPipeline::HasMoveIndex)) {
+ return false;
+ }
+
+ const auto &mv = activeTx->GetSchemeTx().GetMoveIndex();
+ if (!HasPathId(activeTx, mv, "MoveIndex")) {
+ return false;
+ }
+
+ auto ret = CheckSchemaVersion(activeTx, mv);
+ return ret;
+}
+
bool TCheckSchemeTxUnit::CheckCreateCdcStream(TActiveTransaction *activeTx) {
if (HasDuplicate(activeTx, "CreateCdcStream", &TPipeline::HasCreateCdcStream)) {
return false;
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index a4f722adc3..bdbae4d8ee 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -988,7 +988,7 @@ TUserTable::TPtr TDataShard::CreateUserTable(TTransactionContext& txc,
THashMap<TPathId, TPathId> TDataShard::GetRemapIndexes(const NKikimrTxDataShard::TMoveTable& move) {
THashMap<TPathId, TPathId> remap;
for (const auto& item: move.GetReMapIndexes()) {
- const auto prevId = PathIdFromPathId(item.GetPathId());
+ const auto prevId = PathIdFromPathId(item.GetSrcPathId());
const auto newId = PathIdFromPathId(item.GetDstPathId());
remap[prevId] = newId;
}
@@ -1044,6 +1044,81 @@ TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxD
return newTableInfo;
}
+TUserTable::TPtr TDataShard::MoveUserIndex(TOperation::TPtr op, const NKikimrTxDataShard::TMoveIndex& move,
+ const TActorContext& ctx, TTransactionContext& txc)
+{
+ const auto pathId = PathIdFromPathId(move.GetPathId());
+
+ Y_VERIFY(GetPathOwnerId() == pathId.OwnerId);
+ Y_VERIFY(TableInfos.contains(pathId.LocalPathId));
+
+ const auto version = move.GetTableSchemaVersion();
+ Y_VERIFY(version);
+
+ auto newTableInfo = AlterTableSchemaVersion(ctx, txc, pathId, version, false);
+
+ NKikimrSchemeOp::TTableDescription schema;
+ newTableInfo->GetSchema(schema);
+
+ if (move.GetReMapIndex().HasReplacedPathId()) {
+ const auto oldPathId = PathIdFromPathId(move.GetReMapIndex().GetReplacedPathId());
+ newTableInfo->Indexes.erase(oldPathId);
+
+ size_t id = 0;
+ bool found = false;
+ for (auto& indexDesc: *schema.MutableTableIndexes()) {
+ Y_VERIFY(indexDesc.HasPathOwnerId() && indexDesc.HasLocalPathId());
+ auto pathId = TPathId(indexDesc.GetPathOwnerId(), indexDesc.GetLocalPathId());
+ if (oldPathId == pathId) {
+ found = true;
+ break;
+ } else {
+ id++;
+ }
+ }
+
+ if (found) {
+ schema.MutableTableIndexes()->DeleteSubrange(id, 1);
+ }
+ }
+
+ const auto remapPrevId = PathIdFromPathId(move.GetReMapIndex().GetSrcPathId());
+ const auto remapNewId = PathIdFromPathId(move.GetReMapIndex().GetDstPathId());
+ Y_VERIFY(move.GetReMapIndex().HasDstName());
+ const auto dstIndexName = move.GetReMapIndex().GetDstName();
+
+ for (auto& indexDesc: *schema.MutableTableIndexes()) {
+ Y_VERIFY(indexDesc.HasPathOwnerId() && indexDesc.HasLocalPathId());
+ auto prevPathId = TPathId(indexDesc.GetPathOwnerId(), indexDesc.GetLocalPathId());
+ if (remapPrevId != prevPathId) {
+ continue;
+ }
+
+ indexDesc.SetPathOwnerId(remapNewId.OwnerId);
+ indexDesc.SetLocalPathId(remapNewId.LocalPathId);
+
+ newTableInfo->Indexes[remapNewId] = newTableInfo->Indexes[prevPathId];
+ newTableInfo->Indexes.erase(prevPathId);
+
+ Y_VERIFY(move.GetReMapIndex().HasDstName());
+ indexDesc.SetName(dstIndexName);
+ newTableInfo->Indexes[remapNewId].Name = dstIndexName;
+ }
+
+ newTableInfo->SetSchema(schema);
+
+ AddUserTable(pathId, newTableInfo);
+
+ if (newTableInfo->NeedSchemaSnapshots()) {
+ AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+ PersistUserTable(db, pathId.LocalPathId, *newTableInfo);
+
+ return newTableInfo;
+}
+
TUserTable::TPtr TDataShard::AlterUserTable(const TActorContext& ctx, TTransactionContext& txc,
const NKikimrSchemeOp::TTableDescription& alter)
{
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp
index 72b36561e9..a325090a9a 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -428,7 +428,8 @@ bool TActiveTransaction::BuildSchemeTx()
+ (ui32)SchemeTx->HasMoveTable()
+ (ui32)SchemeTx->HasCreateCdcStreamNotice()
+ (ui32)SchemeTx->HasAlterCdcStreamNotice()
- + (ui32)SchemeTx->HasDropCdcStreamNotice();
+ + (ui32)SchemeTx->HasDropCdcStreamNotice()
+ + (ui32)SchemeTx->HasMoveIndex();
if (count != 1)
return false;
@@ -462,6 +463,8 @@ bool TActiveTransaction::BuildSchemeTx()
SchemeTxType = TSchemaOperation::ETypeAlterCdcStream;
else if (SchemeTx->HasDropCdcStreamNotice())
SchemeTxType = TSchemaOperation::ETypeDropCdcStream;
+ else if (SchemeTx->HasMoveIndex())
+ SchemeTxType = TSchemaOperation::ETypeMoveIndex;
else
SchemeTxType = TSchemaOperation::ETypeUnknown;
@@ -859,6 +862,7 @@ void TActiveTransaction::BuildExecutionPlan(bool loaded)
plan.push_back(EExecutionUnitKind::FinalizeBuildIndex);
plan.push_back(EExecutionUnitKind::DropIndexNotice);
plan.push_back(EExecutionUnitKind::MoveTable);
+ plan.push_back(EExecutionUnitKind::MoveIndex);
plan.push_back(EExecutionUnitKind::CreateCdcStream);
plan.push_back(EExecutionUnitKind::AlterCdcStream);
plan.push_back(EExecutionUnitKind::DropCdcStream);
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h
index b2289b055d..2330acacb2 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.h
+++ b/ydb/core/tx/datashard/datashard_active_transaction.h
@@ -52,6 +52,7 @@ struct TSchemaOperation {
ETypeCreateCdcStream = 13,
ETypeAlterCdcStream = 14,
ETypeDropCdcStream = 15,
+ ETypeMoveIndex = 16,
ETypeUnknown = Max<ui32>()
};
@@ -103,6 +104,7 @@ struct TSchemaOperation {
bool IsFinalizeBuildIndex() const { return Type == ETypeFinalizeBuildIndex; }
bool IsDropIndexNotice() const { return Type == ETypeDropIndexNotice; }
bool IsMove() const { return Type == ETypeMoveTable; }
+ bool IsMoveIndex() const { return Type == ETypeMoveIndex; }
bool IsCreateCdcStream() const { return Type == ETypeCreateCdcStream; }
bool IsAlterCdcStream() const { return Type == ETypeAlterCdcStream; }
bool IsDropCdcStream() const { return Type == ETypeDropCdcStream; }
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 2377567430..af75381e1c 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1440,6 +1440,8 @@ public:
static THashMap<TPathId, TPathId> GetRemapIndexes(const NKikimrTxDataShard::TMoveTable& move);
TUserTable::TPtr MoveUserTable(TOperation::TPtr op, const NKikimrTxDataShard::TMoveTable& move,
const TActorContext& ctx, TTransactionContext& txc);
+ TUserTable::TPtr MoveUserIndex(TOperation::TPtr op, const NKikimrTxDataShard::TMoveIndex& move,
+ const TActorContext& ctx, TTransactionContext& txc);
void DropUserTable(TTransactionContext& txc, ui64 tableId);
ui32 GetLastLocalTid() const { return LastLocalTid; }
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index 7410d4b0fe..2e3a51c07d 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -163,6 +163,7 @@ public:
bool HasFinalizeBuilIndex() const { return SchemaTx && SchemaTx->IsFinalizeBuildIndex(); }
bool HasDropIndexNotice() const { return SchemaTx && SchemaTx->IsDropIndexNotice(); }
bool HasMove() const { return SchemaTx && SchemaTx->IsMove(); }
+ bool HasMoveIndex() const { return SchemaTx && SchemaTx->IsMoveIndex(); }
bool HasCreateCdcStream() const { return SchemaTx && SchemaTx->IsCreateCdcStream(); }
bool HasAlterCdcStream() const { return SchemaTx && SchemaTx->IsAlterCdcStream(); }
bool HasDropCdcStream() const { return SchemaTx && SchemaTx->IsDropCdcStream(); }
diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp
index 9f6e13fa5c..94f8ffc241 100644
--- a/ydb/core/tx/datashard/execution_unit.cpp
+++ b/ydb/core/tx/datashard/execution_unit.cpp
@@ -124,6 +124,8 @@ THolder<TExecutionUnit> CreateExecutionUnit(EExecutionUnitKind kind,
return CreateAlterCdcStreamUnit(dataShard, pipeline);
case EExecutionUnitKind::DropCdcStream:
return CreateDropCdcStreamUnit(dataShard, pipeline);
+ case EExecutionUnitKind::MoveIndex:
+ return CreateMoveIndexUnit(dataShard, pipeline);
default:
Y_FAIL_S("Unexpected execution kind " << kind << " (" << (ui32)kind << ")");
}
diff --git a/ydb/core/tx/datashard/execution_unit_ctors.h b/ydb/core/tx/datashard/execution_unit_ctors.h
index f043a02fad..ec62449d1a 100644
--- a/ydb/core/tx/datashard/execution_unit_ctors.h
+++ b/ydb/core/tx/datashard/execution_unit_ctors.h
@@ -58,6 +58,7 @@ THolder<TExecutionUnit> CreateDropVolatileSnapshotUnit(TDataShard &dataShard, TP
THolder<TExecutionUnit> CreateInitiateBuildIndexUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateFinalizeBuildIndexUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateDropIndexNoticeUnit(TDataShard &dataShard, TPipeline &pipeline);
+THolder<TExecutionUnit> CreateMoveIndexUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateMoveTableUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateCreateCdcStreamUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateAlterCdcStreamUnit(TDataShard &dataShard, TPipeline &pipeline);
diff --git a/ydb/core/tx/datashard/execution_unit_kind.h b/ydb/core/tx/datashard/execution_unit_kind.h
index 88e763887f..4aefa0ca6e 100644
--- a/ydb/core/tx/datashard/execution_unit_kind.h
+++ b/ydb/core/tx/datashard/execution_unit_kind.h
@@ -62,6 +62,7 @@ enum class EExecutionUnitKind : ui32 {
CreateCdcStream,
AlterCdcStream,
DropCdcStream,
+ MoveIndex,
Count,
Unspecified
};
diff --git a/ydb/core/tx/datashard/move_index_unit.cpp b/ydb/core/tx/datashard/move_index_unit.cpp
new file mode 100644
index 0000000000..0799b9daeb
--- /dev/null
+++ b/ydb/core/tx/datashard/move_index_unit.cpp
@@ -0,0 +1,87 @@
+#include "datashard_impl.h"
+#include "datashard_pipeline.h"
+#include "execution_unit_ctors.h"
+
+namespace NKikimr {
+namespace NDataShard {
+
+class TMoveIndexUnit : public TExecutionUnit {
+public:
+ TMoveIndexUnit(TDataShard& dataShard, TPipeline& pipeline)
+ : TExecutionUnit(EExecutionUnitKind::MoveIndex, false, dataShard, pipeline)
+ { }
+
+ bool IsReadyToExecute(TOperation::TPtr) const override {
+ return true;
+ }
+
+ void MoveChangeRecords(NIceDb::TNiceDb& db, const NKikimrTxDataShard::TMoveIndex& move, TVector<NMiniKQL::IChangeCollector::TChange>& changeRecords) {
+ const auto remapPrevId = PathIdFromPathId(move.GetReMapIndex().GetSrcPathId());
+ const auto remapNewId = PathIdFromPathId(move.GetReMapIndex().GetDstPathId());
+
+ for (auto& record: changeRecords) {
+ if (record.PathId() == remapPrevId) {
+ record.SetPathId(remapNewId);
+ DataShard.MoveChangeRecord(db, record.Order(), record.PathId());
+ }
+ }
+ }
+
+ EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override {
+ Y_VERIFY(op->IsSchemeTx());
+
+ TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
+ Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
+
+ if (tx->GetSchemeTxType() != TSchemaOperation::ETypeMoveIndex) {
+ return EExecutionStatus::Executed;
+ }
+
+ const auto& schemeTx = tx->GetSchemeTx();
+
+ if (!schemeTx.HasMoveIndex()) {
+ return EExecutionStatus::Executed;
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+ TVector<NMiniKQL::IChangeCollector::TChange> changeRecords;
+ if (!DataShard.LoadChangeRecords(db, changeRecords)) {
+ return EExecutionStatus::Restart;
+ }
+
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "TMoveIndexUnit Execute"
+ << ": schemeTx# " << schemeTx.DebugString()
+ << ": changeRecords size# " << changeRecords.size()
+ << ", at tablet# " << DataShard.TabletID());
+
+ const auto& params = schemeTx.GetMoveIndex();
+
+ DataShard.KillChangeSender(ctx);
+
+ DataShard.MoveUserIndex(op, params, ctx, txc);
+
+ DataShard.CreateChangeSender(ctx);
+ MoveChangeRecords(db, params, changeRecords);
+ DataShard.EnqueueChangeRecords(std::move(changeRecords));
+ DataShard.MaybeActivateChangeSender(ctx);
+
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
+ op->Result()->SetStepOrderId(op->GetStepOrder().ToPair());
+
+ return EExecutionStatus::ExecutedNoMoreRestarts;
+ }
+
+ void Complete(TOperation::TPtr, const TActorContext&) override {
+ // nothing
+ }
+};
+
+THolder<TExecutionUnit> CreateMoveIndexUnit(
+ TDataShard& dataShard,
+ TPipeline& pipeline)
+{
+ return THolder(new TMoveIndexUnit(dataShard, pipeline));
+}
+
+} // namespace NDataShard
+} // namespace NKikimr
diff --git a/ydb/core/tx/schemeshard/CMakeLists.txt b/ydb/core/tx/schemeshard/CMakeLists.txt
index 35f51b27ca..480e801ccb 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.txt
@@ -125,6 +125,7 @@ target_sources(core-tx-schemeshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_mkdir.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_modify_acl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp
index 2180805f4e..49455b4119 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp
@@ -845,6 +845,8 @@ ISubOperationBase::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxSta
return CreateFinalizeBuildIndexMainTable(NextPartId(), txState);
case TTxState::ETxType::TxDropTableIndexAtMainTable:
return CreateDropTableIndexAtMainTable(NextPartId(), txState);
+ case TTxState::ETxType::TxUpdateMainTableOnIndexMove:
+ return CreateUpdateMainTableOnIndexMove(NextPartId(), txState);
case TTxState::ETxType::TxCreateLockForIndexBuild:
return CreateLockForIndexBuild(NextPartId(), txState);
case TTxState::ETxType::TxDropLock:
@@ -1090,6 +1092,8 @@ ISubOperationBase::TPtr TOperation::ConstructPart(NKikimrSchemeOp::EOperationTyp
return CreateMoveTable(NextPartId(), tx);
case NKikimrSchemeOp::EOperationType::ESchemeOpMoveTableIndex:
return CreateMoveTableIndex(NextPartId(), tx);
+ case NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex:
+ Y_FAIL("imposible");
// Replication
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateReplication:
@@ -1148,6 +1152,8 @@ TVector<ISubOperationBase::TPtr> TOperation::ConstructParts(const TTxTransaction
return CreateConsistentMoveTable(NextPartId(), tx, context);
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable:
return CreateConsistentAlterTable(NextPartId(), tx, context);
+ case NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex:
+ return CreateConsistentMoveIndex(NextPartId(), tx, context);
default:
return {ConstructPart(opType, tx)};
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
index 1da7c610a4..4ef28ff926 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
@@ -268,6 +268,8 @@ void NTableState::UpdatePartitioningForTableModification(TOperationId operationI
commonShardOp = TTxState::ConfigureParts;
} else if (txState.TxType == TTxState::TxDropTableIndexAtMainTable) {
commonShardOp = TTxState::ConfigureParts;
+ } else if (txState.TxType == TTxState::TxUpdateMainTableOnIndexMove) {
+ commonShardOp = TTxState::ConfigureParts;
} else if (txState.TxType == TTxState::TxCreateCdcStreamAtTable) {
commonShardOp = TTxState::ConfigureParts;
} else if (txState.TxType == TTxState::TxAlterCdcStreamAtTable) {
@@ -570,5 +572,31 @@ void IncParentDirAlterVersionWithRepublish(const TOperationId& opId, const TPath
}
}
+NKikimrSchemeOp::TModifyScheme MoveTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst) {
+ NKikimrSchemeOp::TModifyScheme scheme;
+
+ scheme.SetWorkingDir(dst.Parent().PathString());
+ scheme.SetFailOnExist(true);
+ scheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveTable);
+ auto operation = scheme.MutableMoveTable();
+ operation->SetSrcPath(src.PathString());
+ operation->SetDstPath(dst.PathString());
+
+ return scheme;
+}
+
+NKikimrSchemeOp::TModifyScheme MoveTableIndexTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst) {
+ NKikimrSchemeOp::TModifyScheme scheme;
+
+ scheme.SetWorkingDir(dst.Parent().PathString());
+ scheme.SetFailOnExist(true);
+ scheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveTableIndex);
+ auto operation = scheme.MutableMoveTableIndex();
+ operation->SetSrcPath(src.PathString());
+ operation->SetDstPath(dst.PathString());
+
+ return scheme;
+}
+
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
index efff3d0645..bd9a5cb9d3 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
@@ -13,6 +13,9 @@ TSet<ui32> AllIncomingEvents();
void IncParentDirAlterVersionWithRepublishSafeWithUndo(const TOperationId& opId, const TPath& path, TSchemeShard* ss, TSideEffects& onComplete);
void IncParentDirAlterVersionWithRepublish(const TOperationId& opId, const TPath& path, TOperationContext& context);
+NKikimrSchemeOp::TModifyScheme MoveTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst);
+NKikimrSchemeOp::TModifyScheme MoveTableIndexTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst);
+
namespace NTableState {
bool CollectProposeTransactionResults(const TOperationId& operationId, const TEvDataShard::TEvProposeTransactionResult::TPtr& ev, TOperationContext& context);
@@ -459,7 +462,6 @@ public:
}
-
class TCreateParts: public TSubOperationState {
private:
TOperationId OperationId;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp
new file mode 100644
index 0000000000..67f67a39bc
--- /dev/null
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp
@@ -0,0 +1,650 @@
+#include "schemeshard__operation_part.h"
+#include "schemeshard__operation_common.h"
+#include "schemeshard_path_element.h"
+
+#include "schemeshard_impl.h"
+
+#include <ydb/core/base/path.h>
+#include <ydb/core/protos/flat_tx_scheme.pb.h>
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+
+namespace {
+
+using namespace NKikimr;
+using namespace NSchemeShard;
+
+class TConfigureParts: public TSubOperationState {
+private:
+ TOperationId OperationId;
+
+ TString DebugHint() const override {
+ return TStringBuilder()
+ << "TUpdateMainTableOnIndexMove TConfigureParts"
+ << " operationId#" << OperationId;
+ }
+
+public:
+ TConfigureParts(TOperationId id)
+ : OperationId(id)
+ {
+ IgnoreMessages(DebugHint(), {});
+ }
+
+ bool HandleReply(TEvDataShard::TEvProposeTransactionResult::TPtr& ev, TOperationContext& context) override {
+ TTabletId ssId = context.SS->SelfTabletId();
+
+ LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " HandleReply TEvProposeTransactionResult"
+ << " at tabletId# " << ssId);
+ LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " HandleReply TEvProposeTransactionResult"
+ << " message# " << ev->Get()->Record.ShortDebugString());
+
+ if (!NTableState::CollectProposeTransactionResults(OperationId, ev, context)) {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ bool ProgressState(TOperationContext& context) override {
+ TTabletId ssId = context.SS->SelfTabletId();
+ LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " ProgressState"
+ << ", at schemeshard: " << ssId);
+
+ TTxState* txState = context.SS->FindTx(OperationId);
+ Y_VERIFY(txState);
+ Y_VERIFY(txState->TxType == TTxState::TxUpdateMainTableOnIndexMove);
+
+ //fill txShards
+ if (NTableState::CheckPartitioningChangedForTableModification(*txState, context)) {
+ LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " UpdatePartitioningForTableModification");
+ NTableState::UpdatePartitioningForTableModification(OperationId, *txState, context);
+ }
+
+ txState->ClearShardsInProgress();
+
+ TString txBody;
+ {
+ TPathId pathId = txState->TargetPathId;
+ Y_VERIFY(context.SS->PathsById.contains(pathId));
+ TPathElement::TPtr path = context.SS->PathsById.at(pathId);
+ Y_VERIFY(path);
+
+ Y_VERIFY(context.SS->Tables.contains(pathId));
+ TTableInfo::TPtr table = context.SS->Tables.at(pathId);
+ Y_VERIFY(table);
+
+ auto seqNo = context.SS->StartRound(*txState);
+
+ NKikimrTxDataShard::TFlatSchemeTransaction tx;
+ context.SS->FillSeqNo(tx, seqNo);
+
+ auto notice = tx.MutableMoveIndex();
+ PathIdFromPathId(pathId, notice->MutablePathId());
+ notice->SetTableSchemaVersion(table->AlterVersion + 1);
+
+ auto remap = notice->MutableReMapIndex();
+
+ auto opId = OperationId;
+ while (true) {
+ opId.second += 1;
+ TTxState* txState = context.SS->TxInFlight.FindPtr(opId);
+ if (!txState) {
+ TStringStream msg;
+ msg << "txState for opId: " << opId
+ << " has not been found, cur opId: " << OperationId;
+ Y_FAIL("%s", msg.Str().data());
+ }
+ LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " Trying to find txState with TxMoveTableIndex type"
+ << " cur opId: " << opId
+ << ", type: " << (int)txState->TxType);
+
+ if (txState->TxType == TTxState::TxMoveTableIndex) {
+ TPath srcPath = TPath::Init(txState->SourcePathId, context.SS);
+ auto parent = srcPath.Parent();
+ LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " Checking pathId for "
+ << "opId: " << opId
+ << ", type: " << (int)txState->TxType
+ << ", parent pathId: " << pathId);
+ if (pathId == parent.Base()->PathId) {
+ PathIdFromPathId(txState->SourcePathId, remap->MutableSrcPathId());
+ PathIdFromPathId(txState->TargetPathId, remap->MutableDstPathId());
+ auto targetIndexName = context.SS->PathsById.at(txState->TargetPathId);
+
+ for (const auto& [_, childPathId] : path->GetChildren()) {
+ Y_VERIFY(context.SS->PathsById.contains(childPathId));
+ auto childPath = context.SS->PathsById.at(childPathId);
+
+ if (childPath->Name == targetIndexName->Name) {
+ PathIdFromPathId(childPathId, remap->MutableReplacedPathId());
+ remap->SetDstName(childPath->Name);
+ }
+ }
+ break;
+ }
+ }
+ }
+ Y_VERIFY(remap->HasSrcPathId());
+ Y_VERIFY(remap->HasDstPathId());
+ Y_VERIFY(remap->HasDstName());
+
+ Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&txBody);
+ }
+
+ Y_VERIFY(txState->Shards.size());
+ for (ui32 i = 0; i < txState->Shards.size(); ++i) {
+ auto idx = txState->Shards[i].Idx;
+ auto datashardId = context.SS->ShardInfos[idx].TabletID;
+
+ THolder<TEvDataShard::TEvProposeTransaction> event =
+ THolder(new TEvDataShard::TEvProposeTransaction(NKikimrTxDataShard::TX_KIND_SCHEME,
+ context.SS->TabletID(),
+ context.Ctx.SelfID,
+ ui64(OperationId.GetTxId()),
+ txBody,
+ context.SS->SelectProcessingPrarams(txState->TargetPathId)));
+
+ context.OnComplete.BindMsgToPipe(OperationId, datashardId, idx, event.Release());
+ }
+
+ txState->UpdateShardsInProgress(TTxState::ConfigureParts);
+ return false;
+ }
+};
+
+class TPropose: public TSubOperationState {
+private:
+ TOperationId OperationId;
+
+ TString DebugHint() const override {
+ return TStringBuilder()
+ << "TMoveIndex TPropose"
+ << ", operationId: " << OperationId;
+ }
+public:
+ TPropose(TOperationId id)
+ : OperationId(id)
+ {
+ IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType, TEvDataShard::TEvProposeTransactionResult::EventType});
+ }
+
+ bool HandleReply(TEvDataShard::TEvSchemaChanged::TPtr& ev, TOperationContext& context) override {
+ TTabletId ssId = context.SS->SelfTabletId();
+ const auto& evRecord = ev->Get()->Record;
+
+ LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " HandleReply TEvSchemaChanged"
+ << " at tablet: " << ssId);
+ LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " HandleReply TEvSchemaChanged"
+ << " triggered early"
+ << ", message: " << evRecord.ShortDebugString());
+
+ NTableState::CollectSchemaChanged(OperationId, ev, context);
+ return false;
+ }
+
+ bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override {
+ TStepId step = TStepId(ev->Get()->StepId);
+ TTabletId ssId = context.SS->SelfTabletId();
+
+ LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " HandleReply TEvOperationPlan"
+ << ", step: " << step
+ << ", at schemeshard: " << ssId);
+
+ TTxState* txState = context.SS->FindTx(OperationId);
+ Y_VERIFY(txState);
+ Y_VERIFY(txState->TxType == TTxState::TxUpdateMainTableOnIndexMove);
+
+ TPath path = TPath::Init(txState->TargetPathId, context.SS);
+ Y_VERIFY(path.IsResolved());
+
+ NIceDb::TNiceDb db(context.GetDB());
+
+ txState->PlanStep = step;
+ context.SS->PersistTxPlanStep(db, OperationId, step);
+
+ context.SS->ChangeTxState(db, OperationId, TTxState::DeletePathBarrier);
+
+ return true;
+ }
+
+ bool ProgressState(TOperationContext& context) override {
+ TTabletId ssId = context.SS->SelfTabletId();
+
+ LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " ProgressState"
+ << ", at schemeshard: " << ssId);
+
+ TTxState* txState = context.SS->FindTx(OperationId);
+ Y_VERIFY(txState);
+ Y_VERIFY(txState->TxType == TTxState::TxUpdateMainTableOnIndexMove);
+ Y_VERIFY(txState->MinStep);
+
+ TSet<TTabletId> shardSet;
+ for (const auto& shard : txState->Shards) {
+ TShardIdx idx = shard.Idx;
+ TTabletId tablet = context.SS->ShardInfos.at(idx).TabletID;
+ shardSet.insert(tablet);
+ }
+
+ context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, txState->MinStep, std::move(shardSet));
+ return false;
+ }
+};
+
+class TDeleteTableBarrier: public TSubOperationState {
+private:
+ TOperationId OperationId;
+
+ TString DebugHint() const override {
+ return TStringBuilder()
+ << "TMoveIndex TDeleteTableBarrier"
+ << " operationId: " << OperationId;
+ }
+
+public:
+ TDeleteTableBarrier(TOperationId id)
+ : OperationId(id)
+ {
+ IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType, TEvDataShard::TEvProposeTransactionResult::EventType, TEvPrivate::TEvOperationPlan::EventType});
+ }
+
+ bool HandleReply(TEvDataShard::TEvSchemaChanged::TPtr& ev, TOperationContext& context) override {
+ TTabletId ssId = context.SS->SelfTabletId();
+
+ LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " HandleReply TEvDataShard::TEvSchemaChanged"
+ << ", save it"
+ << ", at schemeshard: " << ssId);
+
+ NTableState::CollectSchemaChanged(OperationId, ev, context);
+ return false;
+ }
+
+ bool HandleReply(TEvPrivate::TEvCompleteBarrier::TPtr& ev, TOperationContext& context) override {
+ TTabletId ssId = context.SS->SelfTabletId();
+
+ LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " HandleReply TEvPrivate:TEvCompleteBarrier"
+ << ", msg: " << ev->Get()->ToString()
+ << ", at tablet" << ssId);
+
+
+ TTxState* txState = context.SS->FindTx(OperationId);
+ Y_VERIFY(txState);
+
+
+ TPath path = TPath::Init(txState->TargetPathId, context.SS);
+ TTableInfo::TPtr table = context.SS->Tables.at(txState->TargetPathId);
+
+ Y_VERIFY(txState->PlanStep);
+
+ NIceDb::TNiceDb db(context.GetDB());
+ table->AlterVersion += 1;
+
+ context.SS->PersistTableAlterVersion(db, path->PathId, table);
+
+ context.OnComplete.PublishToSchemeBoard(OperationId, path->PathId);
+ context.SS->ChangeTxState(db, OperationId, TTxState::ProposedWaitParts);
+ return true;
+ }
+
+ bool ProgressState(TOperationContext& context) override {
+ TTabletId ssId = context.SS->SelfTabletId();
+ context.OnComplete.RouteByTabletsFromOperation(OperationId);
+
+ TTxState* txState = context.SS->FindTx(OperationId);
+ Y_VERIFY(txState);
+
+ LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " ProgressState"
+ << ", operation type: " << TTxState::TypeName(txState->TxType)
+ << ", at tablet" << ssId);
+
+ context.OnComplete.Barrier(OperationId, "RenamePathBarrier");
+ return false;
+ }
+};
+
+class TUpdateMainTableOnIndexMove: public TSubOperation {
+ const TOperationId OperationId;
+ const TTxTransaction Transaction;
+ TTxState::ETxState State = TTxState::Invalid;
+
+ TTxState::ETxState NextState() {
+ return TTxState::ConfigureParts;
+ }
+
+ TTxState::ETxState NextState(TTxState::ETxState state) {
+ switch(state) {
+ case TTxState::Waiting:
+ case TTxState::ConfigureParts:
+ return TTxState::Propose;
+ case TTxState::Propose:
+ return TTxState::DeletePathBarrier;
+ case TTxState::DeletePathBarrier:
+ return TTxState::ProposedWaitParts;
+
+ case TTxState::ProposedWaitParts:
+ return TTxState::Done;
+ default:
+ return TTxState::Invalid;
+ }
+ return TTxState::Invalid;
+ }
+
+ TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) {
+ switch(state) {
+ case TTxState::Waiting:
+ case TTxState::ConfigureParts:
+ return MakeHolder<TConfigureParts>(OperationId);
+ case TTxState::Propose:
+ return MakeHolder<TPropose>(OperationId);
+ case TTxState::DeletePathBarrier:
+ return MakeHolder<TDeleteTableBarrier>(OperationId);
+ case TTxState::ProposedWaitParts:
+ return MakeHolder<NTableState::TProposedWaitParts>(OperationId);
+ case TTxState::Done:
+ return MakeHolder<TDone>(OperationId);
+ default:
+ return nullptr;
+ }
+ }
+
+ void StateDone(TOperationContext& context) override {
+ State = NextState(State);
+
+ if (State != TTxState::Invalid) {
+ SetState(SelectStateFunc(State));
+ context.OnComplete.ActivateTx(OperationId);
+ }
+ }
+
+public:
+ TUpdateMainTableOnIndexMove(TOperationId id, const TTxTransaction& tx)
+ : OperationId(id)
+ , Transaction(tx)
+ {
+ }
+
+ TUpdateMainTableOnIndexMove(TOperationId id, TTxState::ETxState state)
+ : OperationId(id)
+ , State(state)
+ {
+ SetState(SelectStateFunc(state));
+ }
+
+ THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override {
+ const TTabletId ssId = context.SS->SelfTabletId();
+
+ auto opDescr = Transaction.GetAlterTable();
+
+ const TString workingDir = Transaction.GetWorkingDir();
+ const TString mainTableName = opDescr.GetName();
+
+ LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "TUpdateMainTableOnIndexMove Propose"
+ << ", path: " << workingDir << "/" << mainTableName
+ << ", opId: " << OperationId
+ << ", at schemeshard: " << ssId);
+ LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "TUpdateMainTableOnIndexMove Propose"
+ << ", message: " << Transaction.ShortDebugString()
+ << ", at schemeshard: " << ssId);
+
+ auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), ui64(ssId));
+
+ TPath tablePath = TPath::Resolve(workingDir, context.SS).Dive(mainTableName);
+ {
+ TPath::TChecker checks = tablePath.Check();
+ checks
+ .NotEmpty()
+ .NotUnderDomainUpgrade()
+ .IsAtLocalSchemeShard()
+ .IsResolved()
+ .NotDeleted()
+ .NotUnderDeleting()
+ .IsTable()
+ .NotUnderOperation()
+ .IsCommonSensePath();
+
+ if (!checks) {
+ TString explain = TStringBuilder() << "path fail checks"
+ << ", path: " << tablePath.PathString();
+ auto status = checks.GetStatus(&explain);
+ result->SetError(status, explain);
+ return result;
+ }
+ }
+
+ TString errStr;
+
+ if (!context.SS->CheckApplyIf(Transaction, errStr)) {
+ result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
+ return result;
+ }
+
+ if (!context.SS->CheckLocks(tablePath.Base()->PathId, Transaction, errStr)) {
+ result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
+ return result;
+ }
+
+ Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId));
+ TTableInfo::TPtr table = context.SS->Tables.at(tablePath.Base()->PathId);
+
+ Y_VERIFY(table->AlterVersion != 0);
+ Y_VERIFY(!table->AlterData);
+
+ Y_VERIFY(!context.SS->FindTx(OperationId));
+
+ auto guard = context.DbGuard();
+ context.MemChanges.GrabPath(context.SS, tablePath.Base()->PathId);
+ context.MemChanges.GrabNewTxState(context.SS, OperationId);
+
+ context.DbChanges.PersistPath(tablePath.Base()->PathId);
+ context.DbChanges.PersistTxState(OperationId);
+
+ TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxUpdateMainTableOnIndexMove, tablePath.Base()->PathId);
+ txState.State = TTxState::ConfigureParts;
+
+ tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter;
+ tablePath.Base()->LastTxId = OperationId.GetTxId();
+
+ for (auto splitTx: table->GetSplitOpsInFlight()) {
+ context.OnComplete.Dependence(splitTx.GetTxId(), OperationId.GetTxId());
+ }
+
+ context.OnComplete.ActivateTx(OperationId);
+
+ State = NextState();
+ SetState(SelectStateFunc(State));
+ return result;
+ }
+
+ void AbortPropose(TOperationContext& context) override {
+ LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "TUpdateMainTableOnIndexMove AbortPropose"
+ << ", opId: " << OperationId
+ << ", at schemeshard: " << context.SS->TabletID());
+ }
+
+ void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override {
+ LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "TUpdateMainTableOnIndexMove AbortUnsafe"
+ << ", opId: " << OperationId
+ << ", forceDropId: " << forceDropTxId
+ << ", at schemeshard: " << context.SS->TabletID());
+
+ context.OnComplete.DoneOperation(OperationId);
+ }
+
+};
+
+}
+
+namespace NKikimr {
+namespace NSchemeShard {
+
+TVector<ISubOperationBase::TPtr> CreateConsistentMoveIndex(TOperationId nextId, const TTxTransaction& tx, TOperationContext& context) {
+ Y_VERIFY(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex);
+
+ TVector<ISubOperationBase::TPtr> result;
+
+ {
+ TString errStr;
+ if (!context.SS->CheckApplyIf(tx, errStr)) {
+ return {CreateReject(nextId, NKikimrScheme::EStatus::StatusPreconditionFailed, errStr)};
+ }
+ }
+
+ auto moving = tx.GetMoveIndex();
+
+ const auto& mainTable = moving.GetTablePath();
+ const auto& srcIndex = moving.GetSrcPath();
+ const auto& dstIndex = moving.GetDstPath();
+
+ TPath mainTablePath = TPath::Resolve(mainTable, context.SS);
+ {
+ TPath::TChecker checks = mainTablePath.Check();
+ checks
+ .NotEmpty()
+ .NotUnderDomainUpgrade()
+ .IsAtLocalSchemeShard()
+ .IsResolved()
+ .NotDeleted()
+ .IsTable()
+ .NotUnderDeleting()
+ .NotUnderOperation()
+ .IsCommonSensePath();
+
+ if (!checks) {
+ TString explain = TStringBuilder() << "path fail checks"
+ << ", path: " << mainTablePath.PathString();
+ auto status = checks.GetStatus(&explain);
+ return {CreateReject(nextId, status, explain)};
+ }
+ }
+
+ TPath workingDirPath = mainTablePath.Parent();
+
+ {
+ TStringBuilder explain = TStringBuilder() << "fail checks";
+
+ if (!context.SS->CheckLocks(mainTablePath.Base()->PathId, tx, explain)) {
+ return {CreateReject(nextId, NKikimrScheme::StatusMultipleModifications, explain)};
+ }
+ }
+
+ TPath srcIndexPath = mainTablePath.Child(srcIndex);
+ {
+ TPath::TChecker checks = srcIndexPath.Check();
+ checks
+ .NotEmpty()
+ .NotUnderDomainUpgrade()
+ .IsAtLocalSchemeShard()
+ .IsResolved()
+ .NotDeleted()
+ .IsTableIndex()
+ .NotUnderDeleting()
+ .NotUnderOperation();
+
+ if (!checks) {
+ TString explain = TStringBuilder() << "path fail checks"
+ << ", path: " << srcIndexPath.PathString();
+ auto status = checks.GetStatus(&explain);
+ return {CreateReject(nextId, status, explain)};
+ }
+ }
+
+ TString errStr;
+ if (!context.SS->CheckApplyIf(tx, errStr)) {
+ return {CreateReject(nextId, NKikimrScheme::StatusPreconditionFailed, errStr)};
+ }
+
+ if (!context.SS->CheckLocks(mainTablePath.Base()->PathId, tx, errStr)) {
+ return {CreateReject(nextId, NKikimrScheme::StatusMultipleModifications, errStr)};
+ }
+
+ TPath dstIndexPath = mainTablePath.Child(dstIndex);
+
+ {
+ auto mainTableAlter = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable);
+ auto operation = mainTableAlter.MutableAlterTable();
+ operation->SetName(mainTablePath.LeafName());
+ result.push_back(new TUpdateMainTableOnIndexMove(NextPartId(nextId, result), mainTableAlter));
+ }
+
+ {
+ TPath::TChecker checks = dstIndexPath.Check();
+ checks
+ .NotEmpty()
+ .NotUnderDomainUpgrade()
+ .IsAtLocalSchemeShard()
+ .IsResolved()
+ .NotDeleted()
+ .NotUnderOperation()
+ .IsTableIndex();
+
+ if (checks) {
+ {
+ auto indexDropping = TransactionTemplate(mainTablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTableIndex);
+ auto operation = indexDropping.MutableDrop();
+ operation->SetName(dstIndex);
+
+ result.push_back(CreateDropTableIndex(NextPartId(nextId, result), indexDropping));
+ }
+
+ for (const auto& items: dstIndexPath.Base()->GetChildren()) {
+ Y_VERIFY(context.SS->PathsById.contains(items.second));
+ auto implPath = context.SS->PathsById.at(items.second);
+ if (implPath->Dropped()) {
+ continue;
+ }
+
+ auto implTable = context.SS->PathsById.at(items.second);
+ Y_VERIFY(implTable->IsTable());
+
+ auto implTableDropping = TransactionTemplate(dstIndexPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable);
+ auto operation = implTableDropping.MutableDrop();
+ operation->SetName(items.first);
+
+ result.push_back(CreateDropTable(NextPartId(nextId, result), implTableDropping));
+ }
+ }
+ }
+
+ result.push_back(CreateMoveTableIndex(TOperationId(nextId.GetTxId(),
+ nextId.GetSubTxId() + result.size()),
+ MoveTableIndexTask(srcIndexPath, dstIndexPath)));
+
+ TString srcImplTableName = srcIndexPath.Base()->GetChildren().begin()->first;
+ TPath srcImplTable = srcIndexPath.Child(srcImplTableName);
+
+ Y_VERIFY(srcImplTable.Base()->PathId == srcIndexPath.Base()->GetChildren().begin()->second);
+
+ TPath dstImplTable = dstIndexPath.Child(srcImplTableName);
+
+ result.push_back(CreateMoveTable(TOperationId(nextId.GetTxId(),
+ nextId.GetSubTxId() + result.size()),
+ MoveTableTask(srcImplTable, dstImplTable)));
+ return result;
+}
+
+ISubOperationBase::TPtr CreateUpdateMainTableOnIndexMove(TOperationId id, const TTxTransaction& tx) {
+ return new TUpdateMainTableOnIndexMove(id, tx);
+}
+
+ISubOperationBase::TPtr CreateUpdateMainTableOnIndexMove(TOperationId id, TTxState::ETxState state) {
+ return new TUpdateMainTableOnIndexMove(id, state);
+}
+
+}
+}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp
index 3e9b02e0ad..dc46563bab 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp
@@ -109,7 +109,7 @@ public:
Y_VERIFY(dstIndexPath.IsResolved());
auto remap = move->AddReMapIndexes();
- PathIdFromPathId(srcIndexPath->PathId, remap->MutablePathId());
+ PathIdFromPathId(srcIndexPath->PathId, remap->MutableSrcPathId());
PathIdFromPathId(dstIndexPath->PathId, remap->MutableDstPathId());
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp
index 6ae858ea09..360126a2b5 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp
@@ -438,23 +438,13 @@ public:
.IsAtLocalSchemeShard()
.IsResolved();
- if (dstParentPath.IsUnderDeleting()) {
- checks
- .IsUnderDeleting()
- .IsUnderTheSameOperation(OperationId.GetTxId());
- } else if (dstParentPath.IsUnderMoving()) {
- // it means that dstPath is free enough to be the move destination
- checks
- .IsUnderMoving()
- .IsUnderTheSameOperation(OperationId.GetTxId());
- } else if (dstParentPath.IsUnderCreating()) {
- checks
- .IsUnderCreating()
- .IsUnderTheSameOperation(OperationId.GetTxId());
- } else {
- checks
- .NotUnderOperation();
- }
+ if (dstParentPath.IsUnderOperation()) {
+ checks
+ .IsUnderTheSameOperation(OperationId.GetTxId());
+ } else {
+ checks
+ .NotUnderOperation();
+ }
if (!checks) {
TString explain = TStringBuilder() << "parent dst path fail checks"
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp
index bcca666ffd..57b66f0ca8 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp
@@ -8,37 +8,6 @@
#include <ydb/core/protos/flat_tx_scheme.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
-namespace {
-
-
-NKikimrSchemeOp::TModifyScheme MoveTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst) {
- NKikimrSchemeOp::TModifyScheme scheme;
-
- scheme.SetWorkingDir(dst.Parent().PathString());
- scheme.SetFailOnExist(true);
- scheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveTable);
- auto operation = scheme.MutableMoveTable();
- operation->SetSrcPath(src.PathString());
- operation->SetDstPath(dst.PathString());
-
- return scheme;
-}
-
-NKikimrSchemeOp::TModifyScheme MoveTableIndexTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst) {
- NKikimrSchemeOp::TModifyScheme scheme;
-
- scheme.SetWorkingDir(dst.Parent().PathString());
- scheme.SetFailOnExist(true);
- scheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveTableIndex);
- auto operation = scheme.MutableMoveTableIndex();
- operation->SetSrcPath(src.PathString());
- operation->SetDstPath(dst.PathString());
-
- return scheme;
-}
-
-}
-
namespace NKikimr {
namespace NSchemeShard {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h
index 6047d1d537..37f49d691c 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h
@@ -285,6 +285,9 @@ TVector<ISubOperationBase::TPtr> CreateDropIndex(TOperationId id, const TTxTrans
ISubOperationBase::TPtr CreateDropTableIndexAtMainTable(TOperationId id, const TTxTransaction& tx);
ISubOperationBase::TPtr CreateDropTableIndexAtMainTable(TOperationId id, TTxState::ETxState state);
+ISubOperationBase::TPtr CreateUpdateMainTableOnIndexMove(TOperationId id, const TTxTransaction& tx);
+ISubOperationBase::TPtr CreateUpdateMainTableOnIndexMove(TOperationId id, TTxState::ETxState state);
+
/// CDC
// Create
TVector<ISubOperationBase::TPtr> CreateNewCdcStream(TOperationId id, const TTxTransaction& tx, TOperationContext& context);
@@ -441,10 +444,14 @@ ISubOperationBase::TPtr CreateAlterLogin(TOperationId id, const TTxTransaction&
ISubOperationBase::TPtr CreateAlterLogin(TOperationId id, TTxState::ETxState state);
TVector<ISubOperationBase::TPtr> CreateConsistentMoveTable(TOperationId id, const TTxTransaction& tx, TOperationContext& context);
+TVector<ISubOperationBase::TPtr> CreateConsistentMoveIndex(TOperationId id, const TTxTransaction& tx, TOperationContext& context);
ISubOperationBase::TPtr CreateMoveTable(TOperationId id, const TTxTransaction& tx);
ISubOperationBase::TPtr CreateMoveTable(TOperationId id, TTxState::ETxState state);
+ISubOperationBase::TPtr CreateMoveIndex(TOperationId id, const TTxTransaction& tx);
+ISubOperationBase::TPtr CreateMoveIndex(TOperationId id, TTxState::ETxState state);
+
ISubOperationBase::TPtr CreateMoveTableIndex(TOperationId id, const TTxTransaction& tx);
ISubOperationBase::TPtr CreateMoveTableIndex(TOperationId id, TTxState::ETxState state);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
index 3b59bcfad0..9979742fc6 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
@@ -848,6 +848,14 @@ void TSideEffects::DoDoneTransactions(TSchemeShard *ss, NTabletFlatExecutor::TTr
<< ", publications: " << operation->Publications.size()
<< ", subscribers: " << operation->Subscribers.size());
+ for (const auto& pub : operation->Publications) {
+ LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Publication details: "
+ << " tx: " << txId
+ << ", " << pub.first
+ << ", " << pub.second);
+ }
+
ss->Publications[txId] = {
std::move(operation->Publications),
std::move(operation->Subscribers)
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 7d7efa62ef..15bb6bc42e 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -1279,6 +1279,7 @@ TPathElement::EPathState TSchemeShard::CalcPathState(TTxState::ETxType txType, T
case TTxState::TxAlterSequence:
case TTxState::TxAlterReplication:
case TTxState::TxAlterBlobDepot:
+ case TTxState::TxUpdateMainTableOnIndexMove:
return TPathElement::EPathState::EPathStateAlter;
case TTxState::TxDropTable:
case TTxState::TxDropPQGroup:
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index 6301cda130..0bc90d41c7 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -327,6 +327,10 @@ void TPathDescriber::DescribeTable(const TActorContext& ctx, TPathId pathId, TPa
continue;
}
+ if (!childPath->IsCreateFinished()) {
+ continue;
+ }
+
switch (childPath->PathType) {
case NKikimrSchemeOp::EPathTypeTableIndex:
Self->DescribeTableIndex(childPathId, childName, *entry->AddTableIndexes());
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_element.h b/ydb/core/tx/schemeshard/schemeshard_path_element.h
index df44d08ec9..e3870d3530 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_element.h
+++ b/ydb/core/tx/schemeshard/schemeshard_path_element.h
@@ -515,6 +515,14 @@ public:
return PathState == EPathState::EPathStateMigrated;
}
+ bool IsUnderMoving() const {
+ return PathState == EPathState::EPathStateMoving;
+ }
+
+ bool IsUnderCreating() const {
+ return PathState == EPathState::EPathStateCreate;
+ }
+
bool PlannedToCreate() const {
return PathState == EPathState::EPathStateCreate;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
index 27773215ac..e6a6773858 100644
--- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
+++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
@@ -112,7 +112,8 @@ struct TTxState {
item(TxDropReplication, 67) \
item(TxCreateBlobDepot, 68) \
item(TxAlterBlobDepot, 69) \
- item(TxDropBlobDepot, 70)
+ item(TxDropBlobDepot, 70) \
+ item(TxUpdateMainTableOnIndexMove, 71)
// TX_STATE_TYPE_ENUM
@@ -340,6 +341,7 @@ struct TTxState {
case TxDropSequence:
case TxDropReplication:
case TxDropBlobDepot:
+ case TxUpdateMainTableOnIndexMove:
return false;
case TxAlterPQGroup:
case TxAlterTable:
@@ -422,6 +424,7 @@ struct TTxState {
case TxFinalizeBuildIndex:
case TxDropTableIndexAtMainTable: // just increments schemaversion at main table
case TxDropCdcStreamAtTable:
+ case TxUpdateMainTableOnIndexMove:
return false;
case TxAlterPQGroup:
case TxAlterTable:
@@ -506,6 +509,7 @@ struct TTxState {
case TxDropLock:
case TxDropTableIndexAtMainTable:
case TxDropCdcStreamAtTable:
+ case TxUpdateMainTableOnIndexMove:
return false;
case TxAlterPQGroup:
case TxAlterTable:
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
index 9ed954553c..c94d2d4232 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
@@ -391,6 +391,33 @@ namespace NSchemeShardUT_Private {
TestModificationResults(runtime, txId, expectedResults);
}
+ TEvSchemeShard::TEvModifySchemeTransaction* MoveIndexRequest(ui64 txId, const TString& tablePath, const TString& srcPath, const TString& dstPath, ui64 schemeShard, const TApplyIf& applyIf) {
+ THolder<TEvSchemeShard::TEvModifySchemeTransaction> evTx = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(txId, schemeShard);
+ auto transaction = evTx->Record.AddTransaction();
+ transaction->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex);
+ SetApplyIf(*transaction, applyIf);
+
+ auto descr = transaction->MutableMoveIndex();
+ descr->SetTablePath(tablePath);
+ descr->SetSrcPath(srcPath);
+ descr->SetDstPath(dstPath);
+
+ return evTx.Release();
+ }
+
+ void AsyncMoveIndex(TTestActorRuntime& runtime, ui64 txId, const TString& tablePath, const TString& srcPath, const TString& dstPath, ui64 schemeShard) {
+ TActorId sender = runtime.AllocateEdgeActor();
+ ForwardToTablet(runtime, schemeShard, sender, MoveIndexRequest(txId, tablePath, srcPath, dstPath, schemeShard));
+ }
+
+ void TestMoveIndex(TTestActorRuntime& runtime, ui64 txId, const TString& tablePath, const TString& src, const TString& dst, const TVector<TEvSchemeShard::EStatus>& expectedResults) {
+ TestMoveIndex(runtime, TTestTxConfig::SchemeShard, txId, tablePath, src, dst, expectedResults);
+ }
+
+ void TestMoveIndex(TTestActorRuntime& runtime, ui64 schemeShard, ui64 txId, const TString& tablePath, const TString& src, const TString& dst, const TVector<TEvSchemeShard::EStatus>& expectedResults) {
+ AsyncMoveIndex(runtime, txId, tablePath, src, dst, schemeShard);
+ TestModificationResults(runtime, txId, expectedResults);
+ }
TEvSchemeShard::TEvModifySchemeTransaction* LockRequest(ui64 txId, const TString &parentPath, const TString& name) {
THolder<TEvSchemeShard::TEvModifySchemeTransaction> evTx = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(txId, TTestTxConfig::SchemeShard);
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
index 24d6ddd6d7..bda05c59c4 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
@@ -248,6 +248,12 @@ namespace NSchemeShardUT_Private {
void TestMoveTable(TTestActorRuntime& runtime, ui64 txId, const TString& srcMove, const TString& dstMove, const TVector<TEvSchemeShard::EStatus>& expectedResults = {NKikimrScheme::StatusAccepted});
void TestMoveTable(TTestActorRuntime& runtime, ui64 schemeShard, ui64 txId, const TString& srcMove, const TString& dstMove, const TVector<TEvSchemeShard::EStatus>& expectedResults = {NKikimrScheme::StatusAccepted});
+ // move index
+ TEvTx* MoveIndexRequest(ui64 txId, const TString& tablePath, const TString& srcPath, const TString& dstPath, ui64 schemeShard = TTestTxConfig::SchemeShard, const TApplyIf& applyIf = {});
+ void AsyncMoveIndex(TTestActorRuntime& runtime, ui64 txId, const TString& tablePath, const TString& srcPath, const TString& dstPath, ui64 schemeShard = TTestTxConfig::SchemeShard);
+ void TestMoveIndex(TTestActorRuntime& runtime, ui64 txId, const TString& tablePath, const TString& srcMove, const TString& dstMove, const TVector<TEvSchemeShard::EStatus>& expectedResults = {NKikimrScheme::StatusAccepted});
+ void TestMoveIndex(TTestActorRuntime& runtime, ui64 schemeShard, ui64 txId, const TString& tablePath, const TString& srcMove, const TString& dstMove, const TVector<TEvSchemeShard::EStatus>& expectedResults = {NKikimrScheme::StatusAccepted});
+
// locks
TEvTx* LockRequest(ui64 txId, const TString &parentPath, const TString& name);
void AsyncLock(TTestActorRuntime& runtime, ui64 schemeShard, ui64 txId, const TString& parentPath, const TString& name);
diff --git a/ydb/core/tx/schemeshard/ut_move.cpp b/ydb/core/tx/schemeshard/ut_move.cpp
index d021c6dd72..225dae7ada 100644
--- a/ydb/core/tx/schemeshard/ut_move.cpp
+++ b/ydb/core/tx/schemeshard/ut_move.cpp
@@ -728,6 +728,263 @@ Y_UNIT_TEST_SUITE(TSchemeShardMoveTest) {
NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)});
}
+ Y_UNIT_TEST(MoveIndex) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime,
+ TTestEnvOptions()
+ .EnableAsyncIndexes(true));
+ ui64 txId = 100;
+
+ TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+ TableDescription {
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value0" Type: "Utf8" }
+ Columns { Name: "value1" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ }
+ IndexDescription {
+ Name: "Sync"
+ KeyColumnNames: ["value0"]
+ }
+ IndexDescription {
+ Name: "Async"
+ KeyColumnNames: ["value1"]
+ Type: EIndexTypeGlobalAsync
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot"),
+ {NLs::ChildrenCount(1),
+ NLs::PathsInsideDomain(5),
+ NLs::ShardsInsideDomain(3)});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"),
+ {NLs::IsTable,
+ NLs::PathVersionEqual(3),
+ NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"}),
+ NLs::IndexesCount(2)});
+
+ TestMoveIndex(runtime, ++txId, "/MyRoot/Table", "Sync", "MovedSync");
+ env.TestWaitNotification(runtime, txId);
+
+ TestMoveIndex(runtime, ++txId, "/MyRoot/Table", "Async", "MovedAsync");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"),
+ {NLs::IsTable,
+ NLs::PathVersionEqual(5),
+ NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"})});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot"),
+ {NLs::ChildrenCount(1),
+ NLs::PathsInsideDomain(5),
+ NLs::ShardsInsideDomain(3)});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/MovedSync", true, true, true),
+ {NLs::PathExist,
+ NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobal),
+ NLs::IndexKeys({"value0"}),
+ NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/MovedAsync", true, true, true),
+ {NLs::PathExist,
+ NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobalAsync),
+ NLs::IndexKeys({"value1"}),
+ NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)});
+ }
+
+ Y_UNIT_TEST(MoveIndexSameDst) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime,
+ TTestEnvOptions()
+ .EnableAsyncIndexes(true));
+ ui64 txId = 100;
+
+ TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+ TableDescription {
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value0" Type: "Utf8" }
+ Columns { Name: "value1" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ }
+ IndexDescription {
+ Name: "Sync"
+ KeyColumnNames: ["value0"]
+ }
+ IndexDescription {
+ Name: "Async"
+ KeyColumnNames: ["value1"]
+ Type: EIndexTypeGlobalAsync
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot"),
+ {NLs::ChildrenCount(1),
+ NLs::PathsInsideDomain(5),
+ NLs::ShardsInsideDomain(3)});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"),
+ {NLs::IsTable,
+ NLs::PathVersionEqual(3),
+ NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"}),
+ NLs::IndexesCount(2)});
+
+ TestMoveIndex(runtime, ++txId, "/MyRoot/Table", "Sync", "Sync", {NKikimrScheme::StatusInvalidParameter});
+ env.TestWaitNotification(runtime, txId);
+
+ TestMoveIndex(runtime, ++txId, "/MyRoot/Table", "Async", "Async", {NKikimrScheme::StatusInvalidParameter});
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"),
+ {NLs::IsTable,
+ NLs::PathVersionEqual(3),
+ NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"})});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot"),
+ {NLs::ChildrenCount(1),
+ NLs::PathsInsideDomain(5),
+ NLs::ShardsInsideDomain(3)});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/Sync", true, true, true),
+ {NLs::PathExist,
+ NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobal),
+ NLs::IndexKeys({"value0"}),
+ NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/Async", true, true, true),
+ {NLs::PathExist,
+ NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobalAsync),
+ NLs::IndexKeys({"value1"}),
+ NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)});
+ }
+
+ Y_UNIT_TEST(MoveIndexDoesNonExisted) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime,
+ TTestEnvOptions()
+ .EnableAsyncIndexes(true));
+ ui64 txId = 100;
+
+ TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+ TableDescription {
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value0" Type: "Utf8" }
+ Columns { Name: "value1" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ }
+ IndexDescription {
+ Name: "Sync"
+ KeyColumnNames: ["value0"]
+ }
+ IndexDescription {
+ Name: "Async"
+ KeyColumnNames: ["value1"]
+ Type: EIndexTypeGlobalAsync
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot"),
+ {NLs::ChildrenCount(1),
+ NLs::PathsInsideDomain(5),
+ NLs::ShardsInsideDomain(3)});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"),
+ {NLs::IsTable,
+ NLs::PathVersionEqual(3),
+ NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"}),
+ NLs::IndexesCount(2)});
+
+ TestMoveIndex(runtime, ++txId, "/MyRoot/Table", "BlaBla", "Sync", {NKikimrScheme::StatusPathDoesNotExist});
+ env.TestWaitNotification(runtime, txId);
+
+ TestMoveIndex(runtime, ++txId, "/MyRoot/TableBlaBla", "Async", "Async", {NKikimrScheme::StatusPathDoesNotExist});
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"),
+ {NLs::IsTable,
+ NLs::PathVersionEqual(3),
+ NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"})});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot"),
+ {NLs::ChildrenCount(1),
+ NLs::PathsInsideDomain(5),
+ NLs::ShardsInsideDomain(3)});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/Sync", true, true, true),
+ {NLs::PathExist,
+ NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobal),
+ NLs::IndexKeys({"value0"}),
+ NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/Async", true, true, true),
+ {NLs::PathExist,
+ NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobalAsync),
+ NLs::IndexKeys({"value1"}),
+ NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)});
+ }
+
+ Y_UNIT_TEST(MoveIntoBuildingIndex) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime,
+ TTestEnvOptions()
+ .EnableAsyncIndexes(true));
+ ui64 txId = 100;
+
+ TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+ TableDescription {
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value0" Type: "Utf8" }
+ Columns { Name: "value1" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ }
+ IndexDescription {
+ Name: "SomeIndex"
+ KeyColumnNames: ["value1"]
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ AsyncBuilIndex(runtime, ++txId, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/Table", "Sync", {"value0"});
+
+ TVector<THolder<IEventHandle>> suppressed;
+ auto id = txId;
+
+ auto observer = SetSuppressObserver(runtime, suppressed, TEvDataShard::TEvBuildIndexCreateRequest::EventType);
+
+ WaitForSuppressed(runtime, suppressed, 1, observer);
+
+ {
+ TestMoveIndex(runtime, ++txId, "/MyRoot/Table", "Sync", "MovedSync", {NKikimrScheme::StatusMultipleModifications});
+ env.TestWaitNotification(runtime, txId);
+ }
+
+ {
+ TestMoveIndex(runtime, ++txId, "/MyRoot/Table", "SomeIndex", "Sync", {NKikimrScheme::StatusMultipleModifications});
+ env.TestWaitNotification(runtime, txId);
+ }
+
+ for (auto &msg : suppressed) {
+ runtime.Send(msg.Release());
+ }
+
+ suppressed.clear();
+
+ env.TestWaitNotification(runtime, id);
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"),
+ {NLs::IsTable,
+ NLs::PathVersionEqual(6),
+ NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"}),
+ NLs::IndexesCount(2)});
+ }
+
Y_UNIT_TEST(AsyncIndexWithSyncInFly) {
TTestBasicRuntime runtime;
TTestEnv env(runtime,
diff --git a/ydb/core/tx/schemeshard/ut_move_reboots.cpp b/ydb/core/tx/schemeshard/ut_move_reboots.cpp
index 7edaea585a..6a90c1bc4f 100644
--- a/ydb/core/tx/schemeshard/ut_move_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_move_reboots.cpp
@@ -208,6 +208,114 @@ Y_UNIT_TEST_SUITE(TSchemeShardMoveRebootsTest) {
});
}
+ Y_UNIT_TEST(MoveIndex) {
+ TTestWithReboots t;
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ TPathVersion pathVersion;
+ {
+ TInactiveZone inactive(activeZone);
+ TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", R"(
+ TableDescription {
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value0" Type: "Utf8" }
+ Columns { Name: "value1" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ }
+ IndexDescription {
+ Name: "Sync"
+ KeyColumnNames: ["value0"]
+ }
+ IndexDescription {
+ Name: "Async"
+ KeyColumnNames: ["value1"]
+ Type: EIndexTypeGlobalAsync
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, {t.TxId, t.TxId - 1});
+ pathVersion = TestDescribeResult(DescribePath(runtime, "/MyRoot"),
+ {NLs::PathExist});
+ }
+
+ TestMoveIndex(runtime, ++t.TxId, "/MyRoot/Table", "Sync", "MovedSync");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ TestMoveIndex(runtime, ++t.TxId, "/MyRoot/Table", "Async", "MovedAsync");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+ {
+ TInactiveZone inactive(activeZone);
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"),
+ {NLs::IsTable,
+ NLs::PathVersionEqual(5),
+ NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"})});
+
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/MovedSync", true, true, true),
+ {NLs::PathExist,
+ NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobal),
+ NLs::IndexKeys({"value0"}),
+ NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/MovedAsync", true, true, true),
+ {NLs::PathExist,
+ NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobalAsync),
+ NLs::IndexKeys({"value1"}),
+ NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)});
+ }
+ });
+ }
+
+ Y_UNIT_TEST(ReplaceIndex) {
+ TTestWithReboots t;
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ TPathVersion pathVersion;
+ {
+ TInactiveZone inactive(activeZone);
+ TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", R"(
+ TableDescription {
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value0" Type: "Utf8" }
+ Columns { Name: "value1" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ }
+ IndexDescription {
+ Name: "Sync"
+ KeyColumnNames: ["value0"]
+ }
+ IndexDescription {
+ Name: "Sync1"
+ KeyColumnNames: ["value1"]
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, {t.TxId, t.TxId - 1});
+ pathVersion = TestDescribeResult(DescribePath(runtime, "/MyRoot"),
+ {NLs::PathExist});
+ }
+
+ TestMoveIndex(runtime, ++t.TxId, "/MyRoot/Table", "Sync", "Sync1");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ {
+ TInactiveZone inactive(activeZone);
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"),
+ {NLs::IsTable,
+ NLs::PathVersionEqual(5),
+ NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"})});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/Sync1", true, true, true),
+ {NLs::PathExist,
+ NLs::IndexKeys({"value0"}),
+ NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)});
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/Sync", true, true, true),
+ {NLs::PathNotExist});
+ }
+ });
+ }
+
Y_UNIT_TEST(Replace) {
TTestWithReboots t(true);
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
@@ -240,7 +348,6 @@ Y_UNIT_TEST_SUITE(TSchemeShardMoveRebootsTest) {
)");
t.TestEnv->TestWaitNotification(runtime, {t.TxId, t.TxId - 1});
-
pathVersion = TestDescribeResult(DescribePath(runtime, "/MyRoot"),
{NLs::PathExist});
}
diff --git a/ydb/core/tx/tx_proxy/schemereq.cpp b/ydb/core/tx/tx_proxy/schemereq.cpp
index 0fc9083f6e..ff29572d94 100644
--- a/ydb/core/tx/tx_proxy/schemereq.cpp
+++ b/ydb/core/tx/tx_proxy/schemereq.cpp
@@ -301,6 +301,9 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> {
case NKikimrSchemeOp::ESchemeOpMoveTableIndex:
Y_FAIL("no implementation for ESchemeOpMoveTableIndex");
+ case NKikimrSchemeOp::ESchemeOpMoveIndex:
+ Y_FAIL("no implementation for ESchemeOpMoveIndex");
+
case NKikimrSchemeOp::ESchemeOpCreateSequence:
case NKikimrSchemeOp::ESchemeOpAlterSequence:
return *modifyScheme.MutableSequence()->MutableName();
@@ -670,6 +673,16 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> {
}
break;
}
+ case NKikimrSchemeOp::ESchemeOpMoveIndex: {
+ auto& descr = pbModifyScheme.GetMoveIndex();
+ {
+ auto toResolve = TPathToResolve(pbModifyScheme.GetOperationType());
+ toResolve.Path = SplitPath(descr.GetTablePath());
+ toResolve.RequiredAccess = NACLib::EAccessRights::AlterSchema;
+ ResolveForACL.push_back(toResolve);
+ }
+ break;
+ }
case NKikimrSchemeOp::ESchemeOpMkDir:
{
auto toResolve = TPathToResolve(pbModifyScheme.GetOperationType());