diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-01-17 12:40:33 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-01-17 12:40:33 +0300 |
commit | 4dd5d15a4ccd02507d72968f7d585ed79c442c51 (patch) | |
tree | 6e592cf68208ce582724d2f4806bb8ba2863bb51 | |
parent | b993b615fddb6d35247ff92539bed3afe7975e11 (diff) | |
download | ydb-4dd5d15a4ccd02507d72968f7d585ed79c442c51.tar.gz |
Drop lock & snapshot during stream deletion
13 files changed, 400 insertions, 175 deletions
diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index ee279130b3..f3bc4c20bb 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -172,6 +172,7 @@ enum ESimpleCounters { COUNTER_IN_FLIGHT_OPS_TxCreateCdcStreamAtTableWithInitialScan = 139 [(CounterOpts) = {Name: "InFlightOps/CreateCdcStreamAtTableWithInitialScan"}]; COUNTER_IN_FLIGHT_OPS_TxAlterExtSubDomainCreateHive = 140 [(CounterOpts) = {Name: "InFlightOps/AlterExtSubDomainCreateHive"}]; COUNTER_IN_FLIGHT_OPS_TxAlterCdcStreamAtTableDropSnapshot = 141 [(CounterOpts) = {Name: "InFlightOps/AlterCdcStreamAtTableDropSnapshot"}]; + COUNTER_IN_FLIGHT_OPS_TxDropCdcStreamAtTableDropSnapshot = 142 [(CounterOpts) = {Name: "InFlightOps/DropCdcStreamAtTableDropSnapshot"}]; } enum ECumulativeCounters { @@ -277,6 +278,7 @@ enum ECumulativeCounters { COUNTER_FINISHED_OPS_TxCreateCdcStreamAtTableWithInitialScan = 84 [(CounterOpts) = {Name: "FinishedOps/CreateCdcStreamAtTableWithInitialScan"}]; COUNTER_FINISHED_OPS_TxAlterExtSubDomainCreateHive = 85 [(CounterOpts) = {Name: "FinishedOps/AlterExtSubDomainCreateHive"}]; COUNTER_FINISHED_OPS_TxAlterCdcStreamAtTableDropSnapshot = 86 [(CounterOpts) = {Name: "FinishedOps/AlterCdcStreamAtTableDropSnapshot"}]; + COUNTER_FINISHED_OPS_TxDropCdcStreamAtTableDropSnapshot = 87 [(CounterOpts) = {Name: "FinishedOps/DropCdcStreamAtTableDropSnapshot"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 6d25d172da..ed3a9e25a3 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -418,6 +418,7 @@ message TDropCdcStreamNotice { optional NKikimrProto.TPathID PathId = 1; optional uint64 TableSchemaVersion = 2; optional NKikimrProto.TPathID StreamPathId = 3; + optional TSnapshot DropSnapshot = 4; } message TAsyncIndexInfo { diff --git a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp index 67b676e4e5..2788930a5f 100644 --- a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp @@ -46,6 +46,14 @@ public: DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); } + if (params.HasDropSnapshot()) { + const auto& snapshot = params.GetDropSnapshot(); + Y_VERIFY(snapshot.GetStep() != 0); + + const TSnapshotKey key(pathId.OwnerId, pathId.LocalPathId, snapshot.GetStep(), snapshot.GetTxId()); + DataShard.GetSnapshotManager().RemoveSnapshot(txc.DB, key); + } + RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(streamPathId)); BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index 3514444a26..bb83d34e90 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -969,7 +969,9 @@ ISubOperationBase::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxSta case TTxState::ETxType::TxDropCdcStream: return CreateDropCdcStreamImpl(NextPartId(), txState); case TTxState::ETxType::TxDropCdcStreamAtTable: - return CreateDropCdcStreamAtTable(NextPartId(), txState); + return CreateDropCdcStreamAtTable(NextPartId(), txState, false); + case TTxState::ETxType::TxDropCdcStreamAtTableDropSnapshot: + return CreateDropCdcStreamAtTable(NextPartId(), txState, true); // Sequences case TTxState::ETxType::TxCreateSequence: diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp index 2050f0a05f..2fc3225149 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp @@ -278,40 +278,6 @@ public: }; // TConfigurePartsAtTableDropSnapshot -class TProposeAtTableDropSnapshot: public NCdcStreamState::TProposeAtTable { -public: - using NCdcStreamState::TProposeAtTable::TProposeAtTable; - - bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { - NCdcStreamState::TProposeAtTable::HandleReply(ev, context); - - const auto* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState); - const auto& pathId = txState->TargetPathId; - - Y_VERIFY(context.SS->TablesWithSnapshots.contains(pathId)); - const auto snapshotTxId = context.SS->TablesWithSnapshots.at(pathId); - - auto it = context.SS->SnapshotTables.find(snapshotTxId); - if (it != context.SS->SnapshotTables.end()) { - it->second.erase(pathId); - if (it->second.empty()) { - context.SS->SnapshotTables.erase(it); - } - } - - context.SS->SnapshotsStepIds.erase(snapshotTxId); - context.SS->TablesWithSnapshots.erase(pathId); - - NIceDb::TNiceDb db(context.GetDB()); - context.SS->PersistDropSnapshot(db, snapshotTxId, pathId); - - context.SS->TabletCounters->Simple()[COUNTER_SNAPSHOTS_COUNT].Sub(1); - return true; - } - -}; // TProposeAtTableDropSnapshot - class TAlterCdcStreamAtTable: public TSubOperation { static TTxState::ETxState NextState() { return TTxState::ConfigureParts; @@ -344,7 +310,7 @@ class TAlterCdcStreamAtTable: public TSubOperation { } case TTxState::Propose: if (DropSnapshot) { - return MakeHolder<TProposeAtTableDropSnapshot>(OperationId); + return MakeHolder<NCdcStreamState::TProposeAtTableDropSnapshot>(OperationId); } else { return MakeHolder<NCdcStreamState::TProposeAtTable>(OperationId); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index 218c0c921b..af2e16c89e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -359,6 +359,8 @@ void NTableState::UpdatePartitioningForTableModification(TOperationId operationI commonShardOp = TTxState::ConfigureParts; } else if (txState.TxType == TTxState::TxDropCdcStreamAtTable) { commonShardOp = TTxState::ConfigureParts; + } else if (txState.TxType == TTxState::TxDropCdcStreamAtTableDropSnapshot) { + commonShardOp = TTxState::ConfigureParts; } else { Y_FAIL("UNREACHABLE"); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index 6ae047a6a1..1441944725 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -1052,6 +1052,7 @@ class TConfigurePartsAtTable: public TSubOperationState { case TTxState::TxAlterCdcStreamAtTable: case TTxState::TxAlterCdcStreamAtTableDropSnapshot: case TTxState::TxDropCdcStreamAtTable: + case TTxState::TxDropCdcStreamAtTableDropSnapshot: return true; default: return false; @@ -1131,6 +1132,7 @@ class TProposeAtTable: public TSubOperationState { case TTxState::TxAlterCdcStreamAtTable: case TTxState::TxAlterCdcStreamAtTableDropSnapshot: case TTxState::TxDropCdcStreamAtTable: + case TTxState::TxDropCdcStreamAtTableDropSnapshot: return true; default: return false; @@ -1207,6 +1209,40 @@ protected: }; // TProposeAtTable +class TProposeAtTableDropSnapshot: public TProposeAtTable { +public: + using TProposeAtTable::TProposeAtTable; + + bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { + TProposeAtTable::HandleReply(ev, context); + + const auto* txState = context.SS->FindTx(OperationId); + Y_VERIFY(txState); + const auto& pathId = txState->TargetPathId; + + Y_VERIFY(context.SS->TablesWithSnapshots.contains(pathId)); + const auto snapshotTxId = context.SS->TablesWithSnapshots.at(pathId); + + auto it = context.SS->SnapshotTables.find(snapshotTxId); + if (it != context.SS->SnapshotTables.end()) { + it->second.erase(pathId); + if (it->second.empty()) { + context.SS->SnapshotTables.erase(it); + } + } + + context.SS->SnapshotsStepIds.erase(snapshotTxId); + context.SS->TablesWithSnapshots.erase(pathId); + + NIceDb::TNiceDb db(context.GetDB()); + context.SS->PersistDropSnapshot(db, snapshotTxId, pathId); + + context.SS->TabletCounters->Simple()[COUNTER_SNAPSHOTS_COUNT].Sub(1); + return true; + } + +}; // TProposeAtTableDropSnapshot + } // NCdcStreamState namespace NForceDrop { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp index c0355b08f7..7884c523dc 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp @@ -225,6 +225,29 @@ public: }; // TConfigurePartsAtTable +class TConfigurePartsAtTableDropSnapshot: public TConfigurePartsAtTable { +protected: + void FillNotice(const TPathId& pathId, NKikimrTxDataShard::TFlatSchemeTransaction& tx, TOperationContext& context) const override { + TConfigurePartsAtTable::FillNotice(pathId, tx, context); + + Y_VERIFY(context.SS->TablesWithSnapshots.contains(pathId)); + const auto snapshotTxId = context.SS->TablesWithSnapshots.at(pathId); + + Y_VERIFY(context.SS->SnapshotsStepIds.contains(snapshotTxId)); + const auto snapshotStep = context.SS->SnapshotsStepIds.at(snapshotTxId); + + Y_VERIFY(tx.HasDropCdcStreamNotice()); + auto& notice = *tx.MutableDropCdcStreamNotice(); + + notice.MutableDropSnapshot()->SetStep(ui64(snapshotStep)); + notice.MutableDropSnapshot()->SetTxId(ui64(snapshotTxId)); + } + +public: + using TConfigurePartsAtTable::TConfigurePartsAtTable; + +}; // TConfigurePartsAtTableDropSnapshot + class TDropCdcStreamAtTable: public TSubOperation { static TTxState::ETxState NextState() { return TTxState::ConfigureParts; @@ -248,9 +271,17 @@ class TDropCdcStreamAtTable: public TSubOperation { switch (state) { case TTxState::Waiting: case TTxState::ConfigureParts: - return MakeHolder<TConfigurePartsAtTable>(OperationId); + if (DropSnapshot) { + return MakeHolder<TConfigurePartsAtTableDropSnapshot>(OperationId); + } else { + return MakeHolder<TConfigurePartsAtTable>(OperationId); + } case TTxState::Propose: - return MakeHolder<NCdcStreamState::TProposeAtTable>(OperationId); + if (DropSnapshot) { + return MakeHolder<NCdcStreamState::TProposeAtTableDropSnapshot>(OperationId); + } else { + return MakeHolder<NCdcStreamState::TProposeAtTable>(OperationId); + } case TTxState::ProposedWaitParts: return MakeHolder<NTableState::TProposedWaitParts>(OperationId); case TTxState::Done: @@ -261,7 +292,17 @@ class TDropCdcStreamAtTable: public TSubOperation { } public: - using TSubOperation::TSubOperation; + explicit TDropCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot) + : TSubOperation(id, tx) + , DropSnapshot(dropSnapshot) + { + } + + explicit TDropCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot) + : TSubOperation(id, state) + , DropSnapshot(dropSnapshot) + { + } THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override { const auto& workingDir = Transaction.GetWorkingDir(); @@ -325,6 +366,17 @@ public: return result; } + if (DropSnapshot && !context.SS->TablesWithSnapshots.contains(tablePath.Base()->PathId)) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, TStringBuilder() << "Table has no snapshots" + << ": pathId# " << tablePath.Base()->PathId); + return result; + } + + auto guard = context.DbGuard(); + context.MemChanges.GrabPath(context.SS, tablePath.Base()->PathId); + context.MemChanges.GrabNewTxState(context.SS, OperationId); + + context.DbChanges.PersistPath(tablePath.Base()->PathId); context.DbChanges.PersistTxState(OperationId); Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId)); @@ -339,8 +391,12 @@ public: Y_VERIFY(stream->AlterVersion != 0); Y_VERIFY(!stream->AlterData); + const auto txType = DropSnapshot + ? TTxState::TxDropCdcStreamAtTableDropSnapshot + : TTxState::TxDropCdcStreamAtTable; + Y_VERIFY(!context.SS->FindTx(OperationId)); - auto& txState = context.SS->CreateTx(OperationId, TTxState::TxDropCdcStreamAtTable, tablePath.Base()->PathId); + auto& txState = context.SS->CreateTx(OperationId, txType, tablePath.Base()->PathId); txState.State = TTxState::ConfigureParts; tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter; @@ -356,8 +412,9 @@ public: return result; } - void AbortPropose(TOperationContext&) override { - Y_FAIL("no AbortPropose for TDropCdcStreamAtTable"); + void AbortPropose(TOperationContext& context) override { + LOG_N("TDropCdcStreamAtTable AbortPropose" + << ": opId# " << OperationId); } void AbortUnsafe(TTxId txId, TOperationContext& context) override { @@ -367,6 +424,9 @@ public: context.OnComplete.DoneOperation(OperationId); } +private: + const bool DropSnapshot; + }; // TDropCdcStreamAtTable } // anonymous @@ -379,12 +439,12 @@ ISubOperationBase::TPtr CreateDropCdcStreamImpl(TOperationId id, TTxState::ETxSt return MakeSubOperation<TDropCdcStream>(id, state); } -ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, const TTxTransaction& tx) { - return MakeSubOperation<TDropCdcStreamAtTable>(id, tx); +ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot) { + return MakeSubOperation<TDropCdcStreamAtTable>(id, tx, dropSnapshot); } -ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, TTxState::ETxState state) { - return MakeSubOperation<TDropCdcStreamAtTable>(id, state); +ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot) { + return MakeSubOperation<TDropCdcStreamAtTable>(id, state, dropSnapshot); } TVector<ISubOperationBase::TPtr> CreateDropCdcStream(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) { @@ -442,7 +502,13 @@ TVector<ISubOperationBase::TPtr> CreateDropCdcStream(TOperationId opId, const TT return {CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, errStr)}; } - if (!context.SS->CheckLocks(tablePath.Base()->PathId, tx, errStr)) { + Y_VERIFY(context.SS->CdcStreams.contains(streamPath.Base()->PathId)); + auto stream = context.SS->CdcStreams.at(streamPath.Base()->PathId); + + const auto lockTxId = stream->State == TCdcStreamInfo::EState::ECdcStreamStateScan + ? streamPath.Base()->CreateTxId + : InvalidTxId; + if (!context.SS->CheckLocks(tablePath.Base()->PathId, lockTxId, errStr)) { return {CreateReject(opId, NKikimrScheme::StatusMultipleModifications, errStr)}; } @@ -452,13 +518,31 @@ TVector<ISubOperationBase::TPtr> CreateDropCdcStream(TOperationId opId, const TT auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamAtTable); outTx.MutableDropCdcStream()->CopyFrom(op); - result.push_back(CreateDropCdcStreamAtTable(NextPartId(opId, result), outTx)); + if (lockTxId != InvalidTxId) { + outTx.MutableLockGuard()->SetOwnerTxId(ui64(lockTxId)); + } + + result.push_back(CreateDropCdcStreamAtTable(NextPartId(opId, result), outTx, lockTxId != InvalidTxId)); + } + + if (lockTxId != InvalidTxId) { + auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropLock); + outTx.SetFailOnExist(true); + outTx.SetInternal(true); + outTx.MutableLockConfig()->SetName(tablePath.LeafName()); + outTx.MutableLockGuard()->SetOwnerTxId(ui64(lockTxId)); + + result.push_back(DropLock(NextPartId(opId, result), outTx)); } { auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl); outTx.MutableDrop()->SetName(streamPath.Base()->Name); + if (lockTxId != InvalidTxId) { + outTx.MutableLockGuard()->SetOwnerTxId(ui64(lockTxId)); + } + result.push_back(CreateDropCdcStreamImpl(NextPartId(opId, result), outTx)); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 188d045df2..41526b52f5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -386,8 +386,8 @@ ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, TTxState::E TVector<ISubOperationBase::TPtr> CreateDropCdcStream(TOperationId id, const TTxTransaction& tx, TOperationContext& context); ISubOperationBase::TPtr CreateDropCdcStreamImpl(TOperationId id, const TTxTransaction& tx); ISubOperationBase::TPtr CreateDropCdcStreamImpl(TOperationId id, TTxState::ETxState state); -ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, const TTxTransaction& tx); -ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, TTxState::ETxState state); +ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot); +ISubOperationBase::TPtr CreateDropCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot); ISubOperationBase::TPtr CreateBackup(TOperationId id, const TTxTransaction& tx); ISubOperationBase::TPtr CreateBackup(TOperationId id, TTxState::ETxState state); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index b1898ccfef..33db4e272a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1404,6 +1404,7 @@ TPathElement::EPathState TSchemeShard::CalcPathState(TTxState::ETxType txType, T case TTxState::TxCreateCdcStreamAtTable: case TTxState::TxCreateCdcStreamAtTableWithInitialScan: case TTxState::TxDropCdcStreamAtTable: + case TTxState::TxDropCdcStreamAtTableDropSnapshot: case TTxState::TxAlterSequence: case TTxState::TxAlterReplication: case TTxState::TxAlterBlobDepot: diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h index b4982edabd..c150e03b85 100644 --- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h +++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h @@ -119,6 +119,7 @@ struct TTxState { item(TxCreateCdcStreamAtTableWithInitialScan, 73) \ item(TxAlterExtSubDomainCreateHive, 74) \ item(TxAlterCdcStreamAtTableDropSnapshot, 75) \ + item(TxDropCdcStreamAtTableDropSnapshot, 76) \ // TX_STATE_TYPE_ENUM @@ -351,6 +352,7 @@ struct TTxState { case TxDropTableIndexAtMainTable: case TxDropCdcStream: case TxDropCdcStreamAtTable: + case TxDropCdcStreamAtTableDropSnapshot: case TxDropSequence: case TxDropReplication: case TxDropBlobDepot: @@ -441,6 +443,7 @@ struct TTxState { case TxFinalizeBuildIndex: case TxDropTableIndexAtMainTable: // just increments schemaversion at main table case TxDropCdcStreamAtTable: + case TxDropCdcStreamAtTableDropSnapshot: case TxUpdateMainTableOnIndexMove: return false; case TxAlterPQGroup: @@ -530,6 +533,7 @@ struct TTxState { case TxDropLock: case TxDropTableIndexAtMainTable: case TxDropCdcStreamAtTable: + case TxDropCdcStreamAtTableDropSnapshot: case TxUpdateMainTableOnIndexMove: return false; case TxAlterPQGroup: diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp index 13f0fe2668..e4a40a535a 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp @@ -139,123 +139,6 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } } - void InitialScan(bool enable) { - TTestBasicRuntime runtime; - TTestEnv env(runtime, TTestEnvOptions() - .EnableProtoSourceIdInfo(true) - .EnableChangefeedInitialScan(enable)); - ui64 txId = 100; - - TestCreateTable(runtime, ++txId, "/MyRoot", R"( - Name: "Table" - Columns { Name: "key" Type: "Uint64" } - Columns { Name: "value" Type: "Uint64" } - KeyColumnNames: ["key"] - )"); - env.TestWaitNotification(runtime, txId); - - const auto expectedStatus = enable - ? NKikimrScheme::StatusAccepted - : NKikimrScheme::StatusPreconditionFailed; - - TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( - TableName: "Table" - StreamDescription { - Name: "Stream" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - State: ECdcStreamStateScan - } - )", {expectedStatus}); - - if (!enable) { - return; - } - - env.TestWaitNotification(runtime, txId); - const auto lockTxId = txId; - - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { - NLs::PathExist, - NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly), - NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), - NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateScan), - }); - - auto testAlterCdcStream = [&runtime](ui64 txId, const TString& parentPath, const TString& schema, - const TMaybe<ui64>& lockTxId, TEvSchemeShard::EStatus expectedStatus = NKikimrScheme::StatusAccepted) - { - auto request = AlterCdcStreamRequest(txId, parentPath, schema); - if (lockTxId) { - request->Record.MutableTransaction(0)->MutableLockGuard()->SetOwnerTxId(*lockTxId); - } - - ForwardToTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor(), request); - TestModificationResult(runtime, txId, expectedStatus); - }; - - // without guard & lockTxId - testAlterCdcStream(++txId, "/MyRoot", R"( - TableName: "Table" - StreamName: "Stream" - GetReady { - LockTxId: 0 - } - )", {}, NKikimrScheme::StatusMultipleModifications); - - // with guard, without lockTxId - testAlterCdcStream(++txId, "/MyRoot", R"( - TableName: "Table" - StreamName: "Stream" - GetReady { - LockTxId: 0 - } - )", lockTxId, NKikimrScheme::StatusMultipleModifications); - - // without guard, with lockTxId - testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"( - TableName: "Table" - StreamName: "Stream" - GetReady { - LockTxId: %lu - } - )", lockTxId), {}, NKikimrScheme::StatusMultipleModifications); - - // with guard & lockTxId - testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"( - TableName: "Table" - StreamName: "Stream" - GetReady { - LockTxId: %lu - } - )", lockTxId), lockTxId); - env.TestWaitNotification(runtime, txId); - - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { - NLs::PathExist, - NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly), - NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), - NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), - }); - - // another try should fail - testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"( - TableName: "Table" - StreamName: "Stream" - GetReady { - LockTxId: %lu - } - )", lockTxId), {}, NKikimrScheme::StatusPreconditionFailed); - } - - Y_UNIT_TEST(InitialScanShouldSucceed) { - InitialScan(true); - } - - Y_UNIT_TEST(InitialScanShouldFail) { - InitialScan(false); - } - Y_UNIT_TEST(Negative) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); @@ -945,3 +828,216 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } } // TCdcStreamTests + +Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) { + void InitialScan(bool enable) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions() + .EnableProtoSourceIdInfo(true) + .EnableChangefeedInitialScan(enable)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + const auto expectedStatus = enable + ? NKikimrScheme::StatusAccepted + : NKikimrScheme::StatusPreconditionFailed; + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + State: ECdcStreamStateScan + } + )", {expectedStatus}); + + if (!enable) { + return; + } + + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::PathExist, + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly), + NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), + NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateScan), + }); + } + + Y_UNIT_TEST(InitialScanEnabled) { + InitialScan(true); + } + + Y_UNIT_TEST(InitialScanDisabled) { + InitialScan(false); + } + + Y_UNIT_TEST(AlterStream) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions() + .EnableProtoSourceIdInfo(true) + .EnableChangefeedInitialScan(true)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + State: ECdcStreamStateScan + } + )"); + env.TestWaitNotification(runtime, txId); + + const auto lockTxId = txId; + auto testAlterCdcStream = [&runtime](ui64 txId, const TString& parentPath, const TString& schema, + const TMaybe<ui64>& lockTxId, TEvSchemeShard::EStatus expectedStatus = NKikimrScheme::StatusAccepted) + { + auto request = AlterCdcStreamRequest(txId, parentPath, schema); + if (lockTxId) { + request->Record.MutableTransaction(0)->MutableLockGuard()->SetOwnerTxId(*lockTxId); + } + + ForwardToTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor(), request); + TestModificationResult(runtime, txId, expectedStatus); + }; + + // without guard & lockTxId + testAlterCdcStream(++txId, "/MyRoot", R"( + TableName: "Table" + StreamName: "Stream" + GetReady { + LockTxId: 0 + } + )", {}, NKikimrScheme::StatusMultipleModifications); + + // with guard, without lockTxId + testAlterCdcStream(++txId, "/MyRoot", R"( + TableName: "Table" + StreamName: "Stream" + GetReady { + LockTxId: 0 + } + )", lockTxId, NKikimrScheme::StatusMultipleModifications); + + // without guard, with lockTxId + testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamName: "Stream" + GetReady { + LockTxId: %lu + } + )", lockTxId), {}, NKikimrScheme::StatusMultipleModifications); + + // with guard & lockTxId + testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamName: "Stream" + GetReady { + LockTxId: %lu + } + )", lockTxId), lockTxId); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::PathExist, + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly), + NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), + NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), + }); + + // another try should fail + testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamName: "Stream" + GetReady { + LockTxId: %lu + } + )", lockTxId), {}, NKikimrScheme::StatusPreconditionFailed); + } + + Y_UNIT_TEST(DropStream) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions() + .EnableProtoSourceIdInfo(true) + .EnableChangefeedInitialScan(true)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream1" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + State: ECdcStreamStateScan + } + )"); + env.TestWaitNotification(runtime, txId); + + // the table is locked now + TestAlterTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "extra" Type: "Uint64"} + )", {NKikimrScheme::StatusMultipleModifications}); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream2" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )", {NKikimrScheme::StatusMultipleModifications}); + + // drop the stream that locks the table + TestDropCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamName: "Stream1" + )"); + env.TestWaitNotification(runtime, txId); + + // the table is no longer locked + TestAlterTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "extra" Type: "Uint64"} + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream2" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + env.TestWaitNotification(runtime, txId); + } + +} // TCdcStreamWithInitialScanTests diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp index a4ddb9beb6..d067dccb5a 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp @@ -175,8 +175,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { }); } - Y_UNIT_TEST(DropStream) { + void DropStream(const TMaybe<NKikimrSchemeOp::ECdcStreamState>& state = Nothing()) { TTestWithReboots t; + t.GetTestEnvOptions().EnableChangefeedInitialScan(true); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { { TInactiveZone inactive(activeZone); @@ -188,14 +190,23 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); - TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( + NKikimrSchemeOp::TCdcStreamDescription streamDesc; + streamDesc.SetName("Stream"); + streamDesc.SetMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly); + streamDesc.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); + + if (state) { + streamDesc.SetState(*state); + } + + TString strDesc; + const bool ok = google::protobuf::TextFormat::PrintToString(streamDesc, &strDesc); + UNIT_ASSERT_C(ok, "protobuf serialization failed"); + + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", Sprintf(R"( TableName: "Table" - StreamDescription { - Name: "Stream" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - } - )"); + StreamDescription { %s } + )", strDesc.c_str())); t.TestEnv->TestWaitNotification(runtime, t.TxId); } @@ -209,6 +220,18 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { }); } + Y_UNIT_TEST(DropStream) { + DropStream(); + } + + Y_UNIT_TEST(DropStreamExplicitReady) { + DropStream(NKikimrSchemeOp::ECdcStreamStateReady); + } + + Y_UNIT_TEST(DropStreamCreatedWithInitialScan) { + DropStream(NKikimrSchemeOp::ECdcStreamStateScan); + } + Y_UNIT_TEST(CreateDropRecreate) { TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { |