diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2022-06-20 21:34:13 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2022-06-20 21:34:13 +0300 |
commit | 1452b5fc9c83a08cc4a35d674049ef82d9b6dc1a (patch) | |
tree | d77a6ecfca7b63304478d1aac2d926b218adb2a0 | |
parent | 33c65a59cda3a972e9d670f1bc81b864fc263acf (diff) | |
download | ydb-1452b5fc9c83a08cc4a35d674049ef82d9b6dc1a.tar.gz |
Index move support for SchemeShard and DataShard. KIKIMR-13799
ref:b58e9b8ef40fee5c36b03a0540f95333a60d25cc
37 files changed, 1561 insertions, 60 deletions
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 4840e8e264..8b1d4426d3 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -380,7 +380,7 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) { // Server->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG); // Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_YQL, NActors::NLog::PRI_DEBUG); // Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_YQL, NActors::NLog::PRI_INFO); - // Server->GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_DEBUG); + // Server->GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_TRACE); // Server->GetRuntime()->SetLogPriority(NKikimrServices::TX_COORDINATOR, NActors::NLog::PRI_DEBUG); // Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG); // Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_TASKS_RUNNER, NActors::NLog::PRI_DEBUG); diff --git a/ydb/core/kqp/ut/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/kqp_scheme_ut.cpp index 875aaa08ba..a610e73539 100644 --- a/ydb/core/kqp/ut/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/kqp_scheme_ut.cpp @@ -411,6 +411,184 @@ Y_UNIT_TEST_SUITE(KqpScheme) { SchemaVersionMissmatchWithIndexTest<UseNewEngine>(true); } + template <bool UseNewEngine> + void TouchIndexAfterMoveIndex(bool write, bool replace) { + TKikimrRunner kikimr; + + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + TString query1; + TString query2; + if (write) { + query1 = Q_(R"( + UPSERT INTO [/Root/KeyValue] (Key, Value) VALUES (10u, "New"); + )"); + query2 = query1; + } else { + query1 = Q1_(R"( + SELECT * FROM `/Root/KeyValue` VIEW `value_index` WHERE Value = "New"; + )"); + query2 = Q1_(R"( + SELECT * FROM `/Root/KeyValue` VIEW `moved_value_index` WHERE Value = "New"; + )"); + + } + + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.KeepInQueryCache(true); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + { + TString create_index_query = Q1_(R"( + ALTER TABLE `/Root/KeyValue` ADD INDEX value_index GLOBAL SYNC ON (`Value`); + )"); + auto result = session.ExecuteSchemeQuery(create_index_query).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + } + + if (replace) { + TString create_index_query = Q1_(R"( + ALTER TABLE `/Root/KeyValue` ADD INDEX moved_value_index GLOBAL SYNC ON (`Value`); + )"); + auto result = session.ExecuteSchemeQuery(create_index_query).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = session.ExecuteDataQuery(query1, TTxControl::BeginTx(), + execSettings).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + kikimr.GetTestServer().GetRuntime()->GetAppData().AdministrationAllowedSIDs.push_back("root@builtin"); + auto reply = kikimr.GetTestClient().MoveIndex("/Root/KeyValue", "value_index", "moved_value_index", "root@builtin"); + const NKikimrClient::TResponse &response = reply->Record; + UNIT_ASSERT_VALUES_EQUAL((NMsgBusProxy::EResponseStatus)response.GetStatus(), NMsgBusProxy::MSTATUS_OK); + } + + { + auto result = session.ExecuteDataQuery(query2, + TTxControl::BeginTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + if (write) { + auto commit = result.GetTransaction()->Commit().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(commit.GetStatus(), EStatus::SUCCESS, commit.GetIssues().ToString()); + } + } + + { + auto result = session.ExecuteDataQuery(query2, + TTxControl::BeginTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto commit = result.GetTransaction()->Commit().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(commit.GetStatus(), EStatus::SUCCESS, commit.GetIssues().ToString()); + } + } + + Y_UNIT_TEST_NEW_ENGINE(TouchIndexAfterMoveIndexRead) { + TouchIndexAfterMoveIndex<UseNewEngine>(false, false); + } + + Y_UNIT_TEST_NEW_ENGINE(TouchIndexAfterMoveIndexWrite) { + TouchIndexAfterMoveIndex<UseNewEngine>(true, false); + } + + Y_UNIT_TEST_NEW_ENGINE(TouchIndexAfterMoveIndexReadReplace) { + TouchIndexAfterMoveIndex<UseNewEngine>(false, true); + } + + Y_UNIT_TEST_NEW_ENGINE(TouchIndexAfterMoveIndexWriteReplace) { + TouchIndexAfterMoveIndex<UseNewEngine>(true, true); + } + + template <bool UseNewEngine> + void TouchIndexAfterMoveTable(bool write) { + TKikimrRunner kikimr; + + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + TString query1; + TString query2; + if (write) { + query1 = Q_(R"( + UPSERT INTO [/Root/KeyValue] (Key, Value) VALUES (10u, "New"); + )"); + query2 = Q_(R"( + UPSERT INTO [/Root/KeyValueMoved] (Key, Value) VALUES (10u, "New"); + )"); + } else { + query1 = Q1_(R"( + SELECT * FROM `/Root/KeyValue` VIEW `value_index` WHERE Value = "New"; + )"); + query2 = Q1_(R"( + SELECT * FROM `/Root/KeyValueMoved` VIEW `value_index` WHERE Value = "New"; + )"); + + } + + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.KeepInQueryCache(true); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + { + TString create_index_query = Q1_(R"( + ALTER TABLE `/Root/KeyValue` ADD INDEX value_index GLOBAL SYNC ON (`Value`); + )"); + auto result = session.ExecuteSchemeQuery(create_index_query).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = session.ExecuteDataQuery(query1, TTxControl::BeginTx(), + execSettings).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + auto query = TStringBuilder() << R"( + --!syntax_v1 + ALTER TABLE `/Root/KeyValue` RENAME TO `/Root/KeyValueMoved`; + )"; + + const auto result = session.ExecuteSchemeQuery(query << ";").GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = session.ExecuteDataQuery(query2, + TTxControl::BeginTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto commit = result.GetTransaction()->Commit().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(commit.GetStatus(), EStatus::SUCCESS, commit.GetIssues().ToString()); + } + } + + Y_UNIT_TEST_NEW_ENGINE(TouchIndexAfterMoveTableRead) { + TouchIndexAfterMoveTable<UseNewEngine>(false); + } + + Y_UNIT_TEST_NEW_ENGINE(TouchIndexAfterMoveTableWrite) { + TouchIndexAfterMoveTable<UseNewEngine>(true); + } + void CheckInvalidationAfterDropCreateTable(bool withCompatSchema) { TKikimrRunner kikimr; diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 9b0f887494..da67bd4a1c 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -166,6 +166,7 @@ enum ESimpleCounters { COUNTER_IN_FLIGHT_OPS_TxDropBlobDepot = 135 [(CounterOpts) = {Name: "InFlightOps/DropBlobDepot"}]; COUNTER_STATS_QUEUE_SIZE = 136 [(CounterOpts) = {Name: "StatsQueueSize"}]; + COUNTER_IN_FLIGHT_OPS_TxUpdateMainTableOnIndexMove = 137 [(CounterOpts) = {Name: "InFlightOps/UpdateMainTableOnIndexMove"}]; } enum ECumulativeCounters { @@ -266,6 +267,7 @@ enum ECumulativeCounters { COUNTER_FINISHED_OPS_TxDropBlobDepot = 80 [(CounterOpts) = {Name: "FinishedOps/DropBlobDepot"}]; COUNTER_STATS_WRITTEN = 81 [(CounterOpts) = {Name: "StatsWritten"}]; + COUNTER_FINISHED_OPS_TxUpdateMainTableOnIndexMove = 82 [(CounterOpts) = {Name: "FinishedOps/UpdateMainTableOnIndexMove"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index c4e595246c..8f6a0502c0 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1074,6 +1074,12 @@ message TMove { // private description of the operation optional string DstPath = 2; } +message TMoveIndex { + optional string TablePath = 1; + optional string SrcPath = 2; + optional string DstPath = 3; +} + message TSequenceDescription { optional string Name = 1; // mandatory optional NKikimrProto.TPathID PathId = 2; // sequence path id, assigned by schemeshard @@ -1221,6 +1227,9 @@ enum EOperationType { ESchemeOpCreateBlobDepot = 79; ESchemeOpAlterBlobDepot = 80; ESchemeOpDropBlobDepot = 81; + + // Move index + ESchemeOpMoveIndex = 82; } message TApplyIf { @@ -1324,6 +1333,7 @@ message TModifyScheme { optional TSequenceDescription Sequence = 51; optional TReplicationDescription Replication = 52; optional TBlobDepotDescription BlobDepot = 54; + optional TMoveIndex MoveIndex = 55; } // "Script", used by client to parse text files with multiple DDL commands diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index d2c342e82e..598d68a8fd 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -378,6 +378,13 @@ message TDropCdcStreamNotice { message TAsyncIndexInfo { } +message TRemapIndexPathId { + optional NKikimrProto.TPathID SrcPathId = 1; + optional NKikimrProto.TPathID DstPathId = 2; + optional NKikimrProto.TPathID ReplacedPathId = 3; + optional string DstName = 4; +} + message TMoveTable { optional NKikimrProto.TPathID PathId = 1; optional uint64 TableSchemaVersion = 2; @@ -385,14 +392,16 @@ message TMoveTable { optional NKikimrProto.TPathID DstPathId = 3; optional string DstPath = 4; - message TRemapIndexPathId { - optional NKikimrProto.TPathID PathId = 1; - optional NKikimrProto.TPathID DstPathId = 2; - } - repeated TRemapIndexPathId ReMapIndexes = 5; } +message TMoveIndex { + optional NKikimrProto.TPathID PathId = 1; + optional uint64 TableSchemaVersion = 2; + + optional TRemapIndexPathId ReMapIndex = 3; +} + message TFlatSchemeTransaction { optional NKikimrSchemeOp.TTableDescription CreateTable = 1; optional NKikimrSchemeOp.TTableDescription DropTable = 2; @@ -421,6 +430,7 @@ message TFlatSchemeTransaction { optional TCreateCdcStreamNotice CreateCdcStreamNotice = 18; optional TAlterCdcStreamNotice AlterCdcStreamNotice = 19; optional TDropCdcStreamNotice DropCdcStreamNotice = 20; + optional TMoveIndex MoveIndex = 21; } message TDistributedEraseTransaction { diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 309c4977e2..8164409a78 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -1526,6 +1526,23 @@ namespace Tests { return dynamic_cast<NMsgBusProxy::TBusResponse *>(reply.Release()); } + TAutoPtr<NMsgBusProxy::TBusResponse> TClient::MoveIndex(const TString& table, const TString& src, const TString& dst, const TString& userToken) { + TAutoPtr<NMsgBusProxy::TBusSchemeOperation> request(new NMsgBusProxy::TBusSchemeOperation()); + auto *op = request->Record.MutableTransaction()->MutableModifyScheme(); + op->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex); + auto descr = op->MutableMoveIndex(); + descr->SetTablePath(table); + descr->SetSrcPath(src); + descr->SetDstPath(dst); + TAutoPtr<NBus::TBusMessage> reply; + if (userToken) { + request->Record.SetSecurityToken(userToken); + } + NBus::EMessageStatus status = SendAndWaitCompletion(request.Release(), reply); + UNIT_ASSERT_VALUES_EQUAL(status, NBus::MESSAGE_OK); + return dynamic_cast<NMsgBusProxy::TBusResponse *>(reply.Release()); + } + TAutoPtr<NMsgBusProxy::TBusResponse> TClient::AlterTable(const TString& parent, const TString& alter, const TString& userToken) { NKikimrSchemeOp::TTableDescription table; bool parseOk = ::google::protobuf::TextFormat::ParseFromString(alter, &table); diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index 0ecd068a23..e2143be72a 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -390,6 +390,8 @@ namespace Tests { TAutoPtr<NMsgBusProxy::TBusResponse> AlterTable(const TString& parent, const NKikimrSchemeOp::TTableDescription& update, const TString& userToken); TAutoPtr<NMsgBusProxy::TBusResponse> AlterTable(const TString& parent, const TString& alter, const TString& userToken); + TAutoPtr<NMsgBusProxy::TBusResponse> MoveIndex(const TString& table, const TString& src, const TString& dst, const TString& userToken); + NMsgBusProxy::EResponseStatus CreateOlapStore(const TString& parent, const TString& scheme); NMsgBusProxy::EResponseStatus CreateOlapStore(const TString& parent, const NKikimrSchemeOp::TColumnStoreDescription& store); NMsgBusProxy::EResponseStatus CreateOlapTable(const TString& parent, const TString& scheme); diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt index cb451d7fed..8d3ff703fb 100644 --- a/ydb/core/tx/datashard/CMakeLists.txt +++ b/ydb/core/tx/datashard/CMakeLists.txt @@ -175,6 +175,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/load_tx_details_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/make_scan_snapshot_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/make_snapshot_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/move_index_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/move_table_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/operation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/plan_queue_unit.cpp diff --git a/ydb/core/tx/datashard/check_scheme_tx_unit.cpp b/ydb/core/tx/datashard/check_scheme_tx_unit.cpp index efcd47553d..c42c0f5c9b 100644 --- a/ydb/core/tx/datashard/check_scheme_tx_unit.cpp +++ b/ydb/core/tx/datashard/check_scheme_tx_unit.cpp @@ -34,6 +34,7 @@ private: bool CheckFinalizeBuildIndex(TActiveTransaction *activeTx); bool CheckDropIndexNotice(TActiveTransaction *activeTx); bool CheckMoveTable(TActiveTransaction *activeTx); + bool CheckMoveIndex(TActiveTransaction *activeTx); bool CheckCreateCdcStream(TActiveTransaction *activeTx); bool CheckAlterCdcStream(TActiveTransaction *activeTx); bool CheckDropCdcStream(TActiveTransaction *activeTx); @@ -354,6 +355,9 @@ bool TCheckSchemeTxUnit::CheckSchemeTx(TActiveTransaction *activeTx) case TSchemaOperation::ETypeMoveTable: res = CheckMoveTable(activeTx); break; + case TSchemaOperation::ETypeMoveIndex: + res = CheckMoveIndex(activeTx); + break; case TSchemaOperation::ETypeCreateCdcStream: res = CheckCreateCdcStream(activeTx); break; @@ -672,6 +676,20 @@ bool TCheckSchemeTxUnit::CheckMoveTable(TActiveTransaction *activeTx) { return CheckSchemaVersion(activeTx, mv); } +bool TCheckSchemeTxUnit::CheckMoveIndex(TActiveTransaction *activeTx) { + if (HasDuplicate(activeTx, "MoveIndex", &TPipeline::HasMoveIndex)) { + return false; + } + + const auto &mv = activeTx->GetSchemeTx().GetMoveIndex(); + if (!HasPathId(activeTx, mv, "MoveIndex")) { + return false; + } + + auto ret = CheckSchemaVersion(activeTx, mv); + return ret; +} + bool TCheckSchemeTxUnit::CheckCreateCdcStream(TActiveTransaction *activeTx) { if (HasDuplicate(activeTx, "CreateCdcStream", &TPipeline::HasCreateCdcStream)) { return false; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index a4f722adc3..bdbae4d8ee 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -988,7 +988,7 @@ TUserTable::TPtr TDataShard::CreateUserTable(TTransactionContext& txc, THashMap<TPathId, TPathId> TDataShard::GetRemapIndexes(const NKikimrTxDataShard::TMoveTable& move) { THashMap<TPathId, TPathId> remap; for (const auto& item: move.GetReMapIndexes()) { - const auto prevId = PathIdFromPathId(item.GetPathId()); + const auto prevId = PathIdFromPathId(item.GetSrcPathId()); const auto newId = PathIdFromPathId(item.GetDstPathId()); remap[prevId] = newId; } @@ -1044,6 +1044,81 @@ TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxD return newTableInfo; } +TUserTable::TPtr TDataShard::MoveUserIndex(TOperation::TPtr op, const NKikimrTxDataShard::TMoveIndex& move, + const TActorContext& ctx, TTransactionContext& txc) +{ + const auto pathId = PathIdFromPathId(move.GetPathId()); + + Y_VERIFY(GetPathOwnerId() == pathId.OwnerId); + Y_VERIFY(TableInfos.contains(pathId.LocalPathId)); + + const auto version = move.GetTableSchemaVersion(); + Y_VERIFY(version); + + auto newTableInfo = AlterTableSchemaVersion(ctx, txc, pathId, version, false); + + NKikimrSchemeOp::TTableDescription schema; + newTableInfo->GetSchema(schema); + + if (move.GetReMapIndex().HasReplacedPathId()) { + const auto oldPathId = PathIdFromPathId(move.GetReMapIndex().GetReplacedPathId()); + newTableInfo->Indexes.erase(oldPathId); + + size_t id = 0; + bool found = false; + for (auto& indexDesc: *schema.MutableTableIndexes()) { + Y_VERIFY(indexDesc.HasPathOwnerId() && indexDesc.HasLocalPathId()); + auto pathId = TPathId(indexDesc.GetPathOwnerId(), indexDesc.GetLocalPathId()); + if (oldPathId == pathId) { + found = true; + break; + } else { + id++; + } + } + + if (found) { + schema.MutableTableIndexes()->DeleteSubrange(id, 1); + } + } + + const auto remapPrevId = PathIdFromPathId(move.GetReMapIndex().GetSrcPathId()); + const auto remapNewId = PathIdFromPathId(move.GetReMapIndex().GetDstPathId()); + Y_VERIFY(move.GetReMapIndex().HasDstName()); + const auto dstIndexName = move.GetReMapIndex().GetDstName(); + + for (auto& indexDesc: *schema.MutableTableIndexes()) { + Y_VERIFY(indexDesc.HasPathOwnerId() && indexDesc.HasLocalPathId()); + auto prevPathId = TPathId(indexDesc.GetPathOwnerId(), indexDesc.GetLocalPathId()); + if (remapPrevId != prevPathId) { + continue; + } + + indexDesc.SetPathOwnerId(remapNewId.OwnerId); + indexDesc.SetLocalPathId(remapNewId.LocalPathId); + + newTableInfo->Indexes[remapNewId] = newTableInfo->Indexes[prevPathId]; + newTableInfo->Indexes.erase(prevPathId); + + Y_VERIFY(move.GetReMapIndex().HasDstName()); + indexDesc.SetName(dstIndexName); + newTableInfo->Indexes[remapNewId].Name = dstIndexName; + } + + newTableInfo->SetSchema(schema); + + AddUserTable(pathId, newTableInfo); + + if (newTableInfo->NeedSchemaSnapshots()) { + AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); + } + + NIceDb::TNiceDb db(txc.DB); + PersistUserTable(db, pathId.LocalPathId, *newTableInfo); + + return newTableInfo; +} + TUserTable::TPtr TDataShard::AlterUserTable(const TActorContext& ctx, TTransactionContext& txc, const NKikimrSchemeOp::TTableDescription& alter) { diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index 72b36561e9..a325090a9a 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -428,7 +428,8 @@ bool TActiveTransaction::BuildSchemeTx() + (ui32)SchemeTx->HasMoveTable() + (ui32)SchemeTx->HasCreateCdcStreamNotice() + (ui32)SchemeTx->HasAlterCdcStreamNotice() - + (ui32)SchemeTx->HasDropCdcStreamNotice(); + + (ui32)SchemeTx->HasDropCdcStreamNotice() + + (ui32)SchemeTx->HasMoveIndex(); if (count != 1) return false; @@ -462,6 +463,8 @@ bool TActiveTransaction::BuildSchemeTx() SchemeTxType = TSchemaOperation::ETypeAlterCdcStream; else if (SchemeTx->HasDropCdcStreamNotice()) SchemeTxType = TSchemaOperation::ETypeDropCdcStream; + else if (SchemeTx->HasMoveIndex()) + SchemeTxType = TSchemaOperation::ETypeMoveIndex; else SchemeTxType = TSchemaOperation::ETypeUnknown; @@ -859,6 +862,7 @@ void TActiveTransaction::BuildExecutionPlan(bool loaded) plan.push_back(EExecutionUnitKind::FinalizeBuildIndex); plan.push_back(EExecutionUnitKind::DropIndexNotice); plan.push_back(EExecutionUnitKind::MoveTable); + plan.push_back(EExecutionUnitKind::MoveIndex); plan.push_back(EExecutionUnitKind::CreateCdcStream); plan.push_back(EExecutionUnitKind::AlterCdcStream); plan.push_back(EExecutionUnitKind::DropCdcStream); diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index b2289b055d..2330acacb2 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -52,6 +52,7 @@ struct TSchemaOperation { ETypeCreateCdcStream = 13, ETypeAlterCdcStream = 14, ETypeDropCdcStream = 15, + ETypeMoveIndex = 16, ETypeUnknown = Max<ui32>() }; @@ -103,6 +104,7 @@ struct TSchemaOperation { bool IsFinalizeBuildIndex() const { return Type == ETypeFinalizeBuildIndex; } bool IsDropIndexNotice() const { return Type == ETypeDropIndexNotice; } bool IsMove() const { return Type == ETypeMoveTable; } + bool IsMoveIndex() const { return Type == ETypeMoveIndex; } bool IsCreateCdcStream() const { return Type == ETypeCreateCdcStream; } bool IsAlterCdcStream() const { return Type == ETypeAlterCdcStream; } bool IsDropCdcStream() const { return Type == ETypeDropCdcStream; } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 2377567430..af75381e1c 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1440,6 +1440,8 @@ public: static THashMap<TPathId, TPathId> GetRemapIndexes(const NKikimrTxDataShard::TMoveTable& move); TUserTable::TPtr MoveUserTable(TOperation::TPtr op, const NKikimrTxDataShard::TMoveTable& move, const TActorContext& ctx, TTransactionContext& txc); + TUserTable::TPtr MoveUserIndex(TOperation::TPtr op, const NKikimrTxDataShard::TMoveIndex& move, + const TActorContext& ctx, TTransactionContext& txc); void DropUserTable(TTransactionContext& txc, ui64 tableId); ui32 GetLastLocalTid() const { return LastLocalTid; } diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index 7410d4b0fe..2e3a51c07d 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -163,6 +163,7 @@ public: bool HasFinalizeBuilIndex() const { return SchemaTx && SchemaTx->IsFinalizeBuildIndex(); } bool HasDropIndexNotice() const { return SchemaTx && SchemaTx->IsDropIndexNotice(); } bool HasMove() const { return SchemaTx && SchemaTx->IsMove(); } + bool HasMoveIndex() const { return SchemaTx && SchemaTx->IsMoveIndex(); } bool HasCreateCdcStream() const { return SchemaTx && SchemaTx->IsCreateCdcStream(); } bool HasAlterCdcStream() const { return SchemaTx && SchemaTx->IsAlterCdcStream(); } bool HasDropCdcStream() const { return SchemaTx && SchemaTx->IsDropCdcStream(); } diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp index 9f6e13fa5c..94f8ffc241 100644 --- a/ydb/core/tx/datashard/execution_unit.cpp +++ b/ydb/core/tx/datashard/execution_unit.cpp @@ -124,6 +124,8 @@ THolder<TExecutionUnit> CreateExecutionUnit(EExecutionUnitKind kind, return CreateAlterCdcStreamUnit(dataShard, pipeline); case EExecutionUnitKind::DropCdcStream: return CreateDropCdcStreamUnit(dataShard, pipeline); + case EExecutionUnitKind::MoveIndex: + return CreateMoveIndexUnit(dataShard, pipeline); default: Y_FAIL_S("Unexpected execution kind " << kind << " (" << (ui32)kind << ")"); } diff --git a/ydb/core/tx/datashard/execution_unit_ctors.h b/ydb/core/tx/datashard/execution_unit_ctors.h index f043a02fad..ec62449d1a 100644 --- a/ydb/core/tx/datashard/execution_unit_ctors.h +++ b/ydb/core/tx/datashard/execution_unit_ctors.h @@ -58,6 +58,7 @@ THolder<TExecutionUnit> CreateDropVolatileSnapshotUnit(TDataShard &dataShard, TP THolder<TExecutionUnit> CreateInitiateBuildIndexUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateFinalizeBuildIndexUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateDropIndexNoticeUnit(TDataShard &dataShard, TPipeline &pipeline); +THolder<TExecutionUnit> CreateMoveIndexUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateMoveTableUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateCreateCdcStreamUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateAlterCdcStreamUnit(TDataShard &dataShard, TPipeline &pipeline); diff --git a/ydb/core/tx/datashard/execution_unit_kind.h b/ydb/core/tx/datashard/execution_unit_kind.h index 88e763887f..4aefa0ca6e 100644 --- a/ydb/core/tx/datashard/execution_unit_kind.h +++ b/ydb/core/tx/datashard/execution_unit_kind.h @@ -62,6 +62,7 @@ enum class EExecutionUnitKind : ui32 { CreateCdcStream, AlterCdcStream, DropCdcStream, + MoveIndex, Count, Unspecified }; diff --git a/ydb/core/tx/datashard/move_index_unit.cpp b/ydb/core/tx/datashard/move_index_unit.cpp new file mode 100644 index 0000000000..0799b9daeb --- /dev/null +++ b/ydb/core/tx/datashard/move_index_unit.cpp @@ -0,0 +1,87 @@ +#include "datashard_impl.h" +#include "datashard_pipeline.h" +#include "execution_unit_ctors.h" + +namespace NKikimr { +namespace NDataShard { + +class TMoveIndexUnit : public TExecutionUnit { +public: + TMoveIndexUnit(TDataShard& dataShard, TPipeline& pipeline) + : TExecutionUnit(EExecutionUnitKind::MoveIndex, false, dataShard, pipeline) + { } + + bool IsReadyToExecute(TOperation::TPtr) const override { + return true; + } + + void MoveChangeRecords(NIceDb::TNiceDb& db, const NKikimrTxDataShard::TMoveIndex& move, TVector<NMiniKQL::IChangeCollector::TChange>& changeRecords) { + const auto remapPrevId = PathIdFromPathId(move.GetReMapIndex().GetSrcPathId()); + const auto remapNewId = PathIdFromPathId(move.GetReMapIndex().GetDstPathId()); + + for (auto& record: changeRecords) { + if (record.PathId() == remapPrevId) { + record.SetPathId(remapNewId); + DataShard.MoveChangeRecord(db, record.Order(), record.PathId()); + } + } + } + + EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override { + Y_VERIFY(op->IsSchemeTx()); + + TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); + Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + + if (tx->GetSchemeTxType() != TSchemaOperation::ETypeMoveIndex) { + return EExecutionStatus::Executed; + } + + const auto& schemeTx = tx->GetSchemeTx(); + + if (!schemeTx.HasMoveIndex()) { + return EExecutionStatus::Executed; + } + + NIceDb::TNiceDb db(txc.DB); + TVector<NMiniKQL::IChangeCollector::TChange> changeRecords; + if (!DataShard.LoadChangeRecords(db, changeRecords)) { + return EExecutionStatus::Restart; + } + + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "TMoveIndexUnit Execute" + << ": schemeTx# " << schemeTx.DebugString() + << ": changeRecords size# " << changeRecords.size() + << ", at tablet# " << DataShard.TabletID()); + + const auto& params = schemeTx.GetMoveIndex(); + + DataShard.KillChangeSender(ctx); + + DataShard.MoveUserIndex(op, params, ctx, txc); + + DataShard.CreateChangeSender(ctx); + MoveChangeRecords(db, params, changeRecords); + DataShard.EnqueueChangeRecords(std::move(changeRecords)); + DataShard.MaybeActivateChangeSender(ctx); + + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); + op->Result()->SetStepOrderId(op->GetStepOrder().ToPair()); + + return EExecutionStatus::ExecutedNoMoreRestarts; + } + + void Complete(TOperation::TPtr, const TActorContext&) override { + // nothing + } +}; + +THolder<TExecutionUnit> CreateMoveIndexUnit( + TDataShard& dataShard, + TPipeline& pipeline) +{ + return THolder(new TMoveIndexUnit(dataShard, pipeline)); +} + +} // namespace NDataShard +} // namespace NKikimr diff --git a/ydb/core/tx/schemeshard/CMakeLists.txt b/ydb/core/tx/schemeshard/CMakeLists.txt index 35f51b27ca..480e801ccb 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.txt @@ -125,6 +125,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_mkdir.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_modify_acl.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index 2180805f4e..49455b4119 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -845,6 +845,8 @@ ISubOperationBase::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxSta return CreateFinalizeBuildIndexMainTable(NextPartId(), txState); case TTxState::ETxType::TxDropTableIndexAtMainTable: return CreateDropTableIndexAtMainTable(NextPartId(), txState); + case TTxState::ETxType::TxUpdateMainTableOnIndexMove: + return CreateUpdateMainTableOnIndexMove(NextPartId(), txState); case TTxState::ETxType::TxCreateLockForIndexBuild: return CreateLockForIndexBuild(NextPartId(), txState); case TTxState::ETxType::TxDropLock: @@ -1090,6 +1092,8 @@ ISubOperationBase::TPtr TOperation::ConstructPart(NKikimrSchemeOp::EOperationTyp return CreateMoveTable(NextPartId(), tx); case NKikimrSchemeOp::EOperationType::ESchemeOpMoveTableIndex: return CreateMoveTableIndex(NextPartId(), tx); + case NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex: + Y_FAIL("imposible"); // Replication case NKikimrSchemeOp::EOperationType::ESchemeOpCreateReplication: @@ -1148,6 +1152,8 @@ TVector<ISubOperationBase::TPtr> TOperation::ConstructParts(const TTxTransaction return CreateConsistentMoveTable(NextPartId(), tx, context); case NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable: return CreateConsistentAlterTable(NextPartId(), tx, context); + case NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex: + return CreateConsistentMoveIndex(NextPartId(), tx, context); default: return {ConstructPart(opType, tx)}; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index 1da7c610a4..4ef28ff926 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -268,6 +268,8 @@ void NTableState::UpdatePartitioningForTableModification(TOperationId operationI commonShardOp = TTxState::ConfigureParts; } else if (txState.TxType == TTxState::TxDropTableIndexAtMainTable) { commonShardOp = TTxState::ConfigureParts; + } else if (txState.TxType == TTxState::TxUpdateMainTableOnIndexMove) { + commonShardOp = TTxState::ConfigureParts; } else if (txState.TxType == TTxState::TxCreateCdcStreamAtTable) { commonShardOp = TTxState::ConfigureParts; } else if (txState.TxType == TTxState::TxAlterCdcStreamAtTable) { @@ -570,5 +572,31 @@ void IncParentDirAlterVersionWithRepublish(const TOperationId& opId, const TPath } } +NKikimrSchemeOp::TModifyScheme MoveTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst) { + NKikimrSchemeOp::TModifyScheme scheme; + + scheme.SetWorkingDir(dst.Parent().PathString()); + scheme.SetFailOnExist(true); + scheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveTable); + auto operation = scheme.MutableMoveTable(); + operation->SetSrcPath(src.PathString()); + operation->SetDstPath(dst.PathString()); + + return scheme; +} + +NKikimrSchemeOp::TModifyScheme MoveTableIndexTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst) { + NKikimrSchemeOp::TModifyScheme scheme; + + scheme.SetWorkingDir(dst.Parent().PathString()); + scheme.SetFailOnExist(true); + scheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveTableIndex); + auto operation = scheme.MutableMoveTableIndex(); + operation->SetSrcPath(src.PathString()); + operation->SetDstPath(dst.PathString()); + + return scheme; +} + } } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index efff3d0645..bd9a5cb9d3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -13,6 +13,9 @@ TSet<ui32> AllIncomingEvents(); void IncParentDirAlterVersionWithRepublishSafeWithUndo(const TOperationId& opId, const TPath& path, TSchemeShard* ss, TSideEffects& onComplete); void IncParentDirAlterVersionWithRepublish(const TOperationId& opId, const TPath& path, TOperationContext& context); +NKikimrSchemeOp::TModifyScheme MoveTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst); +NKikimrSchemeOp::TModifyScheme MoveTableIndexTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst); + namespace NTableState { bool CollectProposeTransactionResults(const TOperationId& operationId, const TEvDataShard::TEvProposeTransactionResult::TPtr& ev, TOperationContext& context); @@ -459,7 +462,6 @@ public: } - class TCreateParts: public TSubOperationState { private: TOperationId OperationId; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp new file mode 100644 index 0000000000..67f67a39bc --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp @@ -0,0 +1,650 @@ +#include "schemeshard__operation_part.h" +#include "schemeshard__operation_common.h" +#include "schemeshard_path_element.h" + +#include "schemeshard_impl.h" + +#include <ydb/core/base/path.h> +#include <ydb/core/protos/flat_tx_scheme.pb.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> + +namespace { + +using namespace NKikimr; +using namespace NSchemeShard; + +class TConfigureParts: public TSubOperationState { +private: + TOperationId OperationId; + + TString DebugHint() const override { + return TStringBuilder() + << "TUpdateMainTableOnIndexMove TConfigureParts" + << " operationId#" << OperationId; + } + +public: + TConfigureParts(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), {}); + } + + bool HandleReply(TEvDataShard::TEvProposeTransactionResult::TPtr& ev, TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvProposeTransactionResult" + << " at tabletId# " << ssId); + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvProposeTransactionResult" + << " message# " << ev->Get()->Record.ShortDebugString()); + + if (!NTableState::CollectProposeTransactionResults(OperationId, ev, context)) { + return false; + } + + return true; + } + + + bool ProgressState(TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " ProgressState" + << ", at schemeshard: " << ssId); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_VERIFY(txState); + Y_VERIFY(txState->TxType == TTxState::TxUpdateMainTableOnIndexMove); + + //fill txShards + if (NTableState::CheckPartitioningChangedForTableModification(*txState, context)) { + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " UpdatePartitioningForTableModification"); + NTableState::UpdatePartitioningForTableModification(OperationId, *txState, context); + } + + txState->ClearShardsInProgress(); + + TString txBody; + { + TPathId pathId = txState->TargetPathId; + Y_VERIFY(context.SS->PathsById.contains(pathId)); + TPathElement::TPtr path = context.SS->PathsById.at(pathId); + Y_VERIFY(path); + + Y_VERIFY(context.SS->Tables.contains(pathId)); + TTableInfo::TPtr table = context.SS->Tables.at(pathId); + Y_VERIFY(table); + + auto seqNo = context.SS->StartRound(*txState); + + NKikimrTxDataShard::TFlatSchemeTransaction tx; + context.SS->FillSeqNo(tx, seqNo); + + auto notice = tx.MutableMoveIndex(); + PathIdFromPathId(pathId, notice->MutablePathId()); + notice->SetTableSchemaVersion(table->AlterVersion + 1); + + auto remap = notice->MutableReMapIndex(); + + auto opId = OperationId; + while (true) { + opId.second += 1; + TTxState* txState = context.SS->TxInFlight.FindPtr(opId); + if (!txState) { + TStringStream msg; + msg << "txState for opId: " << opId + << " has not been found, cur opId: " << OperationId; + Y_FAIL("%s", msg.Str().data()); + } + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " Trying to find txState with TxMoveTableIndex type" + << " cur opId: " << opId + << ", type: " << (int)txState->TxType); + + if (txState->TxType == TTxState::TxMoveTableIndex) { + TPath srcPath = TPath::Init(txState->SourcePathId, context.SS); + auto parent = srcPath.Parent(); + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " Checking pathId for " + << "opId: " << opId + << ", type: " << (int)txState->TxType + << ", parent pathId: " << pathId); + if (pathId == parent.Base()->PathId) { + PathIdFromPathId(txState->SourcePathId, remap->MutableSrcPathId()); + PathIdFromPathId(txState->TargetPathId, remap->MutableDstPathId()); + auto targetIndexName = context.SS->PathsById.at(txState->TargetPathId); + + for (const auto& [_, childPathId] : path->GetChildren()) { + Y_VERIFY(context.SS->PathsById.contains(childPathId)); + auto childPath = context.SS->PathsById.at(childPathId); + + if (childPath->Name == targetIndexName->Name) { + PathIdFromPathId(childPathId, remap->MutableReplacedPathId()); + remap->SetDstName(childPath->Name); + } + } + break; + } + } + } + Y_VERIFY(remap->HasSrcPathId()); + Y_VERIFY(remap->HasDstPathId()); + Y_VERIFY(remap->HasDstName()); + + Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&txBody); + } + + Y_VERIFY(txState->Shards.size()); + for (ui32 i = 0; i < txState->Shards.size(); ++i) { + auto idx = txState->Shards[i].Idx; + auto datashardId = context.SS->ShardInfos[idx].TabletID; + + THolder<TEvDataShard::TEvProposeTransaction> event = + THolder(new TEvDataShard::TEvProposeTransaction(NKikimrTxDataShard::TX_KIND_SCHEME, + context.SS->TabletID(), + context.Ctx.SelfID, + ui64(OperationId.GetTxId()), + txBody, + context.SS->SelectProcessingPrarams(txState->TargetPathId))); + + context.OnComplete.BindMsgToPipe(OperationId, datashardId, idx, event.Release()); + } + + txState->UpdateShardsInProgress(TTxState::ConfigureParts); + return false; + } +}; + +class TPropose: public TSubOperationState { +private: + TOperationId OperationId; + + TString DebugHint() const override { + return TStringBuilder() + << "TMoveIndex TPropose" + << ", operationId: " << OperationId; + } +public: + TPropose(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType, TEvDataShard::TEvProposeTransactionResult::EventType}); + } + + bool HandleReply(TEvDataShard::TEvSchemaChanged::TPtr& ev, TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + const auto& evRecord = ev->Get()->Record; + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvSchemaChanged" + << " at tablet: " << ssId); + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvSchemaChanged" + << " triggered early" + << ", message: " << evRecord.ShortDebugString()); + + NTableState::CollectSchemaChanged(OperationId, ev, context); + return false; + } + + bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { + TStepId step = TStepId(ev->Get()->StepId); + TTabletId ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvOperationPlan" + << ", step: " << step + << ", at schemeshard: " << ssId); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_VERIFY(txState); + Y_VERIFY(txState->TxType == TTxState::TxUpdateMainTableOnIndexMove); + + TPath path = TPath::Init(txState->TargetPathId, context.SS); + Y_VERIFY(path.IsResolved()); + + NIceDb::TNiceDb db(context.GetDB()); + + txState->PlanStep = step; + context.SS->PersistTxPlanStep(db, OperationId, step); + + context.SS->ChangeTxState(db, OperationId, TTxState::DeletePathBarrier); + + return true; + } + + bool ProgressState(TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " ProgressState" + << ", at schemeshard: " << ssId); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_VERIFY(txState); + Y_VERIFY(txState->TxType == TTxState::TxUpdateMainTableOnIndexMove); + Y_VERIFY(txState->MinStep); + + TSet<TTabletId> shardSet; + for (const auto& shard : txState->Shards) { + TShardIdx idx = shard.Idx; + TTabletId tablet = context.SS->ShardInfos.at(idx).TabletID; + shardSet.insert(tablet); + } + + context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, txState->MinStep, std::move(shardSet)); + return false; + } +}; + +class TDeleteTableBarrier: public TSubOperationState { +private: + TOperationId OperationId; + + TString DebugHint() const override { + return TStringBuilder() + << "TMoveIndex TDeleteTableBarrier" + << " operationId: " << OperationId; + } + +public: + TDeleteTableBarrier(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType, TEvDataShard::TEvProposeTransactionResult::EventType, TEvPrivate::TEvOperationPlan::EventType}); + } + + bool HandleReply(TEvDataShard::TEvSchemaChanged::TPtr& ev, TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvDataShard::TEvSchemaChanged" + << ", save it" + << ", at schemeshard: " << ssId); + + NTableState::CollectSchemaChanged(OperationId, ev, context); + return false; + } + + bool HandleReply(TEvPrivate::TEvCompleteBarrier::TPtr& ev, TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvPrivate:TEvCompleteBarrier" + << ", msg: " << ev->Get()->ToString() + << ", at tablet" << ssId); + + + TTxState* txState = context.SS->FindTx(OperationId); + Y_VERIFY(txState); + + + TPath path = TPath::Init(txState->TargetPathId, context.SS); + TTableInfo::TPtr table = context.SS->Tables.at(txState->TargetPathId); + + Y_VERIFY(txState->PlanStep); + + NIceDb::TNiceDb db(context.GetDB()); + table->AlterVersion += 1; + + context.SS->PersistTableAlterVersion(db, path->PathId, table); + + context.OnComplete.PublishToSchemeBoard(OperationId, path->PathId); + context.SS->ChangeTxState(db, OperationId, TTxState::ProposedWaitParts); + return true; + } + + bool ProgressState(TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + context.OnComplete.RouteByTabletsFromOperation(OperationId); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_VERIFY(txState); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " ProgressState" + << ", operation type: " << TTxState::TypeName(txState->TxType) + << ", at tablet" << ssId); + + context.OnComplete.Barrier(OperationId, "RenamePathBarrier"); + return false; + } +}; + +class TUpdateMainTableOnIndexMove: public TSubOperation { + const TOperationId OperationId; + const TTxTransaction Transaction; + TTxState::ETxState State = TTxState::Invalid; + + TTxState::ETxState NextState() { + return TTxState::ConfigureParts; + } + + TTxState::ETxState NextState(TTxState::ETxState state) { + switch(state) { + case TTxState::Waiting: + case TTxState::ConfigureParts: + return TTxState::Propose; + case TTxState::Propose: + return TTxState::DeletePathBarrier; + case TTxState::DeletePathBarrier: + return TTxState::ProposedWaitParts; + + case TTxState::ProposedWaitParts: + return TTxState::Done; + default: + return TTxState::Invalid; + } + return TTxState::Invalid; + } + + TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) { + switch(state) { + case TTxState::Waiting: + case TTxState::ConfigureParts: + return MakeHolder<TConfigureParts>(OperationId); + case TTxState::Propose: + return MakeHolder<TPropose>(OperationId); + case TTxState::DeletePathBarrier: + return MakeHolder<TDeleteTableBarrier>(OperationId); + case TTxState::ProposedWaitParts: + return MakeHolder<NTableState::TProposedWaitParts>(OperationId); + case TTxState::Done: + return MakeHolder<TDone>(OperationId); + default: + return nullptr; + } + } + + void StateDone(TOperationContext& context) override { + State = NextState(State); + + if (State != TTxState::Invalid) { + SetState(SelectStateFunc(State)); + context.OnComplete.ActivateTx(OperationId); + } + } + +public: + TUpdateMainTableOnIndexMove(TOperationId id, const TTxTransaction& tx) + : OperationId(id) + , Transaction(tx) + { + } + + TUpdateMainTableOnIndexMove(TOperationId id, TTxState::ETxState state) + : OperationId(id) + , State(state) + { + SetState(SelectStateFunc(state)); + } + + THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override { + const TTabletId ssId = context.SS->SelfTabletId(); + + auto opDescr = Transaction.GetAlterTable(); + + const TString workingDir = Transaction.GetWorkingDir(); + const TString mainTableName = opDescr.GetName(); + + LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TUpdateMainTableOnIndexMove Propose" + << ", path: " << workingDir << "/" << mainTableName + << ", opId: " << OperationId + << ", at schemeshard: " << ssId); + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TUpdateMainTableOnIndexMove Propose" + << ", message: " << Transaction.ShortDebugString() + << ", at schemeshard: " << ssId); + + auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), ui64(ssId)); + + TPath tablePath = TPath::Resolve(workingDir, context.SS).Dive(mainTableName); + { + TPath::TChecker checks = tablePath.Check(); + checks + .NotEmpty() + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsTable() + .NotUnderOperation() + .IsCommonSensePath(); + + if (!checks) { + TString explain = TStringBuilder() << "path fail checks" + << ", path: " << tablePath.PathString(); + auto status = checks.GetStatus(&explain); + result->SetError(status, explain); + return result; + } + } + + TString errStr; + + if (!context.SS->CheckApplyIf(Transaction, errStr)) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); + return result; + } + + if (!context.SS->CheckLocks(tablePath.Base()->PathId, Transaction, errStr)) { + result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); + return result; + } + + Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId)); + TTableInfo::TPtr table = context.SS->Tables.at(tablePath.Base()->PathId); + + Y_VERIFY(table->AlterVersion != 0); + Y_VERIFY(!table->AlterData); + + Y_VERIFY(!context.SS->FindTx(OperationId)); + + auto guard = context.DbGuard(); + context.MemChanges.GrabPath(context.SS, tablePath.Base()->PathId); + context.MemChanges.GrabNewTxState(context.SS, OperationId); + + context.DbChanges.PersistPath(tablePath.Base()->PathId); + context.DbChanges.PersistTxState(OperationId); + + TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxUpdateMainTableOnIndexMove, tablePath.Base()->PathId); + txState.State = TTxState::ConfigureParts; + + tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter; + tablePath.Base()->LastTxId = OperationId.GetTxId(); + + for (auto splitTx: table->GetSplitOpsInFlight()) { + context.OnComplete.Dependence(splitTx.GetTxId(), OperationId.GetTxId()); + } + + context.OnComplete.ActivateTx(OperationId); + + State = NextState(); + SetState(SelectStateFunc(State)); + return result; + } + + void AbortPropose(TOperationContext& context) override { + LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TUpdateMainTableOnIndexMove AbortPropose" + << ", opId: " << OperationId + << ", at schemeshard: " << context.SS->TabletID()); + } + + void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override { + LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TUpdateMainTableOnIndexMove AbortUnsafe" + << ", opId: " << OperationId + << ", forceDropId: " << forceDropTxId + << ", at schemeshard: " << context.SS->TabletID()); + + context.OnComplete.DoneOperation(OperationId); + } + +}; + +} + +namespace NKikimr { +namespace NSchemeShard { + +TVector<ISubOperationBase::TPtr> CreateConsistentMoveIndex(TOperationId nextId, const TTxTransaction& tx, TOperationContext& context) { + Y_VERIFY(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex); + + TVector<ISubOperationBase::TPtr> result; + + { + TString errStr; + if (!context.SS->CheckApplyIf(tx, errStr)) { + return {CreateReject(nextId, NKikimrScheme::EStatus::StatusPreconditionFailed, errStr)}; + } + } + + auto moving = tx.GetMoveIndex(); + + const auto& mainTable = moving.GetTablePath(); + const auto& srcIndex = moving.GetSrcPath(); + const auto& dstIndex = moving.GetDstPath(); + + TPath mainTablePath = TPath::Resolve(mainTable, context.SS); + { + TPath::TChecker checks = mainTablePath.Check(); + checks + .NotEmpty() + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .IsTable() + .NotUnderDeleting() + .NotUnderOperation() + .IsCommonSensePath(); + + if (!checks) { + TString explain = TStringBuilder() << "path fail checks" + << ", path: " << mainTablePath.PathString(); + auto status = checks.GetStatus(&explain); + return {CreateReject(nextId, status, explain)}; + } + } + + TPath workingDirPath = mainTablePath.Parent(); + + { + TStringBuilder explain = TStringBuilder() << "fail checks"; + + if (!context.SS->CheckLocks(mainTablePath.Base()->PathId, tx, explain)) { + return {CreateReject(nextId, NKikimrScheme::StatusMultipleModifications, explain)}; + } + } + + TPath srcIndexPath = mainTablePath.Child(srcIndex); + { + TPath::TChecker checks = srcIndexPath.Check(); + checks + .NotEmpty() + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .IsTableIndex() + .NotUnderDeleting() + .NotUnderOperation(); + + if (!checks) { + TString explain = TStringBuilder() << "path fail checks" + << ", path: " << srcIndexPath.PathString(); + auto status = checks.GetStatus(&explain); + return {CreateReject(nextId, status, explain)}; + } + } + + TString errStr; + if (!context.SS->CheckApplyIf(tx, errStr)) { + return {CreateReject(nextId, NKikimrScheme::StatusPreconditionFailed, errStr)}; + } + + if (!context.SS->CheckLocks(mainTablePath.Base()->PathId, tx, errStr)) { + return {CreateReject(nextId, NKikimrScheme::StatusMultipleModifications, errStr)}; + } + + TPath dstIndexPath = mainTablePath.Child(dstIndex); + + { + auto mainTableAlter = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable); + auto operation = mainTableAlter.MutableAlterTable(); + operation->SetName(mainTablePath.LeafName()); + result.push_back(new TUpdateMainTableOnIndexMove(NextPartId(nextId, result), mainTableAlter)); + } + + { + TPath::TChecker checks = dstIndexPath.Check(); + checks + .NotEmpty() + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderOperation() + .IsTableIndex(); + + if (checks) { + { + auto indexDropping = TransactionTemplate(mainTablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTableIndex); + auto operation = indexDropping.MutableDrop(); + operation->SetName(dstIndex); + + result.push_back(CreateDropTableIndex(NextPartId(nextId, result), indexDropping)); + } + + for (const auto& items: dstIndexPath.Base()->GetChildren()) { + Y_VERIFY(context.SS->PathsById.contains(items.second)); + auto implPath = context.SS->PathsById.at(items.second); + if (implPath->Dropped()) { + continue; + } + + auto implTable = context.SS->PathsById.at(items.second); + Y_VERIFY(implTable->IsTable()); + + auto implTableDropping = TransactionTemplate(dstIndexPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable); + auto operation = implTableDropping.MutableDrop(); + operation->SetName(items.first); + + result.push_back(CreateDropTable(NextPartId(nextId, result), implTableDropping)); + } + } + } + + result.push_back(CreateMoveTableIndex(TOperationId(nextId.GetTxId(), + nextId.GetSubTxId() + result.size()), + MoveTableIndexTask(srcIndexPath, dstIndexPath))); + + TString srcImplTableName = srcIndexPath.Base()->GetChildren().begin()->first; + TPath srcImplTable = srcIndexPath.Child(srcImplTableName); + + Y_VERIFY(srcImplTable.Base()->PathId == srcIndexPath.Base()->GetChildren().begin()->second); + + TPath dstImplTable = dstIndexPath.Child(srcImplTableName); + + result.push_back(CreateMoveTable(TOperationId(nextId.GetTxId(), + nextId.GetSubTxId() + result.size()), + MoveTableTask(srcImplTable, dstImplTable))); + return result; +} + +ISubOperationBase::TPtr CreateUpdateMainTableOnIndexMove(TOperationId id, const TTxTransaction& tx) { + return new TUpdateMainTableOnIndexMove(id, tx); +} + +ISubOperationBase::TPtr CreateUpdateMainTableOnIndexMove(TOperationId id, TTxState::ETxState state) { + return new TUpdateMainTableOnIndexMove(id, state); +} + +} +} diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp index 3e9b02e0ad..dc46563bab 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp @@ -109,7 +109,7 @@ public: Y_VERIFY(dstIndexPath.IsResolved()); auto remap = move->AddReMapIndexes(); - PathIdFromPathId(srcIndexPath->PathId, remap->MutablePathId()); + PathIdFromPathId(srcIndexPath->PathId, remap->MutableSrcPathId()); PathIdFromPathId(dstIndexPath->PathId, remap->MutableDstPathId()); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp index 6ae858ea09..360126a2b5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp @@ -438,23 +438,13 @@ public: .IsAtLocalSchemeShard() .IsResolved(); - if (dstParentPath.IsUnderDeleting()) { - checks - .IsUnderDeleting() - .IsUnderTheSameOperation(OperationId.GetTxId()); - } else if (dstParentPath.IsUnderMoving()) { - // it means that dstPath is free enough to be the move destination - checks - .IsUnderMoving() - .IsUnderTheSameOperation(OperationId.GetTxId()); - } else if (dstParentPath.IsUnderCreating()) { - checks - .IsUnderCreating() - .IsUnderTheSameOperation(OperationId.GetTxId()); - } else { - checks - .NotUnderOperation(); - } + if (dstParentPath.IsUnderOperation()) { + checks + .IsUnderTheSameOperation(OperationId.GetTxId()); + } else { + checks + .NotUnderOperation(); + } if (!checks) { TString explain = TStringBuilder() << "parent dst path fail checks" diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp index bcca666ffd..57b66f0ca8 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp @@ -8,37 +8,6 @@ #include <ydb/core/protos/flat_tx_scheme.pb.h> #include <ydb/core/protos/flat_scheme_op.pb.h> -namespace { - - -NKikimrSchemeOp::TModifyScheme MoveTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst) { - NKikimrSchemeOp::TModifyScheme scheme; - - scheme.SetWorkingDir(dst.Parent().PathString()); - scheme.SetFailOnExist(true); - scheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveTable); - auto operation = scheme.MutableMoveTable(); - operation->SetSrcPath(src.PathString()); - operation->SetDstPath(dst.PathString()); - - return scheme; -} - -NKikimrSchemeOp::TModifyScheme MoveTableIndexTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst) { - NKikimrSchemeOp::TModifyScheme scheme; - - scheme.SetWorkingDir(dst.Parent().PathString()); - scheme.SetFailOnExist(true); - scheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveTableIndex); - auto operation = scheme.MutableMoveTableIndex(); - operation->SetSrcPath(src.PathString()); - operation->SetDstPath(dst.PathString()); - - return scheme; -} - -} - namespace NKikimr { namespace NSchemeShard { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 6047d1d537..37f49d691c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -285,6 +285,9 @@ TVector<ISubOperationBase::TPtr> CreateDropIndex(TOperationId id, const TTxTrans ISubOperationBase::TPtr CreateDropTableIndexAtMainTable(TOperationId id, const TTxTransaction& tx); ISubOperationBase::TPtr CreateDropTableIndexAtMainTable(TOperationId id, TTxState::ETxState state); +ISubOperationBase::TPtr CreateUpdateMainTableOnIndexMove(TOperationId id, const TTxTransaction& tx); +ISubOperationBase::TPtr CreateUpdateMainTableOnIndexMove(TOperationId id, TTxState::ETxState state); + /// CDC // Create TVector<ISubOperationBase::TPtr> CreateNewCdcStream(TOperationId id, const TTxTransaction& tx, TOperationContext& context); @@ -441,10 +444,14 @@ ISubOperationBase::TPtr CreateAlterLogin(TOperationId id, const TTxTransaction& ISubOperationBase::TPtr CreateAlterLogin(TOperationId id, TTxState::ETxState state); TVector<ISubOperationBase::TPtr> CreateConsistentMoveTable(TOperationId id, const TTxTransaction& tx, TOperationContext& context); +TVector<ISubOperationBase::TPtr> CreateConsistentMoveIndex(TOperationId id, const TTxTransaction& tx, TOperationContext& context); ISubOperationBase::TPtr CreateMoveTable(TOperationId id, const TTxTransaction& tx); ISubOperationBase::TPtr CreateMoveTable(TOperationId id, TTxState::ETxState state); +ISubOperationBase::TPtr CreateMoveIndex(TOperationId id, const TTxTransaction& tx); +ISubOperationBase::TPtr CreateMoveIndex(TOperationId id, TTxState::ETxState state); + ISubOperationBase::TPtr CreateMoveTableIndex(TOperationId id, const TTxTransaction& tx); ISubOperationBase::TPtr CreateMoveTableIndex(TOperationId id, TTxState::ETxState state); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp index 3b59bcfad0..9979742fc6 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp @@ -848,6 +848,14 @@ void TSideEffects::DoDoneTransactions(TSchemeShard *ss, NTabletFlatExecutor::TTr << ", publications: " << operation->Publications.size() << ", subscribers: " << operation->Subscribers.size()); + for (const auto& pub : operation->Publications) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Publication details: " + << " tx: " << txId + << ", " << pub.first + << ", " << pub.second); + } + ss->Publications[txId] = { std::move(operation->Publications), std::move(operation->Subscribers) diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 7d7efa62ef..15bb6bc42e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1279,6 +1279,7 @@ TPathElement::EPathState TSchemeShard::CalcPathState(TTxState::ETxType txType, T case TTxState::TxAlterSequence: case TTxState::TxAlterReplication: case TTxState::TxAlterBlobDepot: + case TTxState::TxUpdateMainTableOnIndexMove: return TPathElement::EPathState::EPathStateAlter; case TTxState::TxDropTable: case TTxState::TxDropPQGroup: diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 6301cda130..0bc90d41c7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -327,6 +327,10 @@ void TPathDescriber::DescribeTable(const TActorContext& ctx, TPathId pathId, TPa continue; } + if (!childPath->IsCreateFinished()) { + continue; + } + switch (childPath->PathType) { case NKikimrSchemeOp::EPathTypeTableIndex: Self->DescribeTableIndex(childPathId, childName, *entry->AddTableIndexes()); diff --git a/ydb/core/tx/schemeshard/schemeshard_path_element.h b/ydb/core/tx/schemeshard/schemeshard_path_element.h index df44d08ec9..e3870d3530 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_element.h +++ b/ydb/core/tx/schemeshard/schemeshard_path_element.h @@ -515,6 +515,14 @@ public: return PathState == EPathState::EPathStateMigrated; } + bool IsUnderMoving() const { + return PathState == EPathState::EPathStateMoving; + } + + bool IsUnderCreating() const { + return PathState == EPathState::EPathStateCreate; + } + bool PlannedToCreate() const { return PathState == EPathState::EPathStateCreate; } diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h index 27773215ac..e6a6773858 100644 --- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h +++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h @@ -112,7 +112,8 @@ struct TTxState { item(TxDropReplication, 67) \ item(TxCreateBlobDepot, 68) \ item(TxAlterBlobDepot, 69) \ - item(TxDropBlobDepot, 70) + item(TxDropBlobDepot, 70) \ + item(TxUpdateMainTableOnIndexMove, 71) // TX_STATE_TYPE_ENUM @@ -340,6 +341,7 @@ struct TTxState { case TxDropSequence: case TxDropReplication: case TxDropBlobDepot: + case TxUpdateMainTableOnIndexMove: return false; case TxAlterPQGroup: case TxAlterTable: @@ -422,6 +424,7 @@ struct TTxState { case TxFinalizeBuildIndex: case TxDropTableIndexAtMainTable: // just increments schemaversion at main table case TxDropCdcStreamAtTable: + case TxUpdateMainTableOnIndexMove: return false; case TxAlterPQGroup: case TxAlterTable: @@ -506,6 +509,7 @@ struct TTxState { case TxDropLock: case TxDropTableIndexAtMainTable: case TxDropCdcStreamAtTable: + case TxUpdateMainTableOnIndexMove: return false; case TxAlterPQGroup: case TxAlterTable: diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index 9ed954553c..c94d2d4232 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -391,6 +391,33 @@ namespace NSchemeShardUT_Private { TestModificationResults(runtime, txId, expectedResults); } + TEvSchemeShard::TEvModifySchemeTransaction* MoveIndexRequest(ui64 txId, const TString& tablePath, const TString& srcPath, const TString& dstPath, ui64 schemeShard, const TApplyIf& applyIf) { + THolder<TEvSchemeShard::TEvModifySchemeTransaction> evTx = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(txId, schemeShard); + auto transaction = evTx->Record.AddTransaction(); + transaction->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex); + SetApplyIf(*transaction, applyIf); + + auto descr = transaction->MutableMoveIndex(); + descr->SetTablePath(tablePath); + descr->SetSrcPath(srcPath); + descr->SetDstPath(dstPath); + + return evTx.Release(); + } + + void AsyncMoveIndex(TTestActorRuntime& runtime, ui64 txId, const TString& tablePath, const TString& srcPath, const TString& dstPath, ui64 schemeShard) { + TActorId sender = runtime.AllocateEdgeActor(); + ForwardToTablet(runtime, schemeShard, sender, MoveIndexRequest(txId, tablePath, srcPath, dstPath, schemeShard)); + } + + void TestMoveIndex(TTestActorRuntime& runtime, ui64 txId, const TString& tablePath, const TString& src, const TString& dst, const TVector<TEvSchemeShard::EStatus>& expectedResults) { + TestMoveIndex(runtime, TTestTxConfig::SchemeShard, txId, tablePath, src, dst, expectedResults); + } + + void TestMoveIndex(TTestActorRuntime& runtime, ui64 schemeShard, ui64 txId, const TString& tablePath, const TString& src, const TString& dst, const TVector<TEvSchemeShard::EStatus>& expectedResults) { + AsyncMoveIndex(runtime, txId, tablePath, src, dst, schemeShard); + TestModificationResults(runtime, txId, expectedResults); + } TEvSchemeShard::TEvModifySchemeTransaction* LockRequest(ui64 txId, const TString &parentPath, const TString& name) { THolder<TEvSchemeShard::TEvModifySchemeTransaction> evTx = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(txId, TTestTxConfig::SchemeShard); diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index 24d6ddd6d7..bda05c59c4 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -248,6 +248,12 @@ namespace NSchemeShardUT_Private { void TestMoveTable(TTestActorRuntime& runtime, ui64 txId, const TString& srcMove, const TString& dstMove, const TVector<TEvSchemeShard::EStatus>& expectedResults = {NKikimrScheme::StatusAccepted}); void TestMoveTable(TTestActorRuntime& runtime, ui64 schemeShard, ui64 txId, const TString& srcMove, const TString& dstMove, const TVector<TEvSchemeShard::EStatus>& expectedResults = {NKikimrScheme::StatusAccepted}); + // move index + TEvTx* MoveIndexRequest(ui64 txId, const TString& tablePath, const TString& srcPath, const TString& dstPath, ui64 schemeShard = TTestTxConfig::SchemeShard, const TApplyIf& applyIf = {}); + void AsyncMoveIndex(TTestActorRuntime& runtime, ui64 txId, const TString& tablePath, const TString& srcPath, const TString& dstPath, ui64 schemeShard = TTestTxConfig::SchemeShard); + void TestMoveIndex(TTestActorRuntime& runtime, ui64 txId, const TString& tablePath, const TString& srcMove, const TString& dstMove, const TVector<TEvSchemeShard::EStatus>& expectedResults = {NKikimrScheme::StatusAccepted}); + void TestMoveIndex(TTestActorRuntime& runtime, ui64 schemeShard, ui64 txId, const TString& tablePath, const TString& srcMove, const TString& dstMove, const TVector<TEvSchemeShard::EStatus>& expectedResults = {NKikimrScheme::StatusAccepted}); + // locks TEvTx* LockRequest(ui64 txId, const TString &parentPath, const TString& name); void AsyncLock(TTestActorRuntime& runtime, ui64 schemeShard, ui64 txId, const TString& parentPath, const TString& name); diff --git a/ydb/core/tx/schemeshard/ut_move.cpp b/ydb/core/tx/schemeshard/ut_move.cpp index d021c6dd72..225dae7ada 100644 --- a/ydb/core/tx/schemeshard/ut_move.cpp +++ b/ydb/core/tx/schemeshard/ut_move.cpp @@ -728,6 +728,263 @@ Y_UNIT_TEST_SUITE(TSchemeShardMoveTest) { NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)}); } + Y_UNIT_TEST(MoveIndex) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, + TTestEnvOptions() + .EnableAsyncIndexes(true)); + ui64 txId = 100; + + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value0" Type: "Utf8" } + Columns { Name: "value1" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Sync" + KeyColumnNames: ["value0"] + } + IndexDescription { + Name: "Async" + KeyColumnNames: ["value1"] + Type: EIndexTypeGlobalAsync + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePath(runtime, "/MyRoot"), + {NLs::ChildrenCount(1), + NLs::PathsInsideDomain(5), + NLs::ShardsInsideDomain(3)}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), + {NLs::IsTable, + NLs::PathVersionEqual(3), + NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"}), + NLs::IndexesCount(2)}); + + TestMoveIndex(runtime, ++txId, "/MyRoot/Table", "Sync", "MovedSync"); + env.TestWaitNotification(runtime, txId); + + TestMoveIndex(runtime, ++txId, "/MyRoot/Table", "Async", "MovedAsync"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), + {NLs::IsTable, + NLs::PathVersionEqual(5), + NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"})}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot"), + {NLs::ChildrenCount(1), + NLs::PathsInsideDomain(5), + NLs::ShardsInsideDomain(3)}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/MovedSync", true, true, true), + {NLs::PathExist, + NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobal), + NLs::IndexKeys({"value0"}), + NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/MovedAsync", true, true, true), + {NLs::PathExist, + NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobalAsync), + NLs::IndexKeys({"value1"}), + NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)}); + } + + Y_UNIT_TEST(MoveIndexSameDst) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, + TTestEnvOptions() + .EnableAsyncIndexes(true)); + ui64 txId = 100; + + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value0" Type: "Utf8" } + Columns { Name: "value1" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Sync" + KeyColumnNames: ["value0"] + } + IndexDescription { + Name: "Async" + KeyColumnNames: ["value1"] + Type: EIndexTypeGlobalAsync + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePath(runtime, "/MyRoot"), + {NLs::ChildrenCount(1), + NLs::PathsInsideDomain(5), + NLs::ShardsInsideDomain(3)}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), + {NLs::IsTable, + NLs::PathVersionEqual(3), + NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"}), + NLs::IndexesCount(2)}); + + TestMoveIndex(runtime, ++txId, "/MyRoot/Table", "Sync", "Sync", {NKikimrScheme::StatusInvalidParameter}); + env.TestWaitNotification(runtime, txId); + + TestMoveIndex(runtime, ++txId, "/MyRoot/Table", "Async", "Async", {NKikimrScheme::StatusInvalidParameter}); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), + {NLs::IsTable, + NLs::PathVersionEqual(3), + NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"})}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot"), + {NLs::ChildrenCount(1), + NLs::PathsInsideDomain(5), + NLs::ShardsInsideDomain(3)}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/Sync", true, true, true), + {NLs::PathExist, + NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobal), + NLs::IndexKeys({"value0"}), + NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/Async", true, true, true), + {NLs::PathExist, + NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobalAsync), + NLs::IndexKeys({"value1"}), + NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)}); + } + + Y_UNIT_TEST(MoveIndexDoesNonExisted) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, + TTestEnvOptions() + .EnableAsyncIndexes(true)); + ui64 txId = 100; + + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value0" Type: "Utf8" } + Columns { Name: "value1" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Sync" + KeyColumnNames: ["value0"] + } + IndexDescription { + Name: "Async" + KeyColumnNames: ["value1"] + Type: EIndexTypeGlobalAsync + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePath(runtime, "/MyRoot"), + {NLs::ChildrenCount(1), + NLs::PathsInsideDomain(5), + NLs::ShardsInsideDomain(3)}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), + {NLs::IsTable, + NLs::PathVersionEqual(3), + NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"}), + NLs::IndexesCount(2)}); + + TestMoveIndex(runtime, ++txId, "/MyRoot/Table", "BlaBla", "Sync", {NKikimrScheme::StatusPathDoesNotExist}); + env.TestWaitNotification(runtime, txId); + + TestMoveIndex(runtime, ++txId, "/MyRoot/TableBlaBla", "Async", "Async", {NKikimrScheme::StatusPathDoesNotExist}); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), + {NLs::IsTable, + NLs::PathVersionEqual(3), + NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"})}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot"), + {NLs::ChildrenCount(1), + NLs::PathsInsideDomain(5), + NLs::ShardsInsideDomain(3)}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/Sync", true, true, true), + {NLs::PathExist, + NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobal), + NLs::IndexKeys({"value0"}), + NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/Async", true, true, true), + {NLs::PathExist, + NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobalAsync), + NLs::IndexKeys({"value1"}), + NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)}); + } + + Y_UNIT_TEST(MoveIntoBuildingIndex) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, + TTestEnvOptions() + .EnableAsyncIndexes(true)); + ui64 txId = 100; + + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value0" Type: "Utf8" } + Columns { Name: "value1" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "SomeIndex" + KeyColumnNames: ["value1"] + } + )"); + env.TestWaitNotification(runtime, txId); + + AsyncBuilIndex(runtime, ++txId, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/Table", "Sync", {"value0"}); + + TVector<THolder<IEventHandle>> suppressed; + auto id = txId; + + auto observer = SetSuppressObserver(runtime, suppressed, TEvDataShard::TEvBuildIndexCreateRequest::EventType); + + WaitForSuppressed(runtime, suppressed, 1, observer); + + { + TestMoveIndex(runtime, ++txId, "/MyRoot/Table", "Sync", "MovedSync", {NKikimrScheme::StatusMultipleModifications}); + env.TestWaitNotification(runtime, txId); + } + + { + TestMoveIndex(runtime, ++txId, "/MyRoot/Table", "SomeIndex", "Sync", {NKikimrScheme::StatusMultipleModifications}); + env.TestWaitNotification(runtime, txId); + } + + for (auto &msg : suppressed) { + runtime.Send(msg.Release()); + } + + suppressed.clear(); + + env.TestWaitNotification(runtime, id); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), + {NLs::IsTable, + NLs::PathVersionEqual(6), + NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"}), + NLs::IndexesCount(2)}); + } + Y_UNIT_TEST(AsyncIndexWithSyncInFly) { TTestBasicRuntime runtime; TTestEnv env(runtime, diff --git a/ydb/core/tx/schemeshard/ut_move_reboots.cpp b/ydb/core/tx/schemeshard/ut_move_reboots.cpp index 7edaea585a..6a90c1bc4f 100644 --- a/ydb/core/tx/schemeshard/ut_move_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_move_reboots.cpp @@ -208,6 +208,114 @@ Y_UNIT_TEST_SUITE(TSchemeShardMoveRebootsTest) { }); } + Y_UNIT_TEST(MoveIndex) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + TPathVersion pathVersion; + { + TInactiveZone inactive(activeZone); + TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value0" Type: "Utf8" } + Columns { Name: "value1" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Sync" + KeyColumnNames: ["value0"] + } + IndexDescription { + Name: "Async" + KeyColumnNames: ["value1"] + Type: EIndexTypeGlobalAsync + } + )"); + t.TestEnv->TestWaitNotification(runtime, {t.TxId, t.TxId - 1}); + pathVersion = TestDescribeResult(DescribePath(runtime, "/MyRoot"), + {NLs::PathExist}); + } + + TestMoveIndex(runtime, ++t.TxId, "/MyRoot/Table", "Sync", "MovedSync"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestMoveIndex(runtime, ++t.TxId, "/MyRoot/Table", "Async", "MovedAsync"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + { + TInactiveZone inactive(activeZone); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), + {NLs::IsTable, + NLs::PathVersionEqual(5), + NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"})}); + + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/MovedSync", true, true, true), + {NLs::PathExist, + NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobal), + NLs::IndexKeys({"value0"}), + NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/MovedAsync", true, true, true), + {NLs::PathExist, + NLs::IndexType(NKikimrSchemeOp::EIndexTypeGlobalAsync), + NLs::IndexKeys({"value1"}), + NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)}); + } + }); + } + + Y_UNIT_TEST(ReplaceIndex) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + TPathVersion pathVersion; + { + TInactiveZone inactive(activeZone); + TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value0" Type: "Utf8" } + Columns { Name: "value1" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Sync" + KeyColumnNames: ["value0"] + } + IndexDescription { + Name: "Sync1" + KeyColumnNames: ["value1"] + } + )"); + t.TestEnv->TestWaitNotification(runtime, {t.TxId, t.TxId - 1}); + pathVersion = TestDescribeResult(DescribePath(runtime, "/MyRoot"), + {NLs::PathExist}); + } + + TestMoveIndex(runtime, ++t.TxId, "/MyRoot/Table", "Sync", "Sync1"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), + {NLs::IsTable, + NLs::PathVersionEqual(5), + NLs::CheckColumns("Table", {"key", "value0", "value1", "valueFloat"}, {}, {"key"})}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/Sync1", true, true, true), + {NLs::PathExist, + NLs::IndexKeys({"value0"}), + NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table/Sync", true, true, true), + {NLs::PathNotExist}); + } + }); + } + Y_UNIT_TEST(Replace) { TTestWithReboots t(true); t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { @@ -240,7 +348,6 @@ Y_UNIT_TEST_SUITE(TSchemeShardMoveRebootsTest) { )"); t.TestEnv->TestWaitNotification(runtime, {t.TxId, t.TxId - 1}); - pathVersion = TestDescribeResult(DescribePath(runtime, "/MyRoot"), {NLs::PathExist}); } diff --git a/ydb/core/tx/tx_proxy/schemereq.cpp b/ydb/core/tx/tx_proxy/schemereq.cpp index 0fc9083f6e..ff29572d94 100644 --- a/ydb/core/tx/tx_proxy/schemereq.cpp +++ b/ydb/core/tx/tx_proxy/schemereq.cpp @@ -301,6 +301,9 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> { case NKikimrSchemeOp::ESchemeOpMoveTableIndex: Y_FAIL("no implementation for ESchemeOpMoveTableIndex"); + case NKikimrSchemeOp::ESchemeOpMoveIndex: + Y_FAIL("no implementation for ESchemeOpMoveIndex"); + case NKikimrSchemeOp::ESchemeOpCreateSequence: case NKikimrSchemeOp::ESchemeOpAlterSequence: return *modifyScheme.MutableSequence()->MutableName(); @@ -670,6 +673,16 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> { } break; } + case NKikimrSchemeOp::ESchemeOpMoveIndex: { + auto& descr = pbModifyScheme.GetMoveIndex(); + { + auto toResolve = TPathToResolve(pbModifyScheme.GetOperationType()); + toResolve.Path = SplitPath(descr.GetTablePath()); + toResolve.RequiredAccess = NACLib::EAccessRights::AlterSchema; + ResolveForACL.push_back(toResolve); + } + break; + } case NKikimrSchemeOp::ESchemeOpMkDir: { auto toResolve = TPathToResolve(pbModifyScheme.GetOperationType()); |