diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-10-04 11:06:30 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-10-04 11:06:30 +0300 |
commit | fe82ac7c9e6d41a24ec601abf1413ba28e10c067 (patch) | |
tree | ddbd912e2705ebd1d8e864796f69c97fd79472e8 | |
parent | c639d8e279f133e4a7dd8689aa5d1359107296b9 (diff) | |
download | ydb-fe82ac7c9e6d41a24ec601abf1413ba28e10c067.tar.gz |
Abortable TInitializeBuildIndex
6 files changed, 90 insertions, 65 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp index a7f7184b945..7e968c2c874 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp @@ -9,7 +9,7 @@ using namespace NTableIndex; class TPropose: public TSubOperationState { private: - TOperationId OperationId; + const TOperationId OperationId; TString DebugHint() const override { return TStringBuilder() @@ -82,7 +82,7 @@ class TCreateTableIndex: public TSubOperation { } TTxState::ETxState NextState(TTxState::ETxState state) { - switch(state) { + switch (state) { case TTxState::Propose: return TTxState::Done; default: @@ -91,7 +91,7 @@ class TCreateTableIndex: public TSubOperation { } TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) { - switch(state) { + switch (state) { case TTxState::Propose: return THolder(new TPropose(OperationId)); case TTxState::Done: @@ -113,13 +113,13 @@ class TCreateTableIndex: public TSubOperation { public: TCreateTableIndex(TOperationId id, const TTxTransaction& tx) : OperationId(id) - , Transaction(tx) + , Transaction(tx) { } TCreateTableIndex(TOperationId id, TTxState::ETxState state) : OperationId(id) - , State(state) + , State(state) { SetState(SelectStateFunc(state)); } @@ -230,6 +230,7 @@ public: return result; } } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateTableIndex, errStr)) { result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); return result; @@ -298,11 +299,9 @@ public: } }; -} +} // anonymous namespace - -namespace NKikimr { -namespace NSchemeShard { +namespace NKikimr::NSchemeShard { ISubOperationBase::TPtr CreateNewTableIndex(TOperationId id, const TTxTransaction& tx) { return new TCreateTableIndex(id, tx); @@ -313,4 +312,3 @@ ISubOperationBase::TPtr CreateNewTableIndex(TOperationId id, TTxState::ETxState } } -} diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.cpp index 0fd238ee181..9d4d08a69c8 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.cpp @@ -1,48 +1,50 @@ #include "schemeshard__operation_db_changes.h" - #include "schemeshard_impl.h" #include <ydb/core/tx/tx_processing.h> -namespace NKikimr { -namespace NSchemeShard { +namespace NKikimr::NSchemeShard { void TStorageChanges::Apply(TSchemeShard* ss, NTabletFlatExecutor::TTransactionContext& txc, const TActorContext&) { NIceDb::TNiceDb db(txc.DB); - for (const auto& pId: Pathes) { + for (const auto& pId : Pathes) { ss->PersistPath(db, pId); } - for (const auto& pId: AlterUserAttrs) { + for (const auto& pId : AlterUserAttrs) { ss->PersistAlterUserAttributes(db, pId); } - for (const auto& pId: ApplyUserAttrs) { + for (const auto& pId : ApplyUserAttrs) { ss->ApplyAndPersistUserAttrs(db, pId); } - for (const auto& pId: AlterIndexes) { + for (const auto& pId : AlterIndexes) { ss->PersistTableIndexAlterData(db, pId); } - for (const auto& pId: ApplyIndexes) { + for (const auto& pId : ApplyIndexes) { ss->PersistTableIndex(db, pId); } - for (const auto& pId: AlterCdcStreams) { + for (const auto& pId : AlterCdcStreams) { ss->PersistCdcStreamAlterData(db, pId); } - for (const auto& pId: ApplyCdcStreams) { + for (const auto& pId : ApplyCdcStreams) { ss->PersistCdcStream(db, pId); } - for (const auto& pId: Tables) { + for (const auto& pId : Tables) { ss->PersistTable(db, pId); } - for (const auto& shardIdx: Shards) { + for (const auto& [pId, snapshotTxId] : TableSnapshots) { + ss->PersistSnapshotTable(db, snapshotTxId, pId); + } + + for (const auto& shardIdx : Shards) { const TShardInfo& shardInfo = ss->ShardInfos.at(shardIdx); const TPathId& pId = shardInfo.PathId; const TTableInfo::TPtr tableInfo = ss->Tables.at(pId); @@ -55,7 +57,7 @@ void TStorageChanges::Apply(TSchemeShard* ss, NTabletFlatExecutor::TTransactionC } } - for (const auto& opId: TxStates) { + for (const auto& opId : TxStates) { ss->PersistTxState(db, opId); } @@ -63,6 +65,4 @@ void TStorageChanges::Apply(TSchemeShard* ss, NTabletFlatExecutor::TTransactionC ss->PersistUpdateNextShardIdx(db); } - -} } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.h b/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.h index 84a22a34ef0..6a164985ce0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.h @@ -8,8 +8,7 @@ #include <util/generic/ptr.h> -namespace NKikimr { -namespace NSchemeShard { +namespace NKikimr::NSchemeShard { class TSchemeShard; @@ -17,6 +16,7 @@ class TStorageChanges: public TSimpleRefCount<TStorageChanges> { TDeque<TPathId> Pathes; TDeque<TPathId> Tables; + TDeque<std::pair<TPathId, TTxId>> TableSnapshots; TDeque<TShardIdx> Shards; @@ -42,6 +42,10 @@ public: Tables.push_back(pathId); } + void PersistTableSnapshot(const TPathId& pathId, TTxId snapshotTxId) { + TableSnapshots.emplace_back(pathId, snapshotTxId); + } + void PersistAlterUserAttrs(const TPathId& pathId) { AlterUserAttrs.push_back(pathId); } @@ -78,4 +82,3 @@ public: }; } -} diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp index 04282249136..54c2df192da 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp @@ -13,7 +13,7 @@ using namespace NSchemeShard; class TConfigureParts: public TSubOperationState { private: - TOperationId OperationId; + const TOperationId OperationId; TString DebugHint() const override { return TStringBuilder() @@ -114,13 +114,12 @@ public: << " at schemeshard: " << ssId); - THolder<TEvDataShard::TEvProposeTransaction> event = - THolder(new TEvDataShard::TEvProposeTransaction(NKikimrTxDataShard::TX_KIND_SCHEME, - context.SS->TabletID(), - context.Ctx.SelfID, - ui64(OperationId.GetTxId()), - txBody, - context.SS->SelectProcessingPrarams(txState->TargetPathId))); + auto event = MakeHolder<TEvDataShard::TEvProposeTransaction>( + NKikimrTxDataShard::TX_KIND_SCHEME, + context.SS->TabletID(), context.Ctx.SelfID, + ui64(OperationId.GetTxId()), txBody, + context.SS->SelectProcessingPrarams(txState->TargetPathId) + ); context.OnComplete.BindMsgToPipe(OperationId, datashardId, shardIdx, event.Release()); } @@ -130,10 +129,9 @@ public: } }; - class TPropose: public TSubOperationState { private: - TOperationId OperationId; + const TOperationId OperationId; TString DebugHint() const override { return TStringBuilder() @@ -145,7 +143,10 @@ public: TPropose(TOperationId id) : OperationId(id) { - IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType, TEvDataShard::TEvProposeTransactionResult::EventType}); + IgnoreMessages(DebugHint(), { + TEvHive::TEvCreateTabletReply::EventType, + TEvDataShard::TEvProposeTransactionResult::EventType, + }); } bool HandleReply(TEvDataShard::TEvSchemaChanged::TPtr& ev, TOperationContext& context) override { @@ -215,11 +216,9 @@ public: } }; - - class TCreateTxShards: public TSubOperationState { private: - TOperationId OperationId; + const TOperationId OperationId; TString DebugHint() const override { return TStringBuilder() @@ -255,14 +254,12 @@ public: NIceDb::TNiceDb db(context.GetDB()); - context.SS->ChangeTxState(db, OperationId, TTxState::ConfigureParts); return true; } }; - class TInitializeBuildIndex: public TSubOperation { const TOperationId OperationId; const TTxTransaction Transaction; @@ -273,7 +270,7 @@ class TInitializeBuildIndex: public TSubOperation { } TTxState::ETxState NextState(TTxState::ETxState state) { - switch(state) { + switch (state) { case TTxState::Waiting: case TTxState::CreateParts: return TTxState::ConfigureParts; @@ -286,11 +283,10 @@ class TInitializeBuildIndex: public TSubOperation { default: return TTxState::Invalid; } - return TTxState::Invalid; } TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) { - switch(state) { + switch (state) { case TTxState::Waiting: case TTxState::CreateParts: return THolder(new TCreateTxShards(OperationId)); @@ -319,13 +315,13 @@ class TInitializeBuildIndex: public TSubOperation { public: TInitializeBuildIndex(TOperationId id, const TTxTransaction& tx) : OperationId(id) - , Transaction(tx) + , Transaction(tx) { } TInitializeBuildIndex(TOperationId id, TTxState::ETxState state) : OperationId(id) - , State(state) + , State(state) { SetState(SelectStateFunc(state)); } @@ -425,19 +421,25 @@ public: result->SetError(TEvSchemeShard::EStatus::StatusSchemeError, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxInitializeBuildIndex, errStr)) { result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); return result; } - NIceDb::TNiceDb db(context.GetDB()); + auto guard = context.DbGuard(); + context.MemChanges.GrabPath(context.SS, tablePathId); + context.MemChanges.GrabTableSnapshot(context.SS, tablePathId, OperationId.GetTxId()); + context.MemChanges.GrabNewTxState(context.SS, OperationId); + + context.DbChanges.PersistTableSnapshot(tablePathId, OperationId.GetTxId()); + context.DbChanges.PersistTxState(OperationId); pathEl->LastTxId = OperationId.GetTxId(); pathEl->PathState = NKikimrSchemeOp::EPathState::EPathStateAlter; TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxInitializeBuildIndex, tablePathId); txState.State = TTxState::CreateParts; - context.SS->PersistTxState(db, OperationId); TTableInfo::TPtr table = context.SS->Tables.at(tablePathId); for (auto splitTx: table->GetSplitOpsInFlight()) { @@ -446,7 +448,6 @@ public: context.SS->TablesWithSnaphots.emplace(tablePathId, OperationId.GetTxId()); context.SS->SnapshotTables[OperationId.GetTxId()].insert(tablePathId); - context.SS->PersistSnapshotTable(db, OperationId.GetTxId(), tablePathId); context.SS->TabletCounters->Simple()[COUNTER_SNAPSHOTS_COUNT].Add(1); context.OnComplete.ActivateTx(OperationId); @@ -456,8 +457,12 @@ public: return result; } - void AbortPropose(TOperationContext&) override { - Y_FAIL("no AbortPropose for TInitializeBuildIndex"); + void AbortPropose(TOperationContext& context) override { + LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TInitializeBuildIndex AbortPropose" + << ", opId: " << OperationId + << ", at schemeshard: " << context.SS->TabletID()); + context.SS->TabletCounters->Simple()[COUNTER_SNAPSHOTS_COUNT].Sub(1); } void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override { @@ -471,10 +476,9 @@ public: } }; -} +} // anonymous namespace -namespace NKikimr { -namespace NSchemeShard { +namespace NKikimr::NSchemeShard { ISubOperationBase::TPtr CreateInitializeBuildIndexMainTable(TOperationId id, const TTxTransaction& tx) { return new TInitializeBuildIndex(id, tx); @@ -486,4 +490,3 @@ ISubOperationBase::TPtr CreateInitializeBuildIndexMainTable(TOperationId id, TTx } } -} diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp index 126d02d850c..0c2c51fdce8 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp @@ -1,11 +1,9 @@ #include "schemeshard__operation_memory_changes.h" - #include "schemeshard_impl.h" #include <ydb/core/tx/tx_processing.h> -namespace NKikimr { -namespace NSchemeShard { +namespace NKikimr::NSchemeShard { void TMemoryChanges::GrabNewTxState(TSchemeShard* ss, const TOperationId& op) { Y_VERIFY(!ss->TxInFlight.contains(op)); @@ -76,6 +74,12 @@ void TMemoryChanges::GrabNewCdcStream(TSchemeShard* ss, const TPathId& pathId) { CdcStreams.emplace(pathId, nullptr); } +void TMemoryChanges::GrabTableSnapshot(TSchemeShard* ss, const TPathId& pathId, TTxId snapshotTxId) { + Y_VERIFY(!ss->TablesWithSnaphots.contains(pathId)); + + TablesWithSnaphots.emplace(pathId, snapshotTxId); +} + void TMemoryChanges::UnDo(TSchemeShard* ss) { // be aware of the order of grab & undo ops // stack is the best way to manage it right @@ -110,6 +114,21 @@ void TMemoryChanges::UnDo(TSchemeShard* ss) { CdcStreams.pop(); } + while (TablesWithSnaphots) { + const auto& [id, snapshotTxId] = TablesWithSnaphots.top(); + + ss->TablesWithSnaphots.erase(id); + auto it = ss->SnapshotTables.find(snapshotTxId); + if (it != ss->SnapshotTables.end()) { + it->second.erase(id); + if (it->second.empty()) { + ss->SnapshotTables.erase(it); + } + } + + TablesWithSnaphots.pop(); + } + while (Tables) { const auto& [id, elem] = Tables.top(); if (elem) { @@ -153,4 +172,3 @@ void TMemoryChanges::UnDo(TSchemeShard* ss) { } } -} diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.h b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.h index 7bc04e3beb9..b9c2d889e24 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.h @@ -9,8 +9,7 @@ #include <util/generic/ptr.h> #include <util/generic/stack.h> -namespace NKikimr { -namespace NSchemeShard { +namespace NKikimr::NSchemeShard { class TSchemeShard; @@ -24,6 +23,9 @@ class TMemoryChanges: public TSimpleRefCount<TMemoryChanges> { using TCdcStreamState = std::pair<TPathId, TCdcStreamInfo::TPtr>; TStack<TCdcStreamState> CdcStreams; + using TTableSnapshotState = std::pair<TPathId, TTxId>; + TStack<TTableSnapshotState> TablesWithSnaphots; + using TTableState = std::pair<TPathId, TTableInfo::TPtr>; TStack<TTableState> Tables; @@ -57,8 +59,9 @@ public: void GrabNewCdcStream(TSchemeShard* ss, const TPathId& pathId); + void GrabTableSnapshot(TSchemeShard* ss, const TPathId& pathId, TTxId snapshotTxId); + void UnDo(TSchemeShard* ss); }; } -} |