diff options
| author | Ilnaz Nizametdinov <[email protected]> | 2022-04-08 15:28:13 +0300 |
|---|---|---|
| committer | Ilnaz Nizametdinov <[email protected]> | 2022-04-08 15:28:13 +0300 |
| commit | 5d3536387c9c9efffb378f4821b0e9add3df1cb4 (patch) | |
| tree | e8ff78af09c1337eaa585708256b3edd46aa956e | |
| parent | 07343356d2608e3663009ff48811e67557b24957 (diff) | |
Drop index info at main table upon cancellation KIKIMR-14664
ref:244254b2154f375f1f98473980d9992e6cc02a9e
| -rw-r--r-- | ydb/core/protos/flat_scheme_op.proto | 14 | ||||
| -rw-r--r-- | ydb/core/protos/tx_datashard.proto | 2 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 97 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.cpp | 17 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.h | 5 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/drop_index_notice_unit.cpp | 6 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/finalize_build_index_unit.cpp | 41 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/initiate_build_index_unit.cpp | 11 | ||||
| -rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__init.cpp | 6 | ||||
| -rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp | 10 | ||||
| -rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 5 | ||||
| -rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_tx_infly.h | 1 |
13 files changed, 192 insertions, 25 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index ce65302fa1c..e26c0bd845c 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -13,6 +13,7 @@ import "ydb/public/api/protos/ydb_coordination.proto"; import "ydb/public/api/protos/ydb_export.proto"; import "ydb/library/mkql_proto/protos/minikql.proto"; +import "google/protobuf/empty.proto"; package NKikimrSchemeOp; option java_package = "ru.yandex.kikimr.proto"; @@ -802,10 +803,23 @@ message TInitiateBuildIndexMainTable { optional string TableName = 1; } +message TBuildIndexOutcome { + message TCancel { + // Path id of the index whose creation was cancelled + optional NKikimrProto.TPathID IndexPathId = 1; + } + + oneof Result { + google.protobuf.Empty Apply = 1; + TCancel Cancel = 2; + } +} + message TFinalizeBuildIndexMainTable { optional string TableName = 1; optional uint64 SnapshotTxId = 2; optional uint64 BuildIndexId = 3; + optional TBuildIndexOutcome Outcome = 4; } message TCopyTableConfig { //TTableDescription implemets copying a table in original and full way diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index e197ecf4662..fd3b16512fe 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -341,6 +341,8 @@ message TFinalizeBuildIndex { optional uint64 TableSchemaVersion = 4; optional uint64 BuildIndexId = 5; + + optional NKikimrSchemeOp.TBuildIndexOutcome Outcome = 6; } message TDropIndexNotice { diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 74aa9ac3322..eb1df20a3a7 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -341,6 +341,103 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { UNIT_ASSERT_VALUES_EQUAL(enqueued.size(), removed.size()); } + Y_UNIT_TEST(ShouldRemoveRecordsAfterCancelIndexBuild) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings + .SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableDataColumnForIndexTable(true) + .SetEnableAsyncIndexes(true); + + TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + const TActorId sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::CHANGE_EXCHANGE, NLog::PRI_DEBUG); + InitRoot(server, sender); + + TVector<THolder<IEventHandle>> delayed; + bool inited = false; + runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvChangeExchange::EvEnqueueRecords: + delayed.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + + case TEvDataShard::EvBuildIndexCreateRequest: + inited = true; + return TTestActorRuntime::EEventAction::DROP; + + default: + return TTestActorRuntime::EEventAction::PROCESS; + } + }); + + CreateShardedTable(server, sender, "/Root", "Table", TableWoIndexes()); + ExecSQL(server, sender, R"(INSERT INTO `/Root/Table` (pkey, ikey) VALUES + (1, 10), + (2, 20), + (3, 30); + )"); + + const auto buildIndexId = AsyncAlterAddIndex(server, "/Root", "/Root/Table", SimpleAsyncIndex()); + if (!inited) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&inited](IEventHandle&) { + return inited; + }); + runtime.DispatchEvents(opts); + } + + ExecSQL(server, sender, "INSERT INTO `/Root/Table` (pkey, ikey) VALUES (4, 40);"); + if (delayed.empty()) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&delayed](IEventHandle&) { + return !delayed.empty(); + }); + runtime.DispatchEvents(opts); + } + + CancelAddIndex(server, "/Root", buildIndexId); + WaitTxNotification(server, sender, buildIndexId); + + THashSet<ui64> enqueued; + THashSet<ui64> removed; + runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvChangeExchange::EvEnqueueRecords: + for (const auto& record : ev->Get<TEvChangeExchange::TEvEnqueueRecords>()->Records) { + enqueued.insert(record.Order); + } + break; + + case TEvChangeExchange::EvRemoveRecords: + for (const auto& record : ev->Get<TEvChangeExchange::TEvRemoveRecords>()->Records) { + removed.insert(record); + } + break; + + default: + break; + } + return TTestActorRuntime::EEventAction::PROCESS; + }); + + for (auto& ev : std::exchange(delayed, TVector<THolder<IEventHandle>>())) { + runtime.Send(ev.Release(), 0, true); + } + + if (removed.empty()) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&enqueued, &removed](IEventHandle&) { + return removed && enqueued == removed; + }); + runtime.DispatchEvents(opts); + } + } + void WaitForContent(TServer::TPtr server, const TString& tablePath, const TString& expected) { while (true) { auto content = ReadShardedTable(server, tablePath); diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index 820d7f2d2c1..c30c35d6963 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -1656,6 +1656,23 @@ ui64 AsyncAlterAddIndex( return txId; } +void CancelAddIndex(Tests::TServer::TPtr server, const TString& dbName, ui64 buildIndexId) { + auto &runtime = *server->GetRuntime(); + auto &settings = server->GetSettings(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.Send(new IEventHandle(MakeTxProxyID(), sender, new TEvTxUserProxy::TEvAllocateTxId())); + auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvAllocateTxIdResult>(sender); + const auto txId = ev->Get()->TxId; + + auto req = MakeHolder<TEvIndexBuilder::TEvCancelRequest>(txId, dbName, buildIndexId); + auto tabletId = ChangeStateStorage(SchemeRoot, settings.Domain); + runtime.SendToPipe(tabletId, sender, req.Release(), 0, GetPipeConfigWithRetries()); + + auto resp = runtime.GrabEdgeEventRethrow<TEvIndexBuilder::TEvCancelResponse>(sender); + UNIT_ASSERT_EQUAL(resp->Get()->Record.GetStatus(), Ydb::StatusIds::SUCCESS); +} + ui64 AsyncAlterDropIndex( Tests::TServer::TPtr server, const TString& workingDir, diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index 7f903afe5f6..352c94e1b82 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -568,6 +568,11 @@ ui64 AsyncAlterAddIndex( const TString& tablePath, const TShardedTableOptions::TIndex& indexDesc); +void CancelAddIndex( + Tests::TServer::TPtr server, + const TString& dbName, + ui64 buildIndexId); + ui64 AsyncAlterDropIndex( Tests::TServer::TPtr server, const TString& workingDir, diff --git a/ydb/core/tx/datashard/drop_index_notice_unit.cpp b/ydb/core/tx/datashard/drop_index_notice_unit.cpp index d1c877dccf4..5960fad0321 100644 --- a/ydb/core/tx/datashard/drop_index_notice_unit.cpp +++ b/ydb/core/tx/datashard/drop_index_notice_unit.cpp @@ -30,7 +30,7 @@ public: const auto& params = schemeTx.GetDropIndexNotice(); - auto pathId = TPathId(params.GetPathId().GetOwnerId(), params.GetPathId().GetLocalId()); + const auto pathId = PathIdFromPathId(params.GetPathId()); Y_VERIFY(pathId.OwnerId == DataShard.GetPathOwnerId()); const auto version = params.GetTableSchemaVersion(); @@ -38,13 +38,13 @@ public: TUserTable::TPtr tableInfo; if (params.HasIndexPathId()) { - auto indexPathId = TPathId(params.GetIndexPathId().GetOwnerId(), params.GetIndexPathId().GetLocalId()); - const auto& userTables = DataShard.GetUserTables(); Y_VERIFY(userTables.contains(pathId.LocalPathId)); const auto& indexes = userTables.at(pathId.LocalPathId)->Indexes; + auto indexPathId = PathIdFromPathId(params.GetIndexPathId()); auto it = indexes.find(indexPathId); + if (it != indexes.end() && it->second.Type == NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync) { RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(indexPathId)); } diff --git a/ydb/core/tx/datashard/finalize_build_index_unit.cpp b/ydb/core/tx/datashard/finalize_build_index_unit.cpp index 778676ec984..c6bf271597d 100644 --- a/ydb/core/tx/datashard/finalize_build_index_unit.cpp +++ b/ydb/core/tx/datashard/finalize_build_index_unit.cpp @@ -6,6 +6,8 @@ namespace NKikimr { namespace NDataShard { class TFinalizeBuildIndexUnit : public TExecutionUnit { + THolder<TEvChangeExchange::TEvRemoveSender> RemoveSender; + public: TFinalizeBuildIndexUnit(TDataShard& dataShard, TPipeline& pipeline) : TExecutionUnit(EExecutionUnitKind::FinalizeBuildIndex, false, dataShard, pipeline) @@ -28,13 +30,31 @@ public: const auto& params = schemeTx.GetFinalizeBuildIndex(); - auto pathId = TPathId(params.GetPathId().GetOwnerId(), params.GetPathId().GetLocalId()); + const auto pathId = PathIdFromPathId(params.GetPathId()); Y_VERIFY(pathId.OwnerId == DataShard.GetPathOwnerId()); const auto version = params.GetTableSchemaVersion(); Y_VERIFY(version); - auto tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, version); + TUserTable::TPtr tableInfo; + if (params.HasOutcome() && params.GetOutcome().HasCancel()) { + const auto& userTables = DataShard.GetUserTables(); + Y_VERIFY(userTables.contains(pathId.LocalPathId)); + const auto& indexes = userTables.at(pathId.LocalPathId)->Indexes; + + auto indexPathId = PathIdFromPathId(params.GetOutcome().GetCancel().GetIndexPathId()); + auto it = indexes.find(indexPathId); + + if (it != indexes.end() && it->second.Type == NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync) { + RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(indexPathId)); + } + + tableInfo = DataShard.AlterTableDropIndex(ctx, txc, pathId, version, indexPathId); + } else { + tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, version); + } + + Y_VERIFY(tableInfo); DataShard.AddUserTable(pathId, tableInfo); if (tableInfo->NeedSchemaSnapshots()) { @@ -45,28 +65,25 @@ public: ui64 txId = params.GetSnapshotTxId(); Y_VERIFY(step != 0); - const TSnapshotKey key(pathId.OwnerId, pathId.LocalPathId, step, txId); - if (DataShard.GetBuildIndexManager().Contains(params.GetBuildIndexId())) { auto record = DataShard.GetBuildIndexManager().Get(params.GetBuildIndexId()); DataShard.CancelScan(tableInfo->LocalTid, record.ScanId); DataShard.GetBuildIndexManager().Drop(params.GetBuildIndexId()); } - bool removed = DataShard.GetSnapshotManager().RemoveSnapshot(txc.DB, key); + const TSnapshotKey key(pathId.OwnerId, pathId.LocalPathId, step, txId); + DataShard.GetSnapshotManager().RemoveSnapshot(txc.DB, key); BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); op->Result()->SetStepOrderId(op->GetStepOrder().ToPair()); - if (removed) { - return EExecutionStatus::ExecutedNoMoreRestarts; - } else { - return EExecutionStatus::Executed; - } + return EExecutionStatus::DelayCompleteNoMoreRestarts; } - void Complete(TOperation::TPtr, const TActorContext&) override { - // nothing + void Complete(TOperation::TPtr, const TActorContext& ctx) override { + if (RemoveSender) { + ctx.Send(DataShard.GetChangeSender(), RemoveSender.Release()); + } } }; diff --git a/ydb/core/tx/datashard/initiate_build_index_unit.cpp b/ydb/core/tx/datashard/initiate_build_index_unit.cpp index 07e53261e2f..6b521598c01 100644 --- a/ydb/core/tx/datashard/initiate_build_index_unit.cpp +++ b/ydb/core/tx/datashard/initiate_build_index_unit.cpp @@ -64,20 +64,15 @@ public: Y_VERIFY(step != 0); const TSnapshotKey key(pathId.OwnerId, pathId.LocalPathId, step, txId); + const ui64 flags = TSnapshot::FlagScheme; - ui64 flags = TSnapshot::FlagScheme; - - bool added = DataShard.GetSnapshotManager().AddSnapshot( + DataShard.GetSnapshotManager().AddSnapshot( txc.DB, key, params.GetSnapshotName(), flags, TDuration::Zero()); BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); op->Result()->SetStepOrderId(op->GetStepOrder().ToPair()); - if (added) { - return EExecutionStatus::DelayCompleteNoMoreRestarts; - } else { - return EExecutionStatus::DelayComplete; - } + return EExecutionStatus::DelayCompleteNoMoreRestarts; } void Complete(TOperation::TPtr, const TActorContext& ctx) override { diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 3eca1bd828c..2cc91b5366d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -3175,6 +3175,12 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { bool deserializeRes = ParseFromStringNoSizeLimit(*txState.SplitDescription, extraData); Y_VERIFY(deserializeRes); splitOpIds.push_back(operationId); + } else if (txState.TxType == TTxState::TxFinalizeBuildIndex) { + if (!extraData.empty()) { + txState.BuildIndexOutcome = std::make_shared<NKikimrSchemeOp::TBuildIndexOutcome>(); + bool deserializeRes = ParseFromStringNoSizeLimit(*txState.BuildIndexOutcome, extraData); + Y_VERIFY(deserializeRes); + } } else if (txState.TxType == TTxState::TxAlterTable) { if (txState.State <= TTxState::Propose) { // If state is >=Propose then alter has already been applied to the table diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp index c1ac9556511..08b459c4d39 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp @@ -41,6 +41,7 @@ TVector<ISubOperationBase::TPtr> ApplyBuildIndex(TOperationId nextId, const TTxT op->SetTableName(table.LeafName()); op->SetSnapshotTxId(config.GetSnaphotTxId()); op->SetBuildIndexId(config.GetBuildIndexId()); + op->MutableOutcome()->MutableApply(); result.push_back(CreateFinalizeBuildIndexMainTable(NextPartId(nextId, result), finalize)); } @@ -92,6 +93,7 @@ TVector<ISubOperationBase::TPtr> CancelBuildIndex(TOperationId nextId, const TTx op->SetTableName(table.LeafName()); op->SetSnapshotTxId(config.GetSnaphotTxId()); op->SetBuildIndexId(config.GetBuildIndexId()); + PathIdFromPathId(index.Base()->PathId, op->MutableOutcome()->MutableCancel()->MutableIndexPathId()); result.push_back(CreateFinalizeBuildIndexMainTable(NextPartId(nextId, result), finalize)); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp index fe4d116416f..fe5416aa52e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp @@ -72,10 +72,11 @@ public: op->SetSnapshotTxId(ui64(snapshotTxId)); op->SetSnapshotStep(ui64(snapshotStepid)); - op->SetTableSchemaVersion(table->AlterVersion+1); - op->SetBuildIndexId(ui64(txState->BuildIndexId)); + if (txState->BuildIndexOutcome) { + op->MutableOutcome()->CopyFrom(*txState->BuildIndexOutcome); + } context.SS->FillSeqNo(tx, seqNo); @@ -414,6 +415,11 @@ public: TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxFinalizeBuildIndex, tablePathId); txState.BuildIndexId = TTxId(finalizeMainTable.GetBuildIndexId()); + if (finalizeMainTable.HasOutcome()) { + txState.BuildIndexOutcome = std::make_shared<NKikimrSchemeOp::TBuildIndexOutcome>(); + txState.BuildIndexOutcome->CopyFrom(finalizeMainTable.GetOutcome()); + } + context.SS->PersistTxState(db, OperationId); TTableInfo::TPtr table = context.SS->Tables.at(tablePathId); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index ccaf7805d0f..8a4e9a37b16 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1952,6 +1952,11 @@ void TSchemeShard::PersistTxState(NIceDb::TNiceDb& db, const TOperationId opId) Y_VERIFY(txState.SplitDescription, "Split Tx must have non-empty split description"); bool serializeRes = txState.SplitDescription->SerializeToString(&extraData); Y_VERIFY(serializeRes); + } else if (txState.TxType == TTxState::TxFinalizeBuildIndex) { + if (txState.BuildIndexOutcome) { + bool serializeRes = txState.BuildIndexOutcome->SerializeToString(&extraData); + Y_VERIFY(serializeRes); + } } else if (txState.TxType == TTxState::TxAlterTable) { TPathId pathId = txState.TargetPathId; diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h index 41a8045155a..b051205c0f0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h +++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h @@ -240,6 +240,7 @@ struct TTxState { std::shared_ptr<NKikimrTxDataShard::TSplitMergeDescription> SplitDescription; bool TxShardsListFinalized = false; TTxId BuildIndexId; + std::shared_ptr<NKikimrSchemeOp::TBuildIndexOutcome> BuildIndexOutcome; // fields below used for backup/restore bool Cancel = false; THashMap<TShardIdx, TShardStatus> ShardStatuses; |
