aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-01-17 12:40:33 +0300
committerilnaz <ilnaz@ydb.tech>2023-01-17 12:40:33 +0300
commit4dd5d15a4ccd02507d72968f7d585ed79c442c51 (patch)
tree6e592cf68208ce582724d2f4806bb8ba2863bb51
parentb993b615fddb6d35247ff92539bed3afe7975e11 (diff)
downloadydb-4dd5d15a4ccd02507d72968f7d585ed79c442c51.tar.gz
Drop lock & snapshot during stream deletion
-rw-r--r--ydb/core/protos/counters_schemeshard.proto2
-rw-r--r--ydb/core/protos/tx_datashard.proto1
-rw-r--r--ydb/core/tx/datashard/drop_cdc_stream_unit.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp36
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.h36
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp108
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_part.h4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_tx_infly.h4
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream.cpp330
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp39
13 files changed, 400 insertions, 175 deletions
diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto
index ee279130b3..f3bc4c20bb 100644
--- a/ydb/core/protos/counters_schemeshard.proto
+++ b/ydb/core/protos/counters_schemeshard.proto
@@ -172,6 +172,7 @@ enum ESimpleCounters {
COUNTER_IN_FLIGHT_OPS_TxCreateCdcStreamAtTableWithInitialScan = 139 [(CounterOpts) = {Name: "InFlightOps/CreateCdcStreamAtTableWithInitialScan"}];
COUNTER_IN_FLIGHT_OPS_TxAlterExtSubDomainCreateHive = 140 [(CounterOpts) = {Name: "InFlightOps/AlterExtSubDomainCreateHive"}];
COUNTER_IN_FLIGHT_OPS_TxAlterCdcStreamAtTableDropSnapshot = 141 [(CounterOpts) = {Name: "InFlightOps/AlterCdcStreamAtTableDropSnapshot"}];
+ COUNTER_IN_FLIGHT_OPS_TxDropCdcStreamAtTableDropSnapshot = 142 [(CounterOpts) = {Name: "InFlightOps/DropCdcStreamAtTableDropSnapshot"}];
}
enum ECumulativeCounters {
@@ -277,6 +278,7 @@ enum ECumulativeCounters {
COUNTER_FINISHED_OPS_TxCreateCdcStreamAtTableWithInitialScan = 84 [(CounterOpts) = {Name: "FinishedOps/CreateCdcStreamAtTableWithInitialScan"}];
COUNTER_FINISHED_OPS_TxAlterExtSubDomainCreateHive = 85 [(CounterOpts) = {Name: "FinishedOps/AlterExtSubDomainCreateHive"}];
COUNTER_FINISHED_OPS_TxAlterCdcStreamAtTableDropSnapshot = 86 [(CounterOpts) = {Name: "FinishedOps/AlterCdcStreamAtTableDropSnapshot"}];
+ COUNTER_FINISHED_OPS_TxDropCdcStreamAtTableDropSnapshot = 87 [(CounterOpts) = {Name: "FinishedOps/DropCdcStreamAtTableDropSnapshot"}];
}
enum EPercentileCounters {
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 6d25d172da..ed3a9e25a3 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -418,6 +418,7 @@ message TDropCdcStreamNotice {
optional NKikimrProto.TPathID PathId = 1;
optional uint64 TableSchemaVersion = 2;
optional NKikimrProto.TPathID StreamPathId = 3;
+ optional TSnapshot DropSnapshot = 4;
}
message TAsyncIndexInfo {
diff --git a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
index 67b676e4e5..2788930a5f 100644
--- a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -46,6 +46,14 @@ public:
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
}
+ if (params.HasDropSnapshot()) {
+ const auto& snapshot = params.GetDropSnapshot();
+ Y_VERIFY(snapshot.GetStep() != 0);
+
+ const TSnapshotKey key(pathId.OwnerId, pathId.LocalPathId, snapshot.GetStep(), snapshot.GetTxId());
+ DataShard.GetSnapshotManager().RemoveSnapshot(txc.DB, key);
+ }
+
RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(streamPathId));
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp
index 3514444a26..bb83d34e90 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp
@@ -969,7 +969,9 @@ ISubOperationBase::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxSta
case TTxState::ETxType::TxDropCdcStream:
return CreateDropCdcStreamImpl(NextPartId(), txState);
case TTxState::ETxType::TxDropCdcStreamAtTable:
- return CreateDropCdcStreamAtTable(NextPartId(), txState);
+ return CreateDropCdcStreamAtTable(NextPartId(), txState, false);
+ case TTxState::ETxType::TxDropCdcStreamAtTableDropSnapshot:
+ return CreateDropCdcStreamAtTable(NextPartId(), txState, true);
// Sequences
case TTxState::ETxType::TxCreateSequence:
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
index 2050f0a05f..2fc3225149 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
@@ -278,40 +278,6 @@ public:
}; // TConfigurePartsAtTableDropSnapshot
-class TProposeAtTableDropSnapshot: public NCdcStreamState::TProposeAtTable {
-public:
- using NCdcStreamState::TProposeAtTable::TProposeAtTable;
-
- bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override {
- NCdcStreamState::TProposeAtTable::HandleReply(ev, context);
-
- const auto* txState = context.SS->FindTx(OperationId);
- Y_VERIFY(txState);
- const auto& pathId = txState->TargetPathId;
-
- Y_VERIFY(context.SS->TablesWithSnapshots.contains(pathId));
- const auto snapshotTxId = context.SS->TablesWithSnapshots.at(pathId);
-
- auto it = context.SS->SnapshotTables.find(snapshotTxId);
- if (it != context.SS->SnapshotTables.end()) {
- it->second.erase(pathId);
- if (it->second.empty()) {
- context.SS->SnapshotTables.erase(it);
- }
- }
-
- context.SS->SnapshotsStepIds.erase(snapshotTxId);
- context.SS->TablesWithSnapshots.erase(pathId);
-
- NIceDb::TNiceDb db(context.GetDB());
- context.SS->PersistDropSnapshot(db, snapshotTxId, pathId);
-
- context.SS->TabletCounters->Simple()[COUNTER_SNAPSHOTS_COUNT].Sub(1);
- return true;
- }
-
-}; // TProposeAtTableDropSnapshot
-
class TAlterCdcStreamAtTable: public TSubOperation {
static TTxState::ETxState NextState() {
return TTxState::ConfigureParts;
@@ -344,7 +310,7 @@ class TAlterCdcStreamAtTable: public TSubOperation {
}
case TTxState::Propose:
if (DropSnapshot) {
- return MakeHolder<TProposeAtTableDropSnapshot>(OperationId);
+ return MakeHolder<NCdcStreamState::TProposeAtTableDropSnapshot>(OperationId);
} else {
return MakeHolder<NCdcStreamState::TProposeAtTable>(OperationId);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
index 218c0c921b..af2e16c89e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
@@ -359,6 +359,8 @@ void NTableState::UpdatePartitioningForTableModification(TOperationId operationI
commonShardOp = TTxState::ConfigureParts;
} else if (txState.TxType == TTxState::TxDropCdcStreamAtTable) {
commonShardOp = TTxState::ConfigureParts;
+ } else if (txState.TxType == TTxState::TxDropCdcStreamAtTableDropSnapshot) {
+ commonShardOp = TTxState::ConfigureParts;
} else {
Y_FAIL("UNREACHABLE");
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
index 6ae047a6a1..1441944725 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
@@ -1052,6 +1052,7 @@ class TConfigurePartsAtTable: public TSubOperationState {
case TTxState::TxAlterCdcStreamAtTable:
case TTxState::TxAlterCdcStreamAtTableDropSnapshot:
case TTxState::TxDropCdcStreamAtTable:
+ case TTxState::TxDropCdcStreamAtTableDropSnapshot:
return true;
default:
return false;
@@ -1131,6 +1132,7 @@ class TProposeAtTable: public TSubOperationState {
case TTxState::TxAlterCdcStreamAtTable:
case TTxState::TxAlterCdcStreamAtTableDropSnapshot:
case TTxState::TxDropCdcStreamAtTable:
+ case TTxState::TxDropCdcStreamAtTableDropSnapshot:
return true;
default:
return false;
@@ -1207,6 +1209,40 @@ protected:
}; // TProposeAtTable
+class TProposeAtTableDropSnapshot: public TProposeAtTable {
+public:
+ using TProposeAtTable::TProposeAtTable;
+
+ bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override {
+ TProposeAtTable::HandleReply(ev, context);
+
+ const auto* txState = context.SS->FindTx(OperationId);
+ Y_VERIFY(txState);
+ const auto& pathId = txState->TargetPathId;
+
+ Y_VERIFY(context.SS->TablesWithSnapshots.contains(pathId));
+ const auto snapshotTxId = context.SS->TablesWithSnapshots.at(pathId);
+
+ auto it = context.SS->SnapshotTables.find(snapshotTxId);
+ if (it != context.SS->SnapshotTables.end()) {
+ it->second.erase(pathId);
+ if (it->second.empty()) {
+ context.SS->SnapshotTables.erase(it);
+ }
+ }
+
+ context.SS->SnapshotsStepIds.erase(snapshotTxId);
+ context.SS->TablesWithSnapshots.erase(pathId);
+
+ NIceDb::TNiceDb db(context.GetDB());
+ context.SS->PersistDropSnapshot(db, snapshotTxId, pathId);
+
+ context.SS->TabletCounters->Simple()[COUNTER_SNAPSHOTS_COUNT].Sub(1);
+ return true;
+ }
+
+}; // TProposeAtTableDropSnapshot
+
} // NCdcStreamState
namespace NForceDrop {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
index c0355b08f7..7884c523dc 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
@@ -225,6 +225,29 @@ public:
}; // TConfigurePartsAtTable
+class TConfigurePartsAtTableDropSnapshot: public TConfigurePartsAtTable {
+protected:
+ void FillNotice(const TPathId& pathId, NKikimrTxDataShard::TFlatSchemeTransaction& tx, TOperationContext& context) const override {
+ TConfigurePartsAtTable::FillNotice(pathId, tx, context);
+
+ Y_VERIFY(context.SS->TablesWithSnapshots.contains(pathId));
+ const auto snapshotTxId = context.SS->TablesWithSnapshots.at(pathId);
+
+ Y_VERIFY(context.SS->SnapshotsStepIds.contains(snapshotTxId));
+ const auto snapshotStep = context.SS->SnapshotsStepIds.at(snapshotTxId);
+
+ Y_VERIFY(tx.HasDropCdcStreamNotice());
+ auto& notice = *tx.MutableDropCdcStreamNotice();
+
+ notice.MutableDropSnapshot()->SetStep(ui64(snapshotStep));
+ notice.MutableDropSnapshot()->SetTxId(ui64(snapshotTxId));
+ }
+
+public:
+ using TConfigurePartsAtTable::TConfigurePartsAtTable;
+
+}; // TConfigurePartsAtTableDropSnapshot
+
class TDropCdcStreamAtTable: public TSubOperation {
static TTxState::ETxState NextState() {
return TTxState::ConfigureParts;
@@ -248,9 +271,17 @@ class TDropCdcStreamAtTable: public TSubOperation {
switch (state) {
case TTxState::Waiting:
case TTxState::ConfigureParts:
- return MakeHolder<TConfigurePartsAtTable>(OperationId);
+ if (DropSnapshot) {
+ return MakeHolder<TConfigurePartsAtTableDropSnapshot>(OperationId);
+ } else {
+ return MakeHolder<TConfigurePartsAtTable>(OperationId);
+ }
case TTxState::Propose:
- return MakeHolder<NCdcStreamState::TProposeAtTable>(OperationId);
+ if (DropSnapshot) {
+ return MakeHolder<NCdcStreamState::TProposeAtTableDropSnapshot>(OperationId);
+ } else {
+ return MakeHolder<NCdcStreamState::TProposeAtTable>(OperationId);
+ }
case TTxState::ProposedWaitParts:
return MakeHolder<NTableState::TProposedWaitParts>(OperationId);
case TTxState::Done:
@@ -261,7 +292,17 @@ class TDropCdcStreamAtTable: public TSubOperation {
}
public:
- using TSubOperation::TSubOperation;
+ explicit TDropCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot)
+ : TSubOperation(id, tx)
+ , DropSnapshot(dropSnapshot)
+ {
+ }
+
+ explicit TDropCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot)
+ : TSubOperation(id, state)
+ , DropSnapshot(dropSnapshot)
+ {
+ }
THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override {
const auto& workingDir = Transaction.GetWorkingDir();
@@ -325,6 +366,17 @@ public:
return result;
}
+ if (DropSnapshot && !context.SS->TablesWithSnapshots.contains(tablePath.Base()->PathId)) {
+ result->SetError(NKikimrScheme::StatusPreconditionFailed, TStringBuilder() << "Table has no snapshots"
+ << ": pathId# " << tablePath.Base()->PathId);
+ return result;
+ }
+
+ auto guard = context.DbGuard();
+ context.MemChanges.GrabPath(context.SS, tablePath.Base()->PathId);
+ context.MemChanges.GrabNewTxState(context.SS, OperationId);
+
+ context.DbChanges.PersistPath(tablePath.Base()->PathId);
context.DbChanges.PersistTxState(OperationId);
Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId));
@@ -339,8 +391,12 @@ public:
Y_VERIFY(stream->AlterVersion != 0);
Y_VERIFY(!stream->AlterData);
+ const auto txType = DropSnapshot
+ ? TTxState::TxDropCdcStreamAtTableDropSnapshot
+ : TTxState::TxDropCdcStreamAtTable;
+
Y_VERIFY(!context.SS->FindTx(OperationId));
- auto& txState = context.SS->CreateTx(OperationId, TTxState::TxDropCdcStreamAtTable, tablePath.Base()->PathId);
+ auto& txState = context.SS->CreateTx(OperationId, txType, tablePath.Base()->PathId);
txState.State = TTxState::ConfigureParts;
tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter;
@@ -356,8 +412,9 @@ public:
return result;
}
- void AbortPropose(TOperationContext&) override {
- Y_FAIL("no AbortPropose for TDropCdcStreamAtTable");
+ void AbortPropose(TOperationContext& context) override {
+ LOG_N("TDropCdcStreamAtTable AbortPropose"
+ << ": opId# " << OperationId);
}
void AbortUnsafe(TTxId txId, TOperationContext& context) override {
@@ -367,6 +424,9 @@ public:
context.OnComplete.DoneOperation(OperationId);
}
+private:
+ const bool DropSnapshot;
+
}; // TDropCdcStreamAtTable
} // anonymous
@@ -379,12 +439,12 @@ ISubOperationBase::TPtr CreateDropCdcStreamImpl(TOperationId id, TTxState::ETxSt
return MakeSubOperation<TDropCdcStream>(id, state);
}
-ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, const TTxTransaction& tx) {
- return MakeSubOperation<TDropCdcStreamAtTable>(id, tx);
+ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot) {
+ return MakeSubOperation<TDropCdcStreamAtTable>(id, tx, dropSnapshot);
}
-ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, TTxState::ETxState state) {
- return MakeSubOperation<TDropCdcStreamAtTable>(id, state);
+ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot) {
+ return MakeSubOperation<TDropCdcStreamAtTable>(id, state, dropSnapshot);
}
TVector<ISubOperationBase::TPtr> CreateDropCdcStream(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) {
@@ -442,7 +502,13 @@ TVector<ISubOperationBase::TPtr> CreateDropCdcStream(TOperationId opId, const TT
return {CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, errStr)};
}
- if (!context.SS->CheckLocks(tablePath.Base()->PathId, tx, errStr)) {
+ Y_VERIFY(context.SS->CdcStreams.contains(streamPath.Base()->PathId));
+ auto stream = context.SS->CdcStreams.at(streamPath.Base()->PathId);
+
+ const auto lockTxId = stream->State == TCdcStreamInfo::EState::ECdcStreamStateScan
+ ? streamPath.Base()->CreateTxId
+ : InvalidTxId;
+ if (!context.SS->CheckLocks(tablePath.Base()->PathId, lockTxId, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusMultipleModifications, errStr)};
}
@@ -452,13 +518,31 @@ TVector<ISubOperationBase::TPtr> CreateDropCdcStream(TOperationId opId, const TT
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamAtTable);
outTx.MutableDropCdcStream()->CopyFrom(op);
- result.push_back(CreateDropCdcStreamAtTable(NextPartId(opId, result), outTx));
+ if (lockTxId != InvalidTxId) {
+ outTx.MutableLockGuard()->SetOwnerTxId(ui64(lockTxId));
+ }
+
+ result.push_back(CreateDropCdcStreamAtTable(NextPartId(opId, result), outTx, lockTxId != InvalidTxId));
+ }
+
+ if (lockTxId != InvalidTxId) {
+ auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropLock);
+ outTx.SetFailOnExist(true);
+ outTx.SetInternal(true);
+ outTx.MutableLockConfig()->SetName(tablePath.LeafName());
+ outTx.MutableLockGuard()->SetOwnerTxId(ui64(lockTxId));
+
+ result.push_back(DropLock(NextPartId(opId, result), outTx));
}
{
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl);
outTx.MutableDrop()->SetName(streamPath.Base()->Name);
+ if (lockTxId != InvalidTxId) {
+ outTx.MutableLockGuard()->SetOwnerTxId(ui64(lockTxId));
+ }
+
result.push_back(CreateDropCdcStreamImpl(NextPartId(opId, result), outTx));
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h
index 188d045df2..41526b52f5 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h
@@ -386,8 +386,8 @@ ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, TTxState::E
TVector<ISubOperationBase::TPtr> CreateDropCdcStream(TOperationId id, const TTxTransaction& tx, TOperationContext& context);
ISubOperationBase::TPtr CreateDropCdcStreamImpl(TOperationId id, const TTxTransaction& tx);
ISubOperationBase::TPtr CreateDropCdcStreamImpl(TOperationId id, TTxState::ETxState state);
-ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, const TTxTransaction& tx);
-ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, TTxState::ETxState state);
+ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot);
+ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot);
ISubOperationBase::TPtr CreateBackup(TOperationId id, const TTxTransaction& tx);
ISubOperationBase::TPtr CreateBackup(TOperationId id, TTxState::ETxState state);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index b1898ccfef..33db4e272a 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -1404,6 +1404,7 @@ TPathElement::EPathState TSchemeShard::CalcPathState(TTxState::ETxType txType, T
case TTxState::TxCreateCdcStreamAtTable:
case TTxState::TxCreateCdcStreamAtTableWithInitialScan:
case TTxState::TxDropCdcStreamAtTable:
+ case TTxState::TxDropCdcStreamAtTableDropSnapshot:
case TTxState::TxAlterSequence:
case TTxState::TxAlterReplication:
case TTxState::TxAlterBlobDepot:
diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
index b4982edabd..c150e03b85 100644
--- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
+++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
@@ -119,6 +119,7 @@ struct TTxState {
item(TxCreateCdcStreamAtTableWithInitialScan, 73) \
item(TxAlterExtSubDomainCreateHive, 74) \
item(TxAlterCdcStreamAtTableDropSnapshot, 75) \
+ item(TxDropCdcStreamAtTableDropSnapshot, 76) \
// TX_STATE_TYPE_ENUM
@@ -351,6 +352,7 @@ struct TTxState {
case TxDropTableIndexAtMainTable:
case TxDropCdcStream:
case TxDropCdcStreamAtTable:
+ case TxDropCdcStreamAtTableDropSnapshot:
case TxDropSequence:
case TxDropReplication:
case TxDropBlobDepot:
@@ -441,6 +443,7 @@ struct TTxState {
case TxFinalizeBuildIndex:
case TxDropTableIndexAtMainTable: // just increments schemaversion at main table
case TxDropCdcStreamAtTable:
+ case TxDropCdcStreamAtTableDropSnapshot:
case TxUpdateMainTableOnIndexMove:
return false;
case TxAlterPQGroup:
@@ -530,6 +533,7 @@ struct TTxState {
case TxDropLock:
case TxDropTableIndexAtMainTable:
case TxDropCdcStreamAtTable:
+ case TxDropCdcStreamAtTableDropSnapshot:
case TxUpdateMainTableOnIndexMove:
return false;
case TxAlterPQGroup:
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
index 13f0fe2668..e4a40a535a 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
@@ -139,123 +139,6 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
}
}
- void InitialScan(bool enable) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime, TTestEnvOptions()
- .EnableProtoSourceIdInfo(true)
- .EnableChangefeedInitialScan(enable));
- ui64 txId = 100;
-
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
- Name: "Table"
- Columns { Name: "key" Type: "Uint64" }
- Columns { Name: "value" Type: "Uint64" }
- KeyColumnNames: ["key"]
- )");
- env.TestWaitNotification(runtime, txId);
-
- const auto expectedStatus = enable
- ? NKikimrScheme::StatusAccepted
- : NKikimrScheme::StatusPreconditionFailed;
-
- TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
- TableName: "Table"
- StreamDescription {
- Name: "Stream"
- Mode: ECdcStreamModeKeysOnly
- Format: ECdcStreamFormatProto
- State: ECdcStreamStateScan
- }
- )", {expectedStatus});
-
- if (!enable) {
- return;
- }
-
- env.TestWaitNotification(runtime, txId);
- const auto lockTxId = txId;
-
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
- NLs::PathExist,
- NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly),
- NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
- NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateScan),
- });
-
- auto testAlterCdcStream = [&runtime](ui64 txId, const TString& parentPath, const TString& schema,
- const TMaybe<ui64>& lockTxId, TEvSchemeShard::EStatus expectedStatus = NKikimrScheme::StatusAccepted)
- {
- auto request = AlterCdcStreamRequest(txId, parentPath, schema);
- if (lockTxId) {
- request->Record.MutableTransaction(0)->MutableLockGuard()->SetOwnerTxId(*lockTxId);
- }
-
- ForwardToTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor(), request);
- TestModificationResult(runtime, txId, expectedStatus);
- };
-
- // without guard & lockTxId
- testAlterCdcStream(++txId, "/MyRoot", R"(
- TableName: "Table"
- StreamName: "Stream"
- GetReady {
- LockTxId: 0
- }
- )", {}, NKikimrScheme::StatusMultipleModifications);
-
- // with guard, without lockTxId
- testAlterCdcStream(++txId, "/MyRoot", R"(
- TableName: "Table"
- StreamName: "Stream"
- GetReady {
- LockTxId: 0
- }
- )", lockTxId, NKikimrScheme::StatusMultipleModifications);
-
- // without guard, with lockTxId
- testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"(
- TableName: "Table"
- StreamName: "Stream"
- GetReady {
- LockTxId: %lu
- }
- )", lockTxId), {}, NKikimrScheme::StatusMultipleModifications);
-
- // with guard & lockTxId
- testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"(
- TableName: "Table"
- StreamName: "Stream"
- GetReady {
- LockTxId: %lu
- }
- )", lockTxId), lockTxId);
- env.TestWaitNotification(runtime, txId);
-
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
- NLs::PathExist,
- NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly),
- NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
- NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady),
- });
-
- // another try should fail
- testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"(
- TableName: "Table"
- StreamName: "Stream"
- GetReady {
- LockTxId: %lu
- }
- )", lockTxId), {}, NKikimrScheme::StatusPreconditionFailed);
- }
-
- Y_UNIT_TEST(InitialScanShouldSucceed) {
- InitialScan(true);
- }
-
- Y_UNIT_TEST(InitialScanShouldFail) {
- InitialScan(false);
- }
-
Y_UNIT_TEST(Negative) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
@@ -945,3 +828,216 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
}
} // TCdcStreamTests
+
+Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) {
+ void InitialScan(bool enable) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions()
+ .EnableProtoSourceIdInfo(true)
+ .EnableChangefeedInitialScan(enable));
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ const auto expectedStatus = enable
+ ? NKikimrScheme::StatusAccepted
+ : NKikimrScheme::StatusPreconditionFailed;
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ State: ECdcStreamStateScan
+ }
+ )", {expectedStatus});
+
+ if (!enable) {
+ return;
+ }
+
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::PathExist,
+ NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly),
+ NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
+ NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateScan),
+ });
+ }
+
+ Y_UNIT_TEST(InitialScanEnabled) {
+ InitialScan(true);
+ }
+
+ Y_UNIT_TEST(InitialScanDisabled) {
+ InitialScan(false);
+ }
+
+ Y_UNIT_TEST(AlterStream) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions()
+ .EnableProtoSourceIdInfo(true)
+ .EnableChangefeedInitialScan(true));
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ State: ECdcStreamStateScan
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ const auto lockTxId = txId;
+ auto testAlterCdcStream = [&runtime](ui64 txId, const TString& parentPath, const TString& schema,
+ const TMaybe<ui64>& lockTxId, TEvSchemeShard::EStatus expectedStatus = NKikimrScheme::StatusAccepted)
+ {
+ auto request = AlterCdcStreamRequest(txId, parentPath, schema);
+ if (lockTxId) {
+ request->Record.MutableTransaction(0)->MutableLockGuard()->SetOwnerTxId(*lockTxId);
+ }
+
+ ForwardToTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor(), request);
+ TestModificationResult(runtime, txId, expectedStatus);
+ };
+
+ // without guard & lockTxId
+ testAlterCdcStream(++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamName: "Stream"
+ GetReady {
+ LockTxId: 0
+ }
+ )", {}, NKikimrScheme::StatusMultipleModifications);
+
+ // with guard, without lockTxId
+ testAlterCdcStream(++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamName: "Stream"
+ GetReady {
+ LockTxId: 0
+ }
+ )", lockTxId, NKikimrScheme::StatusMultipleModifications);
+
+ // without guard, with lockTxId
+ testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamName: "Stream"
+ GetReady {
+ LockTxId: %lu
+ }
+ )", lockTxId), {}, NKikimrScheme::StatusMultipleModifications);
+
+ // with guard & lockTxId
+ testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamName: "Stream"
+ GetReady {
+ LockTxId: %lu
+ }
+ )", lockTxId), lockTxId);
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::PathExist,
+ NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly),
+ NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
+ NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady),
+ });
+
+ // another try should fail
+ testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamName: "Stream"
+ GetReady {
+ LockTxId: %lu
+ }
+ )", lockTxId), {}, NKikimrScheme::StatusPreconditionFailed);
+ }
+
+ Y_UNIT_TEST(DropStream) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions()
+ .EnableProtoSourceIdInfo(true)
+ .EnableChangefeedInitialScan(true));
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream1"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ State: ECdcStreamStateScan
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ // the table is locked now
+ TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "extra" Type: "Uint64"}
+ )", {NKikimrScheme::StatusMultipleModifications});
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream2"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )", {NKikimrScheme::StatusMultipleModifications});
+
+ // drop the stream that locks the table
+ TestDropCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamName: "Stream1"
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ // the table is no longer locked
+ TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "extra" Type: "Uint64"}
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream2"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+ }
+
+} // TCdcStreamWithInitialScanTests
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
index a4ddb9beb6..d067dccb5a 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
@@ -175,8 +175,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
});
}
- Y_UNIT_TEST(DropStream) {
+ void DropStream(const TMaybe<NKikimrSchemeOp::ECdcStreamState>& state = Nothing()) {
TTestWithReboots t;
+ t.GetTestEnvOptions().EnableChangefeedInitialScan(true);
+
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
{
TInactiveZone inactive(activeZone);
@@ -188,14 +190,23 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
)");
t.TestEnv->TestWaitNotification(runtime, t.TxId);
- TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
+ NKikimrSchemeOp::TCdcStreamDescription streamDesc;
+ streamDesc.SetName("Stream");
+ streamDesc.SetMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly);
+ streamDesc.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto);
+
+ if (state) {
+ streamDesc.SetState(*state);
+ }
+
+ TString strDesc;
+ const bool ok = google::protobuf::TextFormat::PrintToString(streamDesc, &strDesc);
+ UNIT_ASSERT_C(ok, "protobuf serialization failed");
+
+ TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", Sprintf(R"(
TableName: "Table"
- StreamDescription {
- Name: "Stream"
- Mode: ECdcStreamModeKeysOnly
- Format: ECdcStreamFormatProto
- }
- )");
+ StreamDescription { %s }
+ )", strDesc.c_str()));
t.TestEnv->TestWaitNotification(runtime, t.TxId);
}
@@ -209,6 +220,18 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
});
}
+ Y_UNIT_TEST(DropStream) {
+ DropStream();
+ }
+
+ Y_UNIT_TEST(DropStreamExplicitReady) {
+ DropStream(NKikimrSchemeOp::ECdcStreamStateReady);
+ }
+
+ Y_UNIT_TEST(DropStreamCreatedWithInitialScan) {
+ DropStream(NKikimrSchemeOp::ECdcStreamStateScan);
+ }
+
Y_UNIT_TEST(CreateDropRecreate) {
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {