diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-01-13 18:13:21 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-01-13 18:13:21 +0300 |
commit | 6ba153ea01c99e3084f4ca75e3260f89da8d8527 (patch) | |
tree | 591307dc81e08e5b1945941e0c6671740a5901f6 | |
parent | ade5487ce6008abe6796ba9b29da4b29d4b4337f (diff) | |
download | ydb-6ba153ea01c99e3084f4ca75e3260f89da8d8527.tar.gz |
Switch cdc stream to ready state, drop lock & snapshot
19 files changed, 369 insertions, 63 deletions
diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 5a806e3db7..ee279130b3 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -169,9 +169,9 @@ enum ESimpleCounters { COUNTER_IN_FLIGHT_OPS_TxUpdateMainTableOnIndexMove = 137 [(CounterOpts) = {Name: "InFlightOps/UpdateMainTableOnIndexMove"}]; COUNTER_IN_FLIGHT_OPS_TxAllocatePQ = 138 [(CounterOpts) = {Name: "InFlightOps/AllocatePQ"}]; - COUNTER_IN_FLIGHT_OPS_TxCreateCdcStreamAtTableWithSnapshot = 139 [(CounterOpts) = {Name: "InFlightOps/CreateCdcStreamAtTableWithSnapshot"}]; - - COUNTER_IN_FLIGHT_OPS_TxAlterExtSubDomainCreateHive = 140 [(CounterOpts) = {Name: "InFlightOps/AlterExtSubDomainCreateHive"}]; + COUNTER_IN_FLIGHT_OPS_TxCreateCdcStreamAtTableWithInitialScan = 139 [(CounterOpts) = {Name: "InFlightOps/CreateCdcStreamAtTableWithInitialScan"}]; + COUNTER_IN_FLIGHT_OPS_TxAlterExtSubDomainCreateHive = 140 [(CounterOpts) = {Name: "InFlightOps/AlterExtSubDomainCreateHive"}]; + COUNTER_IN_FLIGHT_OPS_TxAlterCdcStreamAtTableDropSnapshot = 141 [(CounterOpts) = {Name: "InFlightOps/AlterCdcStreamAtTableDropSnapshot"}]; } enum ECumulativeCounters { @@ -274,10 +274,9 @@ enum ECumulativeCounters { COUNTER_FINISHED_OPS_TxUpdateMainTableOnIndexMove = 82 [(CounterOpts) = {Name: "FinishedOps/UpdateMainTableOnIndexMove"}]; COUNTER_FINISHED_OPS_TxAllocatePQ = 83 [(CounterOpts) = {Name: "FinishedOps/AllocatePQ"}]; - COUNTER_FINISHED_OPS_TxCreateCdcStreamAtTableWithSnapshot = 84 [(CounterOpts) = {Name: "FinishedOps/CreateCdcStreamAtTableWithSnapshot"}]; - - COUNTER_FINISHED_OPS_TxAlterExtSubDomainCreateHive = 85 [(CounterOpts) = {Name: "FinishedOps/AlterExtSubDomainCreateHive"}]; - + COUNTER_FINISHED_OPS_TxCreateCdcStreamAtTableWithInitialScan = 84 [(CounterOpts) = {Name: "FinishedOps/CreateCdcStreamAtTableWithInitialScan"}]; + COUNTER_FINISHED_OPS_TxAlterExtSubDomainCreateHive = 85 [(CounterOpts) = {Name: "FinishedOps/AlterExtSubDomainCreateHive"}]; + COUNTER_FINISHED_OPS_TxAlterCdcStreamAtTableDropSnapshot = 86 [(CounterOpts) = {Name: "FinishedOps/AlterCdcStreamAtTableDropSnapshot"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 21511d4c1c..7e3d329c39 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -736,8 +736,13 @@ message TAlterCdcStream { message TDisable { } + message TGetReady { + optional uint64 LockTxId = 1; + } + oneof Action { TDisable Disable = 3; + TGetReady GetReady = 4; } } diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 2c37a991a7..6d25d172da 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -402,10 +402,16 @@ message TCreateCdcStreamNotice { optional string SnapshotName = 4; } +message TSnapshot { + optional uint64 Step = 1; + optional uint64 TxId = 2; +} + message TAlterCdcStreamNotice { optional NKikimrProto.TPathID PathId = 1; optional uint64 TableSchemaVersion = 2; optional NKikimrSchemeOp.TCdcStreamDescription StreamDescription = 3; + optional TSnapshot DropSnapshot = 4; } message TDropCdcStreamNotice { diff --git a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp index 37e372b233..de503b1b54 100644 --- a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp @@ -29,19 +29,39 @@ public: const auto& params = schemeTx.GetAlterCdcStreamNotice(); const auto& streamDesc = params.GetStreamDescription(); + const auto streamPathId = PathIdFromPathId(streamDesc.GetPathId()); + const auto state = streamDesc.GetState(); const auto pathId = PathIdFromPathId(params.GetPathId()); Y_VERIFY(pathId.OwnerId == DataShard.GetPathOwnerId()); - const auto streamPathId = PathIdFromPathId(streamDesc.GetPathId()); - - Y_VERIFY_S(streamDesc.GetState() == NKikimrSchemeOp::ECdcStreamStateDisabled, "Unexpected alter cdc stream" - << ": desc# " << streamDesc.ShortDebugString()); - const auto version = params.GetTableSchemaVersion(); Y_VERIFY(version); - auto tableInfo = DataShard.AlterTableDisableCdcStream(ctx, txc, pathId, version, streamPathId); + TUserTable::TPtr tableInfo; + switch (state) { + case NKikimrSchemeOp::ECdcStreamStateDisabled: + case NKikimrSchemeOp::ECdcStreamStateReady: + tableInfo = DataShard.AlterTableSwitchCdcStreamState(ctx, txc, pathId, version, streamPathId, state); + if (state == NKikimrSchemeOp::ECdcStreamStateReady) { + if (params.HasDropSnapshot()) { + const auto& snapshot = params.GetDropSnapshot(); + Y_VERIFY(snapshot.GetStep() != 0); + + const TSnapshotKey key(pathId.OwnerId, pathId.LocalPathId, snapshot.GetStep(), snapshot.GetTxId()); + DataShard.GetSnapshotManager().RemoveSnapshot(txc.DB, key); + } else { + Y_VERIFY_DEBUG(false, "Absent snapshot"); + } + } + break; + + default: + Y_FAIL_S("Unexpected alter cdc stream" + << ": params# " << params.ShortDebugString()); + } + + Y_VERIFY(tableInfo); DataShard.AddUserTable(pathId, tableInfo); if (tableInfo->NeedSchemaSnapshots()) { diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 19b32a4290..a8f6565a10 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -1255,13 +1255,13 @@ TUserTable::TPtr TDataShard::AlterTableAddCdcStream( return tableInfo; } -TUserTable::TPtr TDataShard::AlterTableDisableCdcStream( +TUserTable::TPtr TDataShard::AlterTableSwitchCdcStreamState( const TActorContext& ctx, TTransactionContext& txc, const TPathId& pathId, ui64 tableSchemaVersion, - const TPathId& streamPathId) + const TPathId& streamPathId, NKikimrSchemeOp::ECdcStreamState state) { auto tableInfo = AlterTableSchemaVersion(ctx, txc, pathId, tableSchemaVersion, false); - tableInfo->DisableCdcStream(streamPathId); + tableInfo->SwitchCdcStreamState(streamPathId, state); NIceDb::TNiceDb db(txc.DB); PersistUserTable(db, pathId.LocalPathId, *tableInfo); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index d7e554f826..874fa1f885 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1598,10 +1598,10 @@ public: const TPathId& pathId, ui64 tableSchemaVersion, const NKikimrSchemeOp::TCdcStreamDescription& streamDesc); - TUserTable::TPtr AlterTableDisableCdcStream( + TUserTable::TPtr AlterTableSwitchCdcStreamState( const TActorContext& ctx, TTransactionContext& txc, const TPathId& pathId, ui64 tableSchemaVersion, - const TPathId& streamPathId); + const TPathId& streamPathId, NKikimrSchemeOp::ECdcStreamState state); TUserTable::TPtr AlterTableDropCdcStream( const TActorContext& ctx, TTransactionContext& txc, diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp index c78cac2c91..8a8016c475 100644 --- a/ydb/core/tx/datashard/datashard_user_table.cpp +++ b/ydb/core/tx/datashard/datashard_user_table.cpp @@ -129,13 +129,13 @@ void TUserTable::AddCdcStream(const NKikimrSchemeOp::TCdcStreamDescription& stre SetSchema(schema); } -void TUserTable::DisableCdcStream(const TPathId& streamPathId) { +void TUserTable::SwitchCdcStreamState(const TPathId& streamPathId, TCdcStream::EState state) { auto it = CdcStreams.find(streamPathId); if (it == CdcStreams.end()) { return; } - it->second.State = TCdcStream::EState::ECdcStreamStateDisabled; + it->second.State = state; NKikimrSchemeOp::TTableDescription schema; GetSchema(schema); @@ -145,7 +145,7 @@ void TUserTable::DisableCdcStream(const TPathId& streamPathId) { continue; } - it->SetState(TCdcStream::EState::ECdcStreamStateDisabled); + it->SetState(state); SetSchema(schema); return; diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index 9f9af09506..409e677388 100644 --- a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -411,7 +411,7 @@ struct TUserTable : public TThrRefBase { bool HasAsyncIndexes() const; void AddCdcStream(const NKikimrSchemeOp::TCdcStreamDescription& streamDesc); - void DisableCdcStream(const TPathId& streamPathId); + void SwitchCdcStreamState(const TPathId& streamPathId, TCdcStream::EState state); void DropCdcStream(const TPathId& streamPathId); bool HasCdcStreams() const; bool NeedSchemaSnapshots() const; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index c8649f0cf5..3514444a26 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -958,12 +958,14 @@ ISubOperationBase::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxSta return CreateNewCdcStreamImpl(NextPartId(), txState); case TTxState::ETxType::TxCreateCdcStreamAtTable: return CreateNewCdcStreamAtTable(NextPartId(), txState, false); - case TTxState::ETxType::TxCreateCdcStreamAtTableWithSnapshot: + case TTxState::ETxType::TxCreateCdcStreamAtTableWithInitialScan: return CreateNewCdcStreamAtTable(NextPartId(), txState, true); case TTxState::ETxType::TxAlterCdcStream: return CreateAlterCdcStreamImpl(NextPartId(), txState); case TTxState::ETxType::TxAlterCdcStreamAtTable: - return CreateAlterCdcStreamAtTable(NextPartId(), txState); + return CreateAlterCdcStreamAtTable(NextPartId(), txState, false); + case TTxState::ETxType::TxAlterCdcStreamAtTableDropSnapshot: + return CreateAlterCdcStreamAtTable(NextPartId(), txState, true); case TTxState::ETxType::TxDropCdcStream: return CreateDropCdcStreamImpl(NextPartId(), txState); case TTxState::ETxType::TxDropCdcStreamAtTable: diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp index 43de78d88d..2050f0a05f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp @@ -147,19 +147,24 @@ public: } } - auto guard = context.DbGuard(); - context.DbChanges.PersistAlterCdcStream(streamPath.Base()->PathId); - context.DbChanges.PersistTxState(OperationId); - Y_VERIFY(context.SS->CdcStreams.contains(streamPath.Base()->PathId)); auto stream = context.SS->CdcStreams.at(streamPath.Base()->PathId); - auto streamAlter = stream->CreateNextVersion(); - Y_VERIFY(streamAlter); + TCdcStreamInfo::EState newState = TCdcStreamInfo::EState::ECdcStreamStateInvalid; switch (op.GetActionCase()) { case NKikimrSchemeOp::TAlterCdcStream::kDisable: - streamAlter->State = TCdcStreamInfo::EState::ECdcStreamStateDisabled; + newState = TCdcStreamInfo::EState::ECdcStreamStateDisabled; + break; + case NKikimrSchemeOp::TAlterCdcStream::kGetReady: + if (stream->State == TCdcStreamInfo::EState::ECdcStreamStateScan) { + newState = TCdcStreamInfo::EState::ECdcStreamStateReady; + } else { + result->SetError(NKikimrScheme::StatusPreconditionFailed, TStringBuilder() + << "Cannot switch to ready state" + << ": current# " << stream->State); + return result; + } break; default: result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder() @@ -167,6 +172,21 @@ public: return result; } + auto guard = context.DbGuard(); + context.MemChanges.GrabPath(context.SS, streamPath.Base()->PathId); + context.MemChanges.GrabCdcStream(context.SS, streamPath.Base()->PathId); + context.MemChanges.GrabNewTxState(context.SS, OperationId); + + context.DbChanges.PersistPath(streamPath.Base()->PathId); + context.DbChanges.PersistAlterCdcStream(streamPath.Base()->PathId); + context.DbChanges.PersistTxState(OperationId); + + auto streamAlter = stream->CreateNextVersion(); + Y_VERIFY(streamAlter); + + Y_VERIFY(newState != TCdcStreamInfo::EState::ECdcStreamStateInvalid); + streamAlter->State = newState; + Y_VERIFY(!context.SS->FindTx(OperationId)); auto& txState = context.SS->CreateTx(OperationId, TTxState::TxAlterCdcStream, streamPath.Base()->PathId); txState.State = TTxState::Propose; @@ -181,8 +201,9 @@ public: return result; } - void AbortPropose(TOperationContext&) override { - Y_FAIL("no AbortPropose for TAlterCdcStream"); + void AbortPropose(TOperationContext& context) override { + LOG_N("TAlterCdcStream AbortPropose" + << ": opId# " << OperationId); } void AbortUnsafe(TTxId txId, TOperationContext& context) override { @@ -234,6 +255,63 @@ public: }; // TConfigurePartsAtTable +class TConfigurePartsAtTableDropSnapshot: public TConfigurePartsAtTable { +protected: + void FillNotice(const TPathId& pathId, NKikimrTxDataShard::TFlatSchemeTransaction& tx, TOperationContext& context) const override { + TConfigurePartsAtTable::FillNotice(pathId, tx, context); + + Y_VERIFY(context.SS->TablesWithSnapshots.contains(pathId)); + const auto snapshotTxId = context.SS->TablesWithSnapshots.at(pathId); + + Y_VERIFY(context.SS->SnapshotsStepIds.contains(snapshotTxId)); + const auto snapshotStep = context.SS->SnapshotsStepIds.at(snapshotTxId); + + Y_VERIFY(tx.HasAlterCdcStreamNotice()); + auto& notice = *tx.MutableAlterCdcStreamNotice(); + + notice.MutableDropSnapshot()->SetStep(ui64(snapshotStep)); + notice.MutableDropSnapshot()->SetTxId(ui64(snapshotTxId)); + } + +public: + using TConfigurePartsAtTable::TConfigurePartsAtTable; + +}; // TConfigurePartsAtTableDropSnapshot + +class TProposeAtTableDropSnapshot: public NCdcStreamState::TProposeAtTable { +public: + using NCdcStreamState::TProposeAtTable::TProposeAtTable; + + bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { + NCdcStreamState::TProposeAtTable::HandleReply(ev, context); + + const auto* txState = context.SS->FindTx(OperationId); + Y_VERIFY(txState); + const auto& pathId = txState->TargetPathId; + + Y_VERIFY(context.SS->TablesWithSnapshots.contains(pathId)); + const auto snapshotTxId = context.SS->TablesWithSnapshots.at(pathId); + + auto it = context.SS->SnapshotTables.find(snapshotTxId); + if (it != context.SS->SnapshotTables.end()) { + it->second.erase(pathId); + if (it->second.empty()) { + context.SS->SnapshotTables.erase(it); + } + } + + context.SS->SnapshotsStepIds.erase(snapshotTxId); + context.SS->TablesWithSnapshots.erase(pathId); + + NIceDb::TNiceDb db(context.GetDB()); + context.SS->PersistDropSnapshot(db, snapshotTxId, pathId); + + context.SS->TabletCounters->Simple()[COUNTER_SNAPSHOTS_COUNT].Sub(1); + return true; + } + +}; // TProposeAtTableDropSnapshot + class TAlterCdcStreamAtTable: public TSubOperation { static TTxState::ETxState NextState() { return TTxState::ConfigureParts; @@ -259,9 +337,17 @@ class TAlterCdcStreamAtTable: public TSubOperation { switch (state) { case TTxState::Waiting: case TTxState::ConfigureParts: - return MakeHolder<TConfigurePartsAtTable>(OperationId); + if (DropSnapshot) { + return MakeHolder<TConfigurePartsAtTableDropSnapshot>(OperationId); + } else { + return MakeHolder<TConfigurePartsAtTable>(OperationId); + } case TTxState::Propose: - return MakeHolder<NCdcStreamState::TProposeAtTable>(OperationId); + if (DropSnapshot) { + return MakeHolder<TProposeAtTableDropSnapshot>(OperationId); + } else { + return MakeHolder<NCdcStreamState::TProposeAtTable>(OperationId); + } case TTxState::ProposedWaitParts: return MakeHolder<NTableState::TProposedWaitParts>(OperationId); case TTxState::Done: @@ -272,7 +358,17 @@ class TAlterCdcStreamAtTable: public TSubOperation { } public: - using TSubOperation::TSubOperation; + explicit TAlterCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot) + : TSubOperation(id, tx) + , DropSnapshot(dropSnapshot) + { + } + + explicit TAlterCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot) + : TSubOperation(id, state) + , DropSnapshot(dropSnapshot) + { + } THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override { const auto& workingDir = Transaction.GetWorkingDir(); @@ -336,6 +432,17 @@ public: return result; } + if (DropSnapshot && !context.SS->TablesWithSnapshots.contains(tablePath.Base()->PathId)) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, TStringBuilder() << "Table has no snapshots" + << ": pathId# " << tablePath.Base()->PathId); + return result; + } + + auto guard = context.DbGuard(); + context.MemChanges.GrabPath(context.SS, tablePath.Base()->PathId); + context.MemChanges.GrabNewTxState(context.SS, OperationId); + + context.DbChanges.PersistPath(tablePath.Base()->PathId); context.DbChanges.PersistTxState(OperationId); Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId)); @@ -350,8 +457,12 @@ public: Y_VERIFY(stream->AlterVersion != 0); Y_VERIFY(stream->AlterData); + const auto txType = DropSnapshot + ? TTxState::TxAlterCdcStreamAtTableDropSnapshot + : TTxState::TxAlterCdcStreamAtTable; + Y_VERIFY(!context.SS->FindTx(OperationId)); - auto& txState = context.SS->CreateTx(OperationId, TTxState::TxAlterCdcStreamAtTable, tablePath.Base()->PathId); + auto& txState = context.SS->CreateTx(OperationId, txType, tablePath.Base()->PathId); txState.State = TTxState::ConfigureParts; tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter; @@ -367,8 +478,9 @@ public: return result; } - void AbortPropose(TOperationContext&) override { - Y_FAIL("no AbortPropose for TAlterCdcStreamAtTable"); + void AbortPropose(TOperationContext& context) override { + LOG_N("TAlterCdcStreamAtTable AbortPropose" + << ": opId# " << OperationId); } void AbortUnsafe(TTxId txId, TOperationContext& context) override { @@ -378,6 +490,9 @@ public: context.OnComplete.DoneOperation(OperationId); } +private: + const bool DropSnapshot; + }; // TAlterCdcStreamAtTable } // anonymous @@ -390,12 +505,12 @@ ISubOperationBase::TPtr CreateAlterCdcStreamImpl(TOperationId id, TTxState::ETxS return MakeSubOperation<TAlterCdcStream>(id, state); } -ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, const TTxTransaction& tx) { - return MakeSubOperation<TAlterCdcStreamAtTable>(id, tx); +ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot) { + return MakeSubOperation<TAlterCdcStreamAtTable>(id, tx, dropSnapshot); } -ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, TTxState::ETxState state) { - return MakeSubOperation<TAlterCdcStreamAtTable>(id, state); +ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot) { + return MakeSubOperation<TAlterCdcStreamAtTable>(id, state, dropSnapshot); } TVector<ISubOperationBase::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) { @@ -461,6 +576,10 @@ TVector<ISubOperationBase::TPtr> CreateAlterCdcStream(TOperationId opId, const T auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl); outTx.MutableAlterCdcStream()->CopyFrom(op); + if (op.HasGetReady()) { + outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId()); + } + result.push_back(CreateAlterCdcStreamImpl(NextPartId(opId, result), outTx)); } @@ -468,7 +587,21 @@ TVector<ISubOperationBase::TPtr> CreateAlterCdcStream(TOperationId opId, const T auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamAtTable); outTx.MutableAlterCdcStream()->CopyFrom(op); - result.push_back(CreateAlterCdcStreamAtTable(NextPartId(opId, result), outTx)); + if (op.HasGetReady()) { + outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId()); + } + + result.push_back(CreateAlterCdcStreamAtTable(NextPartId(opId, result), outTx, op.HasGetReady())); + } + + if (op.HasGetReady()) { + auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropLock); + outTx.SetFailOnExist(true); + outTx.SetInternal(true); + outTx.MutableLockConfig()->SetName(tablePath.LeafName()); + outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId()); + + result.push_back(DropLock(NextPartId(opId, result), outTx)); } return result; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index 7459536435..218c0c921b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -351,10 +351,12 @@ void NTableState::UpdatePartitioningForTableModification(TOperationId operationI commonShardOp = TTxState::ConfigureParts; } else if (txState.TxType == TTxState::TxCreateCdcStreamAtTable) { commonShardOp = TTxState::ConfigureParts; - } else if (txState.TxType == TTxState::TxCreateCdcStreamAtTableWithSnapshot) { + } else if (txState.TxType == TTxState::TxCreateCdcStreamAtTableWithInitialScan) { commonShardOp = TTxState::ConfigureParts; } else if (txState.TxType == TTxState::TxAlterCdcStreamAtTable) { commonShardOp = TTxState::ConfigureParts; + } else if (txState.TxType == TTxState::TxAlterCdcStreamAtTableDropSnapshot) { + commonShardOp = TTxState::ConfigureParts; } else if (txState.TxType == TTxState::TxDropCdcStreamAtTable) { commonShardOp = TTxState::ConfigureParts; } else { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index e51b19a593..6ae047a6a1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -1048,8 +1048,9 @@ class TConfigurePartsAtTable: public TSubOperationState { static bool IsExpectedTxType(TTxState::ETxType txType) { switch (txType) { case TTxState::TxCreateCdcStreamAtTable: - case TTxState::TxCreateCdcStreamAtTableWithSnapshot: + case TTxState::TxCreateCdcStreamAtTableWithInitialScan: case TTxState::TxAlterCdcStreamAtTable: + case TTxState::TxAlterCdcStreamAtTableDropSnapshot: case TTxState::TxDropCdcStreamAtTable: return true; default: @@ -1126,8 +1127,9 @@ class TProposeAtTable: public TSubOperationState { static bool IsExpectedTxType(TTxState::ETxType txType) { switch (txType) { case TTxState::TxCreateCdcStreamAtTable: - case TTxState::TxCreateCdcStreamAtTableWithSnapshot: + case TTxState::TxCreateCdcStreamAtTableWithInitialScan: case TTxState::TxAlterCdcStreamAtTable: + case TTxState::TxAlterCdcStreamAtTableDropSnapshot: case TTxState::TxDropCdcStreamAtTable: return true; default: diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 4262bc92dc..1dc4a69f17 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -302,7 +302,7 @@ public: }; // TConfigurePartsAtTable -class TProposeAtTableWithSnapshot: public NCdcStreamState::TProposeAtTable { +class TProposeAtTableWithInitialScan: public NCdcStreamState::TProposeAtTable { public: using NCdcStreamState::TProposeAtTable::TProposeAtTable; @@ -318,7 +318,7 @@ public: return true; } -}; // TProposeAtTableWithSnapshot +}; // TProposeAtTableWithInitialScan class TNewCdcStreamAtTable: public TSubOperation { static TTxState::ETxState NextState() { @@ -346,7 +346,7 @@ class TNewCdcStreamAtTable: public TSubOperation { return MakeHolder<TConfigurePartsAtTable>(OperationId); case TTxState::Propose: if (InitialScan) { - return MakeHolder<TProposeAtTableWithSnapshot>(OperationId); + return MakeHolder<TProposeAtTableWithInitialScan>(OperationId); } else { return MakeHolder<NCdcStreamState::TProposeAtTable>(OperationId); } @@ -444,6 +444,8 @@ public: auto guard = context.DbGuard(); context.MemChanges.GrabPath(context.SS, tablePath.Base()->PathId); context.MemChanges.GrabNewTxState(context.SS, OperationId); + + context.DbChanges.PersistPath(tablePath.Base()->PathId); context.DbChanges.PersistTxState(OperationId); if (InitialScan) { @@ -461,7 +463,7 @@ public: Y_VERIFY(!table->AlterData); const auto txType = InitialScan - ? TTxState::TxCreateCdcStreamAtTableWithSnapshot + ? TTxState::TxCreateCdcStreamAtTableWithInitialScan : TTxState::TxCreateCdcStreamAtTable; Y_VERIFY(!context.SS->FindTx(OperationId)); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp index 71c0f44c5e..1174884952 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp @@ -135,9 +135,16 @@ public: .IsAtLocalSchemeShard() .IsResolved() .NotUnderDeleting() - .NotUnderOperation() .IsCommonSensePath(); + if (checks) { + if (dstPath.IsUnderOperation()) { // may be part of a consistent operation + checks.IsUnderTheSameOperation(OperationId.GetTxId()); + } else { + checks.NotUnderOperation(); + } + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); if (dstPath.IsResolved()) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 121871315d..188d045df2 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -380,8 +380,8 @@ ISubOperationBase::TPtr CreateNewCdcStreamAtTable(TOperationId id, TTxState::ETx TVector<ISubOperationBase::TPtr> CreateAlterCdcStream(TOperationId id, const TTxTransaction& tx, TOperationContext& context); ISubOperationBase::TPtr CreateAlterCdcStreamImpl(TOperationId id, const TTxTransaction& tx); ISubOperationBase::TPtr CreateAlterCdcStreamImpl(TOperationId id, TTxState::ETxState state); -ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, const TTxTransaction& tx); -ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, TTxState::ETxState state); +ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot); +ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot); // Drop TVector<ISubOperationBase::TPtr> CreateDropCdcStream(TOperationId id, const TTxTransaction& tx, TOperationContext& context); ISubOperationBase::TPtr CreateDropCdcStreamImpl(TOperationId id, const TTxTransaction& tx); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 895952147e..503bc848e7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1400,8 +1400,9 @@ TPathElement::EPathState TSchemeShard::CalcPathState(TTxState::ETxType txType, T case TTxState::TxAlterColumnTable: case TTxState::TxAlterCdcStream: case TTxState::TxAlterCdcStreamAtTable: + case TTxState::TxAlterCdcStreamAtTableDropSnapshot: case TTxState::TxCreateCdcStreamAtTable: - case TTxState::TxCreateCdcStreamAtTableWithSnapshot: + case TTxState::TxCreateCdcStreamAtTableWithInitialScan: case TTxState::TxDropCdcStreamAtTable: case TTxState::TxAlterSequence: case TTxState::TxAlterReplication: @@ -2079,13 +2080,11 @@ void TSchemeShard::PersistSnapshotTable(NIceDb::TNiceDb& db, const TTxId snapsho } void TSchemeShard::PersistSnapshotStepId(NIceDb::TNiceDb& db, const TTxId snapshotId, const TStepId stepId) { - db.Table<Schema::SnapshotSteps>().Key(snapshotId).Update( - NIceDb::TUpdate<Schema::SnapshotSteps::StepId>(stepId)); + db.Table<Schema::SnapshotSteps>().Key(snapshotId).Update(NIceDb::TUpdate<Schema::SnapshotSteps::StepId>(stepId)); } void TSchemeShard::PersistDropSnapshot(NIceDb::TNiceDb& db, const TTxId snapshotId, const TPathId tableId) { db.Table<Schema::SnapshotTables>().Key(snapshotId, tableId.OwnerId, tableId.LocalPathId).Delete(); - db.Table<Schema::SnapshotSteps>().Key(snapshotId).Delete(); } diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h index d7a30ef4ef..b4982edabd 100644 --- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h +++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h @@ -116,8 +116,9 @@ struct TTxState { item(TxDropBlobDepot, 70) \ item(TxUpdateMainTableOnIndexMove, 71) \ item(TxAllocatePQ, 72) \ - item(TxCreateCdcStreamAtTableWithSnapshot, 73) \ + item(TxCreateCdcStreamAtTableWithInitialScan, 73) \ item(TxAlterExtSubDomainCreateHive, 74) \ + item(TxAlterCdcStreamAtTableDropSnapshot, 75) \ // TX_STATE_TYPE_ENUM @@ -328,7 +329,7 @@ struct TTxState { return true; case TxInitializeBuildIndex: //this is more like alter case TxCreateCdcStreamAtTable: - case TxCreateCdcStreamAtTableWithSnapshot: + case TxCreateCdcStreamAtTableWithInitialScan: return false; case TxCreateLock: //this is more like alter case TxDropLock: //this is more like alter @@ -378,6 +379,7 @@ struct TTxState { case TxAlterSolomonVolume: case TxAlterCdcStream: case TxAlterCdcStreamAtTable: + case TxAlterCdcStreamAtTableDropSnapshot: case TxAlterSequence: case TxAlterReplication: case TxAlterBlobDepot: @@ -429,7 +431,7 @@ struct TTxState { case TxFillIndex: case TxCreateCdcStream: case TxCreateCdcStreamAtTable: - case TxCreateCdcStreamAtTableWithSnapshot: + case TxCreateCdcStreamAtTableWithInitialScan: case TxCreateSequence: case TxCreateReplication: case TxCreateBlobDepot: @@ -464,6 +466,7 @@ struct TTxState { case TxAlterSolomonVolume: case TxAlterCdcStream: case TxAlterCdcStreamAtTable: + case TxAlterCdcStreamAtTableDropSnapshot: case TxAlterSequence: case TxAlterReplication: case TxAlterBlobDepot: @@ -518,7 +521,7 @@ struct TTxState { case TxCreateTableIndex: case TxCreateCdcStream: case TxCreateCdcStreamAtTable: - case TxCreateCdcStreamAtTableWithSnapshot: + case TxCreateCdcStreamAtTableWithInitialScan: case TxCreateSequence: case TxCreateReplication: case TxCreateBlobDepot: @@ -551,6 +554,7 @@ struct TTxState { case TxAlterSolomonVolume: case TxAlterCdcStream: case TxAlterCdcStreamAtTable: + case TxAlterCdcStreamAtTableDropSnapshot: case TxMoveTable: case TxMoveTableIndex: case TxAlterSequence: diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp index 42ee88dd1f..13f0fe2668 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp @@ -173,6 +173,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } env.TestWaitNotification(runtime, txId); + const auto lockTxId = txId; TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { NLs::PathExist, @@ -180,6 +181,71 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateScan), }); + + auto testAlterCdcStream = [&runtime](ui64 txId, const TString& parentPath, const TString& schema, + const TMaybe<ui64>& lockTxId, TEvSchemeShard::EStatus expectedStatus = NKikimrScheme::StatusAccepted) + { + auto request = AlterCdcStreamRequest(txId, parentPath, schema); + if (lockTxId) { + request->Record.MutableTransaction(0)->MutableLockGuard()->SetOwnerTxId(*lockTxId); + } + + ForwardToTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor(), request); + TestModificationResult(runtime, txId, expectedStatus); + }; + + // without guard & lockTxId + testAlterCdcStream(++txId, "/MyRoot", R"( + TableName: "Table" + StreamName: "Stream" + GetReady { + LockTxId: 0 + } + )", {}, NKikimrScheme::StatusMultipleModifications); + + // with guard, without lockTxId + testAlterCdcStream(++txId, "/MyRoot", R"( + TableName: "Table" + StreamName: "Stream" + GetReady { + LockTxId: 0 + } + )", lockTxId, NKikimrScheme::StatusMultipleModifications); + + // without guard, with lockTxId + testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamName: "Stream" + GetReady { + LockTxId: %lu + } + )", lockTxId), {}, NKikimrScheme::StatusMultipleModifications); + + // with guard & lockTxId + testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamName: "Stream" + GetReady { + LockTxId: %lu + } + )", lockTxId), lockTxId); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::PathExist, + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly), + NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), + NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), + }); + + // another try should fail + testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamName: "Stream" + GetReady { + LockTxId: %lu + } + )", lockTxId), {}, NKikimrScheme::StatusPreconditionFailed); } Y_UNIT_TEST(InitialScanShouldSucceed) { diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp index e94857120b..a4ddb9beb6 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp @@ -65,7 +65,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { CreateStream({}, true); } - Y_UNIT_TEST(AlterStream) { + Y_UNIT_TEST(DisableStream) { TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { { @@ -116,6 +116,65 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { }); } + Y_UNIT_TEST(GetReadyStream) { + TTestWithReboots t; + t.GetTestEnvOptions().EnableChangefeedInitialScan(true); + + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + State: ECdcStreamStateScan + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::PathExist, + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly), + NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), + NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateScan), + }); + } + + const auto lockTxId = t.TxId; + auto request = AlterCdcStreamRequest(++t.TxId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamName: "Stream" + GetReady { + LockTxId: %lu + } + )", lockTxId)); + request->Record.MutableTransaction(0)->MutableLockGuard()->SetOwnerTxId(lockTxId); + + t.TestEnv->ReliablePropose(runtime, request, { + NKikimrScheme::StatusAccepted, + NKikimrScheme::StatusMultipleModifications, + }); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::PathExist, + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly), + NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), + NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), + }); + }); + } + Y_UNIT_TEST(DropStream) { TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { |