aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-10-04 11:06:30 +0300
committerilnaz <ilnaz@ydb.tech>2022-10-04 11:06:30 +0300
commitfe82ac7c9e6d41a24ec601abf1413ba28e10c067 (patch)
treeddbd912e2705ebd1d8e864796f69c97fd79472e8
parentc639d8e279f133e4a7dd8689aa5d1359107296b9 (diff)
downloadydb-fe82ac7c9e6d41a24ec601abf1413ba28e10c067.tar.gz
Abortable TInitializeBuildIndex
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp18
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_db_changes.cpp30
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_db_changes.h9
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp63
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp26
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.h9
6 files changed, 90 insertions, 65 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp
index a7f7184b945..7e968c2c874 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp
@@ -9,7 +9,7 @@ using namespace NTableIndex;
class TPropose: public TSubOperationState {
private:
- TOperationId OperationId;
+ const TOperationId OperationId;
TString DebugHint() const override {
return TStringBuilder()
@@ -82,7 +82,7 @@ class TCreateTableIndex: public TSubOperation {
}
TTxState::ETxState NextState(TTxState::ETxState state) {
- switch(state) {
+ switch (state) {
case TTxState::Propose:
return TTxState::Done;
default:
@@ -91,7 +91,7 @@ class TCreateTableIndex: public TSubOperation {
}
TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) {
- switch(state) {
+ switch (state) {
case TTxState::Propose:
return THolder(new TPropose(OperationId));
case TTxState::Done:
@@ -113,13 +113,13 @@ class TCreateTableIndex: public TSubOperation {
public:
TCreateTableIndex(TOperationId id, const TTxTransaction& tx)
: OperationId(id)
- , Transaction(tx)
+ , Transaction(tx)
{
}
TCreateTableIndex(TOperationId id, TTxState::ETxState state)
: OperationId(id)
- , State(state)
+ , State(state)
{
SetState(SelectStateFunc(state));
}
@@ -230,6 +230,7 @@ public:
return result;
}
}
+
if (!context.SS->CheckInFlightLimit(TTxState::TxCreateTableIndex, errStr)) {
result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
return result;
@@ -298,11 +299,9 @@ public:
}
};
-}
+} // anonymous namespace
-
-namespace NKikimr {
-namespace NSchemeShard {
+namespace NKikimr::NSchemeShard {
ISubOperationBase::TPtr CreateNewTableIndex(TOperationId id, const TTxTransaction& tx) {
return new TCreateTableIndex(id, tx);
@@ -313,4 +312,3 @@ ISubOperationBase::TPtr CreateNewTableIndex(TOperationId id, TTxState::ETxState
}
}
-}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.cpp
index 0fd238ee181..9d4d08a69c8 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.cpp
@@ -1,48 +1,50 @@
#include "schemeshard__operation_db_changes.h"
-
#include "schemeshard_impl.h"
#include <ydb/core/tx/tx_processing.h>
-namespace NKikimr {
-namespace NSchemeShard {
+namespace NKikimr::NSchemeShard {
void TStorageChanges::Apply(TSchemeShard* ss, NTabletFlatExecutor::TTransactionContext& txc, const TActorContext&) {
NIceDb::TNiceDb db(txc.DB);
- for (const auto& pId: Pathes) {
+ for (const auto& pId : Pathes) {
ss->PersistPath(db, pId);
}
- for (const auto& pId: AlterUserAttrs) {
+ for (const auto& pId : AlterUserAttrs) {
ss->PersistAlterUserAttributes(db, pId);
}
- for (const auto& pId: ApplyUserAttrs) {
+ for (const auto& pId : ApplyUserAttrs) {
ss->ApplyAndPersistUserAttrs(db, pId);
}
- for (const auto& pId: AlterIndexes) {
+ for (const auto& pId : AlterIndexes) {
ss->PersistTableIndexAlterData(db, pId);
}
- for (const auto& pId: ApplyIndexes) {
+ for (const auto& pId : ApplyIndexes) {
ss->PersistTableIndex(db, pId);
}
- for (const auto& pId: AlterCdcStreams) {
+ for (const auto& pId : AlterCdcStreams) {
ss->PersistCdcStreamAlterData(db, pId);
}
- for (const auto& pId: ApplyCdcStreams) {
+ for (const auto& pId : ApplyCdcStreams) {
ss->PersistCdcStream(db, pId);
}
- for (const auto& pId: Tables) {
+ for (const auto& pId : Tables) {
ss->PersistTable(db, pId);
}
- for (const auto& shardIdx: Shards) {
+ for (const auto& [pId, snapshotTxId] : TableSnapshots) {
+ ss->PersistSnapshotTable(db, snapshotTxId, pId);
+ }
+
+ for (const auto& shardIdx : Shards) {
const TShardInfo& shardInfo = ss->ShardInfos.at(shardIdx);
const TPathId& pId = shardInfo.PathId;
const TTableInfo::TPtr tableInfo = ss->Tables.at(pId);
@@ -55,7 +57,7 @@ void TStorageChanges::Apply(TSchemeShard* ss, NTabletFlatExecutor::TTransactionC
}
}
- for (const auto& opId: TxStates) {
+ for (const auto& opId : TxStates) {
ss->PersistTxState(db, opId);
}
@@ -63,6 +65,4 @@ void TStorageChanges::Apply(TSchemeShard* ss, NTabletFlatExecutor::TTransactionC
ss->PersistUpdateNextShardIdx(db);
}
-
-}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.h b/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.h
index 84a22a34ef0..6a164985ce0 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_db_changes.h
@@ -8,8 +8,7 @@
#include <util/generic/ptr.h>
-namespace NKikimr {
-namespace NSchemeShard {
+namespace NKikimr::NSchemeShard {
class TSchemeShard;
@@ -17,6 +16,7 @@ class TStorageChanges: public TSimpleRefCount<TStorageChanges> {
TDeque<TPathId> Pathes;
TDeque<TPathId> Tables;
+ TDeque<std::pair<TPathId, TTxId>> TableSnapshots;
TDeque<TShardIdx> Shards;
@@ -42,6 +42,10 @@ public:
Tables.push_back(pathId);
}
+ void PersistTableSnapshot(const TPathId& pathId, TTxId snapshotTxId) {
+ TableSnapshots.emplace_back(pathId, snapshotTxId);
+ }
+
void PersistAlterUserAttrs(const TPathId& pathId) {
AlterUserAttrs.push_back(pathId);
}
@@ -78,4 +82,3 @@ public:
};
}
-}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp
index 04282249136..54c2df192da 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp
@@ -13,7 +13,7 @@ using namespace NSchemeShard;
class TConfigureParts: public TSubOperationState {
private:
- TOperationId OperationId;
+ const TOperationId OperationId;
TString DebugHint() const override {
return TStringBuilder()
@@ -114,13 +114,12 @@ public:
<< " at schemeshard: " << ssId);
- THolder<TEvDataShard::TEvProposeTransaction> event =
- THolder(new TEvDataShard::TEvProposeTransaction(NKikimrTxDataShard::TX_KIND_SCHEME,
- context.SS->TabletID(),
- context.Ctx.SelfID,
- ui64(OperationId.GetTxId()),
- txBody,
- context.SS->SelectProcessingPrarams(txState->TargetPathId)));
+ auto event = MakeHolder<TEvDataShard::TEvProposeTransaction>(
+ NKikimrTxDataShard::TX_KIND_SCHEME,
+ context.SS->TabletID(), context.Ctx.SelfID,
+ ui64(OperationId.GetTxId()), txBody,
+ context.SS->SelectProcessingPrarams(txState->TargetPathId)
+ );
context.OnComplete.BindMsgToPipe(OperationId, datashardId, shardIdx, event.Release());
}
@@ -130,10 +129,9 @@ public:
}
};
-
class TPropose: public TSubOperationState {
private:
- TOperationId OperationId;
+ const TOperationId OperationId;
TString DebugHint() const override {
return TStringBuilder()
@@ -145,7 +143,10 @@ public:
TPropose(TOperationId id)
: OperationId(id)
{
- IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType, TEvDataShard::TEvProposeTransactionResult::EventType});
+ IgnoreMessages(DebugHint(), {
+ TEvHive::TEvCreateTabletReply::EventType,
+ TEvDataShard::TEvProposeTransactionResult::EventType,
+ });
}
bool HandleReply(TEvDataShard::TEvSchemaChanged::TPtr& ev, TOperationContext& context) override {
@@ -215,11 +216,9 @@ public:
}
};
-
-
class TCreateTxShards: public TSubOperationState {
private:
- TOperationId OperationId;
+ const TOperationId OperationId;
TString DebugHint() const override {
return TStringBuilder()
@@ -255,14 +254,12 @@ public:
NIceDb::TNiceDb db(context.GetDB());
-
context.SS->ChangeTxState(db, OperationId, TTxState::ConfigureParts);
return true;
}
};
-
class TInitializeBuildIndex: public TSubOperation {
const TOperationId OperationId;
const TTxTransaction Transaction;
@@ -273,7 +270,7 @@ class TInitializeBuildIndex: public TSubOperation {
}
TTxState::ETxState NextState(TTxState::ETxState state) {
- switch(state) {
+ switch (state) {
case TTxState::Waiting:
case TTxState::CreateParts:
return TTxState::ConfigureParts;
@@ -286,11 +283,10 @@ class TInitializeBuildIndex: public TSubOperation {
default:
return TTxState::Invalid;
}
- return TTxState::Invalid;
}
TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) {
- switch(state) {
+ switch (state) {
case TTxState::Waiting:
case TTxState::CreateParts:
return THolder(new TCreateTxShards(OperationId));
@@ -319,13 +315,13 @@ class TInitializeBuildIndex: public TSubOperation {
public:
TInitializeBuildIndex(TOperationId id, const TTxTransaction& tx)
: OperationId(id)
- , Transaction(tx)
+ , Transaction(tx)
{
}
TInitializeBuildIndex(TOperationId id, TTxState::ETxState state)
: OperationId(id)
- , State(state)
+ , State(state)
{
SetState(SelectStateFunc(state));
}
@@ -425,19 +421,25 @@ public:
result->SetError(TEvSchemeShard::EStatus::StatusSchemeError, errStr);
return result;
}
+
if (!context.SS->CheckInFlightLimit(TTxState::TxInitializeBuildIndex, errStr)) {
result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
return result;
}
- NIceDb::TNiceDb db(context.GetDB());
+ auto guard = context.DbGuard();
+ context.MemChanges.GrabPath(context.SS, tablePathId);
+ context.MemChanges.GrabTableSnapshot(context.SS, tablePathId, OperationId.GetTxId());
+ context.MemChanges.GrabNewTxState(context.SS, OperationId);
+
+ context.DbChanges.PersistTableSnapshot(tablePathId, OperationId.GetTxId());
+ context.DbChanges.PersistTxState(OperationId);
pathEl->LastTxId = OperationId.GetTxId();
pathEl->PathState = NKikimrSchemeOp::EPathState::EPathStateAlter;
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxInitializeBuildIndex, tablePathId);
txState.State = TTxState::CreateParts;
- context.SS->PersistTxState(db, OperationId);
TTableInfo::TPtr table = context.SS->Tables.at(tablePathId);
for (auto splitTx: table->GetSplitOpsInFlight()) {
@@ -446,7 +448,6 @@ public:
context.SS->TablesWithSnaphots.emplace(tablePathId, OperationId.GetTxId());
context.SS->SnapshotTables[OperationId.GetTxId()].insert(tablePathId);
- context.SS->PersistSnapshotTable(db, OperationId.GetTxId(), tablePathId);
context.SS->TabletCounters->Simple()[COUNTER_SNAPSHOTS_COUNT].Add(1);
context.OnComplete.ActivateTx(OperationId);
@@ -456,8 +457,12 @@ public:
return result;
}
- void AbortPropose(TOperationContext&) override {
- Y_FAIL("no AbortPropose for TInitializeBuildIndex");
+ void AbortPropose(TOperationContext& context) override {
+ LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "TInitializeBuildIndex AbortPropose"
+ << ", opId: " << OperationId
+ << ", at schemeshard: " << context.SS->TabletID());
+ context.SS->TabletCounters->Simple()[COUNTER_SNAPSHOTS_COUNT].Sub(1);
}
void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override {
@@ -471,10 +476,9 @@ public:
}
};
-}
+} // anonymous namespace
-namespace NKikimr {
-namespace NSchemeShard {
+namespace NKikimr::NSchemeShard {
ISubOperationBase::TPtr CreateInitializeBuildIndexMainTable(TOperationId id, const TTxTransaction& tx) {
return new TInitializeBuildIndex(id, tx);
@@ -486,4 +490,3 @@ ISubOperationBase::TPtr CreateInitializeBuildIndexMainTable(TOperationId id, TTx
}
}
-}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp
index 126d02d850c..0c2c51fdce8 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp
@@ -1,11 +1,9 @@
#include "schemeshard__operation_memory_changes.h"
-
#include "schemeshard_impl.h"
#include <ydb/core/tx/tx_processing.h>
-namespace NKikimr {
-namespace NSchemeShard {
+namespace NKikimr::NSchemeShard {
void TMemoryChanges::GrabNewTxState(TSchemeShard* ss, const TOperationId& op) {
Y_VERIFY(!ss->TxInFlight.contains(op));
@@ -76,6 +74,12 @@ void TMemoryChanges::GrabNewCdcStream(TSchemeShard* ss, const TPathId& pathId) {
CdcStreams.emplace(pathId, nullptr);
}
+void TMemoryChanges::GrabTableSnapshot(TSchemeShard* ss, const TPathId& pathId, TTxId snapshotTxId) {
+ Y_VERIFY(!ss->TablesWithSnaphots.contains(pathId));
+
+ TablesWithSnaphots.emplace(pathId, snapshotTxId);
+}
+
void TMemoryChanges::UnDo(TSchemeShard* ss) {
// be aware of the order of grab & undo ops
// stack is the best way to manage it right
@@ -110,6 +114,21 @@ void TMemoryChanges::UnDo(TSchemeShard* ss) {
CdcStreams.pop();
}
+ while (TablesWithSnaphots) {
+ const auto& [id, snapshotTxId] = TablesWithSnaphots.top();
+
+ ss->TablesWithSnaphots.erase(id);
+ auto it = ss->SnapshotTables.find(snapshotTxId);
+ if (it != ss->SnapshotTables.end()) {
+ it->second.erase(id);
+ if (it->second.empty()) {
+ ss->SnapshotTables.erase(it);
+ }
+ }
+
+ TablesWithSnaphots.pop();
+ }
+
while (Tables) {
const auto& [id, elem] = Tables.top();
if (elem) {
@@ -153,4 +172,3 @@ void TMemoryChanges::UnDo(TSchemeShard* ss) {
}
}
-}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.h b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.h
index 7bc04e3beb9..b9c2d889e24 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.h
@@ -9,8 +9,7 @@
#include <util/generic/ptr.h>
#include <util/generic/stack.h>
-namespace NKikimr {
-namespace NSchemeShard {
+namespace NKikimr::NSchemeShard {
class TSchemeShard;
@@ -24,6 +23,9 @@ class TMemoryChanges: public TSimpleRefCount<TMemoryChanges> {
using TCdcStreamState = std::pair<TPathId, TCdcStreamInfo::TPtr>;
TStack<TCdcStreamState> CdcStreams;
+ using TTableSnapshotState = std::pair<TPathId, TTxId>;
+ TStack<TTableSnapshotState> TablesWithSnaphots;
+
using TTableState = std::pair<TPathId, TTableInfo::TPtr>;
TStack<TTableState> Tables;
@@ -57,8 +59,9 @@ public:
void GrabNewCdcStream(TSchemeShard* ss, const TPathId& pathId);
+ void GrabTableSnapshot(TSchemeShard* ss, const TPathId& pathId, TTxId snapshotTxId);
+
void UnDo(TSchemeShard* ss);
};
}
-}