summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <[email protected]>2022-04-08 15:28:13 +0300
committerIlnaz Nizametdinov <[email protected]>2022-04-08 15:28:13 +0300
commit5d3536387c9c9efffb378f4821b0e9add3df1cb4 (patch)
treee8ff78af09c1337eaa585708256b3edd46aa956e
parent07343356d2608e3663009ff48811e67557b24957 (diff)
Drop index info at main table upon cancellation KIKIMR-14664
ref:244254b2154f375f1f98473980d9992e6cc02a9e
-rw-r--r--ydb/core/protos/flat_scheme_op.proto14
-rw-r--r--ydb/core/protos/tx_datashard.proto2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp97
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp17
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h5
-rw-r--r--ydb/core/tx/datashard/drop_index_notice_unit.cpp6
-rw-r--r--ydb/core/tx/datashard/finalize_build_index_unit.cpp41
-rw-r--r--ydb/core/tx/datashard/initiate_build_index_unit.cpp11
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp10
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp5
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_tx_infly.h1
13 files changed, 192 insertions, 25 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index ce65302fa1c..e26c0bd845c 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -13,6 +13,7 @@ import "ydb/public/api/protos/ydb_coordination.proto";
import "ydb/public/api/protos/ydb_export.proto";
import "ydb/library/mkql_proto/protos/minikql.proto";
+import "google/protobuf/empty.proto";
package NKikimrSchemeOp;
option java_package = "ru.yandex.kikimr.proto";
@@ -802,10 +803,23 @@ message TInitiateBuildIndexMainTable {
optional string TableName = 1;
}
+message TBuildIndexOutcome {
+ message TCancel {
+ // Path id of the index whose creation was cancelled
+ optional NKikimrProto.TPathID IndexPathId = 1;
+ }
+
+ oneof Result {
+ google.protobuf.Empty Apply = 1;
+ TCancel Cancel = 2;
+ }
+}
+
message TFinalizeBuildIndexMainTable {
optional string TableName = 1;
optional uint64 SnapshotTxId = 2;
optional uint64 BuildIndexId = 3;
+ optional TBuildIndexOutcome Outcome = 4;
}
message TCopyTableConfig { //TTableDescription implemets copying a table in original and full way
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index e197ecf4662..fd3b16512fe 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -341,6 +341,8 @@ message TFinalizeBuildIndex {
optional uint64 TableSchemaVersion = 4;
optional uint64 BuildIndexId = 5;
+
+ optional NKikimrSchemeOp.TBuildIndexOutcome Outcome = 6;
}
message TDropIndexNotice {
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 74aa9ac3322..eb1df20a3a7 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -341,6 +341,103 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
UNIT_ASSERT_VALUES_EQUAL(enqueued.size(), removed.size());
}
+ Y_UNIT_TEST(ShouldRemoveRecordsAfterCancelIndexBuild) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings
+ .SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableDataColumnForIndexTable(true)
+ .SetEnableAsyncIndexes(true);
+
+ TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+ const TActorId sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::CHANGE_EXCHANGE, NLog::PRI_DEBUG);
+ InitRoot(server, sender);
+
+ TVector<THolder<IEventHandle>> delayed;
+ bool inited = false;
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvChangeExchange::EvEnqueueRecords:
+ delayed.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+
+ case TEvDataShard::EvBuildIndexCreateRequest:
+ inited = true;
+ return TTestActorRuntime::EEventAction::DROP;
+
+ default:
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+ });
+
+ CreateShardedTable(server, sender, "/Root", "Table", TableWoIndexes());
+ ExecSQL(server, sender, R"(INSERT INTO `/Root/Table` (pkey, ikey) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )");
+
+ const auto buildIndexId = AsyncAlterAddIndex(server, "/Root", "/Root/Table", SimpleAsyncIndex());
+ if (!inited) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&inited](IEventHandle&) {
+ return inited;
+ });
+ runtime.DispatchEvents(opts);
+ }
+
+ ExecSQL(server, sender, "INSERT INTO `/Root/Table` (pkey, ikey) VALUES (4, 40);");
+ if (delayed.empty()) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&delayed](IEventHandle&) {
+ return !delayed.empty();
+ });
+ runtime.DispatchEvents(opts);
+ }
+
+ CancelAddIndex(server, "/Root", buildIndexId);
+ WaitTxNotification(server, sender, buildIndexId);
+
+ THashSet<ui64> enqueued;
+ THashSet<ui64> removed;
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvChangeExchange::EvEnqueueRecords:
+ for (const auto& record : ev->Get<TEvChangeExchange::TEvEnqueueRecords>()->Records) {
+ enqueued.insert(record.Order);
+ }
+ break;
+
+ case TEvChangeExchange::EvRemoveRecords:
+ for (const auto& record : ev->Get<TEvChangeExchange::TEvRemoveRecords>()->Records) {
+ removed.insert(record);
+ }
+ break;
+
+ default:
+ break;
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ for (auto& ev : std::exchange(delayed, TVector<THolder<IEventHandle>>())) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ if (removed.empty()) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&enqueued, &removed](IEventHandle&) {
+ return removed && enqueued == removed;
+ });
+ runtime.DispatchEvents(opts);
+ }
+ }
+
void WaitForContent(TServer::TPtr server, const TString& tablePath, const TString& expected) {
while (true) {
auto content = ReadShardedTable(server, tablePath);
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index 820d7f2d2c1..c30c35d6963 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1656,6 +1656,23 @@ ui64 AsyncAlterAddIndex(
return txId;
}
+void CancelAddIndex(Tests::TServer::TPtr server, const TString& dbName, ui64 buildIndexId) {
+ auto &runtime = *server->GetRuntime();
+ auto &settings = server->GetSettings();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.Send(new IEventHandle(MakeTxProxyID(), sender, new TEvTxUserProxy::TEvAllocateTxId()));
+ auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvAllocateTxIdResult>(sender);
+ const auto txId = ev->Get()->TxId;
+
+ auto req = MakeHolder<TEvIndexBuilder::TEvCancelRequest>(txId, dbName, buildIndexId);
+ auto tabletId = ChangeStateStorage(SchemeRoot, settings.Domain);
+ runtime.SendToPipe(tabletId, sender, req.Release(), 0, GetPipeConfigWithRetries());
+
+ auto resp = runtime.GrabEdgeEventRethrow<TEvIndexBuilder::TEvCancelResponse>(sender);
+ UNIT_ASSERT_EQUAL(resp->Get()->Record.GetStatus(), Ydb::StatusIds::SUCCESS);
+}
+
ui64 AsyncAlterDropIndex(
Tests::TServer::TPtr server,
const TString& workingDir,
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 7f903afe5f6..352c94e1b82 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -568,6 +568,11 @@ ui64 AsyncAlterAddIndex(
const TString& tablePath,
const TShardedTableOptions::TIndex& indexDesc);
+void CancelAddIndex(
+ Tests::TServer::TPtr server,
+ const TString& dbName,
+ ui64 buildIndexId);
+
ui64 AsyncAlterDropIndex(
Tests::TServer::TPtr server,
const TString& workingDir,
diff --git a/ydb/core/tx/datashard/drop_index_notice_unit.cpp b/ydb/core/tx/datashard/drop_index_notice_unit.cpp
index d1c877dccf4..5960fad0321 100644
--- a/ydb/core/tx/datashard/drop_index_notice_unit.cpp
+++ b/ydb/core/tx/datashard/drop_index_notice_unit.cpp
@@ -30,7 +30,7 @@ public:
const auto& params = schemeTx.GetDropIndexNotice();
- auto pathId = TPathId(params.GetPathId().GetOwnerId(), params.GetPathId().GetLocalId());
+ const auto pathId = PathIdFromPathId(params.GetPathId());
Y_VERIFY(pathId.OwnerId == DataShard.GetPathOwnerId());
const auto version = params.GetTableSchemaVersion();
@@ -38,13 +38,13 @@ public:
TUserTable::TPtr tableInfo;
if (params.HasIndexPathId()) {
- auto indexPathId = TPathId(params.GetIndexPathId().GetOwnerId(), params.GetIndexPathId().GetLocalId());
-
const auto& userTables = DataShard.GetUserTables();
Y_VERIFY(userTables.contains(pathId.LocalPathId));
const auto& indexes = userTables.at(pathId.LocalPathId)->Indexes;
+ auto indexPathId = PathIdFromPathId(params.GetIndexPathId());
auto it = indexes.find(indexPathId);
+
if (it != indexes.end() && it->second.Type == NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync) {
RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(indexPathId));
}
diff --git a/ydb/core/tx/datashard/finalize_build_index_unit.cpp b/ydb/core/tx/datashard/finalize_build_index_unit.cpp
index 778676ec984..c6bf271597d 100644
--- a/ydb/core/tx/datashard/finalize_build_index_unit.cpp
+++ b/ydb/core/tx/datashard/finalize_build_index_unit.cpp
@@ -6,6 +6,8 @@ namespace NKikimr {
namespace NDataShard {
class TFinalizeBuildIndexUnit : public TExecutionUnit {
+ THolder<TEvChangeExchange::TEvRemoveSender> RemoveSender;
+
public:
TFinalizeBuildIndexUnit(TDataShard& dataShard, TPipeline& pipeline)
: TExecutionUnit(EExecutionUnitKind::FinalizeBuildIndex, false, dataShard, pipeline)
@@ -28,13 +30,31 @@ public:
const auto& params = schemeTx.GetFinalizeBuildIndex();
- auto pathId = TPathId(params.GetPathId().GetOwnerId(), params.GetPathId().GetLocalId());
+ const auto pathId = PathIdFromPathId(params.GetPathId());
Y_VERIFY(pathId.OwnerId == DataShard.GetPathOwnerId());
const auto version = params.GetTableSchemaVersion();
Y_VERIFY(version);
- auto tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, version);
+ TUserTable::TPtr tableInfo;
+ if (params.HasOutcome() && params.GetOutcome().HasCancel()) {
+ const auto& userTables = DataShard.GetUserTables();
+ Y_VERIFY(userTables.contains(pathId.LocalPathId));
+ const auto& indexes = userTables.at(pathId.LocalPathId)->Indexes;
+
+ auto indexPathId = PathIdFromPathId(params.GetOutcome().GetCancel().GetIndexPathId());
+ auto it = indexes.find(indexPathId);
+
+ if (it != indexes.end() && it->second.Type == NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync) {
+ RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(indexPathId));
+ }
+
+ tableInfo = DataShard.AlterTableDropIndex(ctx, txc, pathId, version, indexPathId);
+ } else {
+ tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, version);
+ }
+
+ Y_VERIFY(tableInfo);
DataShard.AddUserTable(pathId, tableInfo);
if (tableInfo->NeedSchemaSnapshots()) {
@@ -45,28 +65,25 @@ public:
ui64 txId = params.GetSnapshotTxId();
Y_VERIFY(step != 0);
- const TSnapshotKey key(pathId.OwnerId, pathId.LocalPathId, step, txId);
-
if (DataShard.GetBuildIndexManager().Contains(params.GetBuildIndexId())) {
auto record = DataShard.GetBuildIndexManager().Get(params.GetBuildIndexId());
DataShard.CancelScan(tableInfo->LocalTid, record.ScanId);
DataShard.GetBuildIndexManager().Drop(params.GetBuildIndexId());
}
- bool removed = DataShard.GetSnapshotManager().RemoveSnapshot(txc.DB, key);
+ const TSnapshotKey key(pathId.OwnerId, pathId.LocalPathId, step, txId);
+ DataShard.GetSnapshotManager().RemoveSnapshot(txc.DB, key);
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
op->Result()->SetStepOrderId(op->GetStepOrder().ToPair());
- if (removed) {
- return EExecutionStatus::ExecutedNoMoreRestarts;
- } else {
- return EExecutionStatus::Executed;
- }
+ return EExecutionStatus::DelayCompleteNoMoreRestarts;
}
- void Complete(TOperation::TPtr, const TActorContext&) override {
- // nothing
+ void Complete(TOperation::TPtr, const TActorContext& ctx) override {
+ if (RemoveSender) {
+ ctx.Send(DataShard.GetChangeSender(), RemoveSender.Release());
+ }
}
};
diff --git a/ydb/core/tx/datashard/initiate_build_index_unit.cpp b/ydb/core/tx/datashard/initiate_build_index_unit.cpp
index 07e53261e2f..6b521598c01 100644
--- a/ydb/core/tx/datashard/initiate_build_index_unit.cpp
+++ b/ydb/core/tx/datashard/initiate_build_index_unit.cpp
@@ -64,20 +64,15 @@ public:
Y_VERIFY(step != 0);
const TSnapshotKey key(pathId.OwnerId, pathId.LocalPathId, step, txId);
+ const ui64 flags = TSnapshot::FlagScheme;
- ui64 flags = TSnapshot::FlagScheme;
-
- bool added = DataShard.GetSnapshotManager().AddSnapshot(
+ DataShard.GetSnapshotManager().AddSnapshot(
txc.DB, key, params.GetSnapshotName(), flags, TDuration::Zero());
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
op->Result()->SetStepOrderId(op->GetStepOrder().ToPair());
- if (added) {
- return EExecutionStatus::DelayCompleteNoMoreRestarts;
- } else {
- return EExecutionStatus::DelayComplete;
- }
+ return EExecutionStatus::DelayCompleteNoMoreRestarts;
}
void Complete(TOperation::TPtr, const TActorContext& ctx) override {
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index 3eca1bd828c..2cc91b5366d 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -3175,6 +3175,12 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
bool deserializeRes = ParseFromStringNoSizeLimit(*txState.SplitDescription, extraData);
Y_VERIFY(deserializeRes);
splitOpIds.push_back(operationId);
+ } else if (txState.TxType == TTxState::TxFinalizeBuildIndex) {
+ if (!extraData.empty()) {
+ txState.BuildIndexOutcome = std::make_shared<NKikimrSchemeOp::TBuildIndexOutcome>();
+ bool deserializeRes = ParseFromStringNoSizeLimit(*txState.BuildIndexOutcome, extraData);
+ Y_VERIFY(deserializeRes);
+ }
} else if (txState.TxType == TTxState::TxAlterTable) {
if (txState.State <= TTxState::Propose) {
// If state is >=Propose then alter has already been applied to the table
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp
index c1ac9556511..08b459c4d39 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp
@@ -41,6 +41,7 @@ TVector<ISubOperationBase::TPtr> ApplyBuildIndex(TOperationId nextId, const TTxT
op->SetTableName(table.LeafName());
op->SetSnapshotTxId(config.GetSnaphotTxId());
op->SetBuildIndexId(config.GetBuildIndexId());
+ op->MutableOutcome()->MutableApply();
result.push_back(CreateFinalizeBuildIndexMainTable(NextPartId(nextId, result), finalize));
}
@@ -92,6 +93,7 @@ TVector<ISubOperationBase::TPtr> CancelBuildIndex(TOperationId nextId, const TTx
op->SetTableName(table.LeafName());
op->SetSnapshotTxId(config.GetSnaphotTxId());
op->SetBuildIndexId(config.GetBuildIndexId());
+ PathIdFromPathId(index.Base()->PathId, op->MutableOutcome()->MutableCancel()->MutableIndexPathId());
result.push_back(CreateFinalizeBuildIndexMainTable(NextPartId(nextId, result), finalize));
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp
index fe4d116416f..fe5416aa52e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp
@@ -72,10 +72,11 @@ public:
op->SetSnapshotTxId(ui64(snapshotTxId));
op->SetSnapshotStep(ui64(snapshotStepid));
-
op->SetTableSchemaVersion(table->AlterVersion+1);
-
op->SetBuildIndexId(ui64(txState->BuildIndexId));
+ if (txState->BuildIndexOutcome) {
+ op->MutableOutcome()->CopyFrom(*txState->BuildIndexOutcome);
+ }
context.SS->FillSeqNo(tx, seqNo);
@@ -414,6 +415,11 @@ public:
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxFinalizeBuildIndex, tablePathId);
txState.BuildIndexId = TTxId(finalizeMainTable.GetBuildIndexId());
+ if (finalizeMainTable.HasOutcome()) {
+ txState.BuildIndexOutcome = std::make_shared<NKikimrSchemeOp::TBuildIndexOutcome>();
+ txState.BuildIndexOutcome->CopyFrom(finalizeMainTable.GetOutcome());
+ }
+
context.SS->PersistTxState(db, OperationId);
TTableInfo::TPtr table = context.SS->Tables.at(tablePathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index ccaf7805d0f..8a4e9a37b16 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -1952,6 +1952,11 @@ void TSchemeShard::PersistTxState(NIceDb::TNiceDb& db, const TOperationId opId)
Y_VERIFY(txState.SplitDescription, "Split Tx must have non-empty split description");
bool serializeRes = txState.SplitDescription->SerializeToString(&extraData);
Y_VERIFY(serializeRes);
+ } else if (txState.TxType == TTxState::TxFinalizeBuildIndex) {
+ if (txState.BuildIndexOutcome) {
+ bool serializeRes = txState.BuildIndexOutcome->SerializeToString(&extraData);
+ Y_VERIFY(serializeRes);
+ }
} else if (txState.TxType == TTxState::TxAlterTable) {
TPathId pathId = txState.TargetPathId;
diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
index 41a8045155a..b051205c0f0 100644
--- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
+++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
@@ -240,6 +240,7 @@ struct TTxState {
std::shared_ptr<NKikimrTxDataShard::TSplitMergeDescription> SplitDescription;
bool TxShardsListFinalized = false;
TTxId BuildIndexId;
+ std::shared_ptr<NKikimrSchemeOp::TBuildIndexOutcome> BuildIndexOutcome;
// fields below used for backup/restore
bool Cancel = false;
THashMap<TShardIdx, TShardStatus> ShardStatuses;