aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-01-13 18:13:21 +0300
committerilnaz <ilnaz@ydb.tech>2023-01-13 18:13:21 +0300
commit6ba153ea01c99e3084f4ca75e3260f89da8d8527 (patch)
tree591307dc81e08e5b1945941e0c6671740a5901f6
parentade5487ce6008abe6796ba9b29da4b29d4b4337f (diff)
downloadydb-6ba153ea01c99e3084f4ca75e3260f89da8d8527.tar.gz
Switch cdc stream to ready state, drop lock & snapshot
-rw-r--r--ydb/core/protos/counters_schemeshard.proto13
-rw-r--r--ydb/core/protos/flat_scheme_op.proto5
-rw-r--r--ydb/core/protos/tx_datashard.proto6
-rw-r--r--ydb/core/tx/datashard/alter_cdc_stream_unit.cpp32
-rw-r--r--ydb/core/tx/datashard/datashard.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h4
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation.cpp6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp173
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.h6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp10
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp9
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_part.h4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp7
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_tx_infly.h12
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream.cpp66
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp61
19 files changed, 369 insertions, 63 deletions
diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto
index 5a806e3db7..ee279130b3 100644
--- a/ydb/core/protos/counters_schemeshard.proto
+++ b/ydb/core/protos/counters_schemeshard.proto
@@ -169,9 +169,9 @@ enum ESimpleCounters {
COUNTER_IN_FLIGHT_OPS_TxUpdateMainTableOnIndexMove = 137 [(CounterOpts) = {Name: "InFlightOps/UpdateMainTableOnIndexMove"}];
COUNTER_IN_FLIGHT_OPS_TxAllocatePQ = 138 [(CounterOpts) = {Name: "InFlightOps/AllocatePQ"}];
- COUNTER_IN_FLIGHT_OPS_TxCreateCdcStreamAtTableWithSnapshot = 139 [(CounterOpts) = {Name: "InFlightOps/CreateCdcStreamAtTableWithSnapshot"}];
-
- COUNTER_IN_FLIGHT_OPS_TxAlterExtSubDomainCreateHive = 140 [(CounterOpts) = {Name: "InFlightOps/AlterExtSubDomainCreateHive"}];
+ COUNTER_IN_FLIGHT_OPS_TxCreateCdcStreamAtTableWithInitialScan = 139 [(CounterOpts) = {Name: "InFlightOps/CreateCdcStreamAtTableWithInitialScan"}];
+ COUNTER_IN_FLIGHT_OPS_TxAlterExtSubDomainCreateHive = 140 [(CounterOpts) = {Name: "InFlightOps/AlterExtSubDomainCreateHive"}];
+ COUNTER_IN_FLIGHT_OPS_TxAlterCdcStreamAtTableDropSnapshot = 141 [(CounterOpts) = {Name: "InFlightOps/AlterCdcStreamAtTableDropSnapshot"}];
}
enum ECumulativeCounters {
@@ -274,10 +274,9 @@ enum ECumulativeCounters {
COUNTER_FINISHED_OPS_TxUpdateMainTableOnIndexMove = 82 [(CounterOpts) = {Name: "FinishedOps/UpdateMainTableOnIndexMove"}];
COUNTER_FINISHED_OPS_TxAllocatePQ = 83 [(CounterOpts) = {Name: "FinishedOps/AllocatePQ"}];
- COUNTER_FINISHED_OPS_TxCreateCdcStreamAtTableWithSnapshot = 84 [(CounterOpts) = {Name: "FinishedOps/CreateCdcStreamAtTableWithSnapshot"}];
-
- COUNTER_FINISHED_OPS_TxAlterExtSubDomainCreateHive = 85 [(CounterOpts) = {Name: "FinishedOps/AlterExtSubDomainCreateHive"}];
-
+ COUNTER_FINISHED_OPS_TxCreateCdcStreamAtTableWithInitialScan = 84 [(CounterOpts) = {Name: "FinishedOps/CreateCdcStreamAtTableWithInitialScan"}];
+ COUNTER_FINISHED_OPS_TxAlterExtSubDomainCreateHive = 85 [(CounterOpts) = {Name: "FinishedOps/AlterExtSubDomainCreateHive"}];
+ COUNTER_FINISHED_OPS_TxAlterCdcStreamAtTableDropSnapshot = 86 [(CounterOpts) = {Name: "FinishedOps/AlterCdcStreamAtTableDropSnapshot"}];
}
enum EPercentileCounters {
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 21511d4c1c..7e3d329c39 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -736,8 +736,13 @@ message TAlterCdcStream {
message TDisable {
}
+ message TGetReady {
+ optional uint64 LockTxId = 1;
+ }
+
oneof Action {
TDisable Disable = 3;
+ TGetReady GetReady = 4;
}
}
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 2c37a991a7..6d25d172da 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -402,10 +402,16 @@ message TCreateCdcStreamNotice {
optional string SnapshotName = 4;
}
+message TSnapshot {
+ optional uint64 Step = 1;
+ optional uint64 TxId = 2;
+}
+
message TAlterCdcStreamNotice {
optional NKikimrProto.TPathID PathId = 1;
optional uint64 TableSchemaVersion = 2;
optional NKikimrSchemeOp.TCdcStreamDescription StreamDescription = 3;
+ optional TSnapshot DropSnapshot = 4;
}
message TDropCdcStreamNotice {
diff --git a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
index 37e372b233..de503b1b54 100644
--- a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
@@ -29,19 +29,39 @@ public:
const auto& params = schemeTx.GetAlterCdcStreamNotice();
const auto& streamDesc = params.GetStreamDescription();
+ const auto streamPathId = PathIdFromPathId(streamDesc.GetPathId());
+ const auto state = streamDesc.GetState();
const auto pathId = PathIdFromPathId(params.GetPathId());
Y_VERIFY(pathId.OwnerId == DataShard.GetPathOwnerId());
- const auto streamPathId = PathIdFromPathId(streamDesc.GetPathId());
-
- Y_VERIFY_S(streamDesc.GetState() == NKikimrSchemeOp::ECdcStreamStateDisabled, "Unexpected alter cdc stream"
- << ": desc# " << streamDesc.ShortDebugString());
-
const auto version = params.GetTableSchemaVersion();
Y_VERIFY(version);
- auto tableInfo = DataShard.AlterTableDisableCdcStream(ctx, txc, pathId, version, streamPathId);
+ TUserTable::TPtr tableInfo;
+ switch (state) {
+ case NKikimrSchemeOp::ECdcStreamStateDisabled:
+ case NKikimrSchemeOp::ECdcStreamStateReady:
+ tableInfo = DataShard.AlterTableSwitchCdcStreamState(ctx, txc, pathId, version, streamPathId, state);
+ if (state == NKikimrSchemeOp::ECdcStreamStateReady) {
+ if (params.HasDropSnapshot()) {
+ const auto& snapshot = params.GetDropSnapshot();
+ Y_VERIFY(snapshot.GetStep() != 0);
+
+ const TSnapshotKey key(pathId.OwnerId, pathId.LocalPathId, snapshot.GetStep(), snapshot.GetTxId());
+ DataShard.GetSnapshotManager().RemoveSnapshot(txc.DB, key);
+ } else {
+ Y_VERIFY_DEBUG(false, "Absent snapshot");
+ }
+ }
+ break;
+
+ default:
+ Y_FAIL_S("Unexpected alter cdc stream"
+ << ": params# " << params.ShortDebugString());
+ }
+
+ Y_VERIFY(tableInfo);
DataShard.AddUserTable(pathId, tableInfo);
if (tableInfo->NeedSchemaSnapshots()) {
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 19b32a4290..a8f6565a10 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -1255,13 +1255,13 @@ TUserTable::TPtr TDataShard::AlterTableAddCdcStream(
return tableInfo;
}
-TUserTable::TPtr TDataShard::AlterTableDisableCdcStream(
+TUserTable::TPtr TDataShard::AlterTableSwitchCdcStreamState(
const TActorContext& ctx, TTransactionContext& txc,
const TPathId& pathId, ui64 tableSchemaVersion,
- const TPathId& streamPathId)
+ const TPathId& streamPathId, NKikimrSchemeOp::ECdcStreamState state)
{
auto tableInfo = AlterTableSchemaVersion(ctx, txc, pathId, tableSchemaVersion, false);
- tableInfo->DisableCdcStream(streamPathId);
+ tableInfo->SwitchCdcStreamState(streamPathId, state);
NIceDb::TNiceDb db(txc.DB);
PersistUserTable(db, pathId.LocalPathId, *tableInfo);
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index d7e554f826..874fa1f885 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1598,10 +1598,10 @@ public:
const TPathId& pathId, ui64 tableSchemaVersion,
const NKikimrSchemeOp::TCdcStreamDescription& streamDesc);
- TUserTable::TPtr AlterTableDisableCdcStream(
+ TUserTable::TPtr AlterTableSwitchCdcStreamState(
const TActorContext& ctx, TTransactionContext& txc,
const TPathId& pathId, ui64 tableSchemaVersion,
- const TPathId& streamPathId);
+ const TPathId& streamPathId, NKikimrSchemeOp::ECdcStreamState state);
TUserTable::TPtr AlterTableDropCdcStream(
const TActorContext& ctx, TTransactionContext& txc,
diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp
index c78cac2c91..8a8016c475 100644
--- a/ydb/core/tx/datashard/datashard_user_table.cpp
+++ b/ydb/core/tx/datashard/datashard_user_table.cpp
@@ -129,13 +129,13 @@ void TUserTable::AddCdcStream(const NKikimrSchemeOp::TCdcStreamDescription& stre
SetSchema(schema);
}
-void TUserTable::DisableCdcStream(const TPathId& streamPathId) {
+void TUserTable::SwitchCdcStreamState(const TPathId& streamPathId, TCdcStream::EState state) {
auto it = CdcStreams.find(streamPathId);
if (it == CdcStreams.end()) {
return;
}
- it->second.State = TCdcStream::EState::ECdcStreamStateDisabled;
+ it->second.State = state;
NKikimrSchemeOp::TTableDescription schema;
GetSchema(schema);
@@ -145,7 +145,7 @@ void TUserTable::DisableCdcStream(const TPathId& streamPathId) {
continue;
}
- it->SetState(TCdcStream::EState::ECdcStreamStateDisabled);
+ it->SetState(state);
SetSchema(schema);
return;
diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h
index 9f9af09506..409e677388 100644
--- a/ydb/core/tx/datashard/datashard_user_table.h
+++ b/ydb/core/tx/datashard/datashard_user_table.h
@@ -411,7 +411,7 @@ struct TUserTable : public TThrRefBase {
bool HasAsyncIndexes() const;
void AddCdcStream(const NKikimrSchemeOp::TCdcStreamDescription& streamDesc);
- void DisableCdcStream(const TPathId& streamPathId);
+ void SwitchCdcStreamState(const TPathId& streamPathId, TCdcStream::EState state);
void DropCdcStream(const TPathId& streamPathId);
bool HasCdcStreams() const;
bool NeedSchemaSnapshots() const;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp
index c8649f0cf5..3514444a26 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp
@@ -958,12 +958,14 @@ ISubOperationBase::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxSta
return CreateNewCdcStreamImpl(NextPartId(), txState);
case TTxState::ETxType::TxCreateCdcStreamAtTable:
return CreateNewCdcStreamAtTable(NextPartId(), txState, false);
- case TTxState::ETxType::TxCreateCdcStreamAtTableWithSnapshot:
+ case TTxState::ETxType::TxCreateCdcStreamAtTableWithInitialScan:
return CreateNewCdcStreamAtTable(NextPartId(), txState, true);
case TTxState::ETxType::TxAlterCdcStream:
return CreateAlterCdcStreamImpl(NextPartId(), txState);
case TTxState::ETxType::TxAlterCdcStreamAtTable:
- return CreateAlterCdcStreamAtTable(NextPartId(), txState);
+ return CreateAlterCdcStreamAtTable(NextPartId(), txState, false);
+ case TTxState::ETxType::TxAlterCdcStreamAtTableDropSnapshot:
+ return CreateAlterCdcStreamAtTable(NextPartId(), txState, true);
case TTxState::ETxType::TxDropCdcStream:
return CreateDropCdcStreamImpl(NextPartId(), txState);
case TTxState::ETxType::TxDropCdcStreamAtTable:
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
index 43de78d88d..2050f0a05f 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
@@ -147,19 +147,24 @@ public:
}
}
- auto guard = context.DbGuard();
- context.DbChanges.PersistAlterCdcStream(streamPath.Base()->PathId);
- context.DbChanges.PersistTxState(OperationId);
-
Y_VERIFY(context.SS->CdcStreams.contains(streamPath.Base()->PathId));
auto stream = context.SS->CdcStreams.at(streamPath.Base()->PathId);
- auto streamAlter = stream->CreateNextVersion();
- Y_VERIFY(streamAlter);
+ TCdcStreamInfo::EState newState = TCdcStreamInfo::EState::ECdcStreamStateInvalid;
switch (op.GetActionCase()) {
case NKikimrSchemeOp::TAlterCdcStream::kDisable:
- streamAlter->State = TCdcStreamInfo::EState::ECdcStreamStateDisabled;
+ newState = TCdcStreamInfo::EState::ECdcStreamStateDisabled;
+ break;
+ case NKikimrSchemeOp::TAlterCdcStream::kGetReady:
+ if (stream->State == TCdcStreamInfo::EState::ECdcStreamStateScan) {
+ newState = TCdcStreamInfo::EState::ECdcStreamStateReady;
+ } else {
+ result->SetError(NKikimrScheme::StatusPreconditionFailed, TStringBuilder()
+ << "Cannot switch to ready state"
+ << ": current# " << stream->State);
+ return result;
+ }
break;
default:
result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder()
@@ -167,6 +172,21 @@ public:
return result;
}
+ auto guard = context.DbGuard();
+ context.MemChanges.GrabPath(context.SS, streamPath.Base()->PathId);
+ context.MemChanges.GrabCdcStream(context.SS, streamPath.Base()->PathId);
+ context.MemChanges.GrabNewTxState(context.SS, OperationId);
+
+ context.DbChanges.PersistPath(streamPath.Base()->PathId);
+ context.DbChanges.PersistAlterCdcStream(streamPath.Base()->PathId);
+ context.DbChanges.PersistTxState(OperationId);
+
+ auto streamAlter = stream->CreateNextVersion();
+ Y_VERIFY(streamAlter);
+
+ Y_VERIFY(newState != TCdcStreamInfo::EState::ECdcStreamStateInvalid);
+ streamAlter->State = newState;
+
Y_VERIFY(!context.SS->FindTx(OperationId));
auto& txState = context.SS->CreateTx(OperationId, TTxState::TxAlterCdcStream, streamPath.Base()->PathId);
txState.State = TTxState::Propose;
@@ -181,8 +201,9 @@ public:
return result;
}
- void AbortPropose(TOperationContext&) override {
- Y_FAIL("no AbortPropose for TAlterCdcStream");
+ void AbortPropose(TOperationContext& context) override {
+ LOG_N("TAlterCdcStream AbortPropose"
+ << ": opId# " << OperationId);
}
void AbortUnsafe(TTxId txId, TOperationContext& context) override {
@@ -234,6 +255,63 @@ public:
}; // TConfigurePartsAtTable
+class TConfigurePartsAtTableDropSnapshot: public TConfigurePartsAtTable {
+protected:
+ void FillNotice(const TPathId& pathId, NKikimrTxDataShard::TFlatSchemeTransaction& tx, TOperationContext& context) const override {
+ TConfigurePartsAtTable::FillNotice(pathId, tx, context);
+
+ Y_VERIFY(context.SS->TablesWithSnapshots.contains(pathId));
+ const auto snapshotTxId = context.SS->TablesWithSnapshots.at(pathId);
+
+ Y_VERIFY(context.SS->SnapshotsStepIds.contains(snapshotTxId));
+ const auto snapshotStep = context.SS->SnapshotsStepIds.at(snapshotTxId);
+
+ Y_VERIFY(tx.HasAlterCdcStreamNotice());
+ auto& notice = *tx.MutableAlterCdcStreamNotice();
+
+ notice.MutableDropSnapshot()->SetStep(ui64(snapshotStep));
+ notice.MutableDropSnapshot()->SetTxId(ui64(snapshotTxId));
+ }
+
+public:
+ using TConfigurePartsAtTable::TConfigurePartsAtTable;
+
+}; // TConfigurePartsAtTableDropSnapshot
+
+class TProposeAtTableDropSnapshot: public NCdcStreamState::TProposeAtTable {
+public:
+ using NCdcStreamState::TProposeAtTable::TProposeAtTable;
+
+ bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override {
+ NCdcStreamState::TProposeAtTable::HandleReply(ev, context);
+
+ const auto* txState = context.SS->FindTx(OperationId);
+ Y_VERIFY(txState);
+ const auto& pathId = txState->TargetPathId;
+
+ Y_VERIFY(context.SS->TablesWithSnapshots.contains(pathId));
+ const auto snapshotTxId = context.SS->TablesWithSnapshots.at(pathId);
+
+ auto it = context.SS->SnapshotTables.find(snapshotTxId);
+ if (it != context.SS->SnapshotTables.end()) {
+ it->second.erase(pathId);
+ if (it->second.empty()) {
+ context.SS->SnapshotTables.erase(it);
+ }
+ }
+
+ context.SS->SnapshotsStepIds.erase(snapshotTxId);
+ context.SS->TablesWithSnapshots.erase(pathId);
+
+ NIceDb::TNiceDb db(context.GetDB());
+ context.SS->PersistDropSnapshot(db, snapshotTxId, pathId);
+
+ context.SS->TabletCounters->Simple()[COUNTER_SNAPSHOTS_COUNT].Sub(1);
+ return true;
+ }
+
+}; // TProposeAtTableDropSnapshot
+
class TAlterCdcStreamAtTable: public TSubOperation {
static TTxState::ETxState NextState() {
return TTxState::ConfigureParts;
@@ -259,9 +337,17 @@ class TAlterCdcStreamAtTable: public TSubOperation {
switch (state) {
case TTxState::Waiting:
case TTxState::ConfigureParts:
- return MakeHolder<TConfigurePartsAtTable>(OperationId);
+ if (DropSnapshot) {
+ return MakeHolder<TConfigurePartsAtTableDropSnapshot>(OperationId);
+ } else {
+ return MakeHolder<TConfigurePartsAtTable>(OperationId);
+ }
case TTxState::Propose:
- return MakeHolder<NCdcStreamState::TProposeAtTable>(OperationId);
+ if (DropSnapshot) {
+ return MakeHolder<TProposeAtTableDropSnapshot>(OperationId);
+ } else {
+ return MakeHolder<NCdcStreamState::TProposeAtTable>(OperationId);
+ }
case TTxState::ProposedWaitParts:
return MakeHolder<NTableState::TProposedWaitParts>(OperationId);
case TTxState::Done:
@@ -272,7 +358,17 @@ class TAlterCdcStreamAtTable: public TSubOperation {
}
public:
- using TSubOperation::TSubOperation;
+ explicit TAlterCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot)
+ : TSubOperation(id, tx)
+ , DropSnapshot(dropSnapshot)
+ {
+ }
+
+ explicit TAlterCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot)
+ : TSubOperation(id, state)
+ , DropSnapshot(dropSnapshot)
+ {
+ }
THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override {
const auto& workingDir = Transaction.GetWorkingDir();
@@ -336,6 +432,17 @@ public:
return result;
}
+ if (DropSnapshot && !context.SS->TablesWithSnapshots.contains(tablePath.Base()->PathId)) {
+ result->SetError(NKikimrScheme::StatusPreconditionFailed, TStringBuilder() << "Table has no snapshots"
+ << ": pathId# " << tablePath.Base()->PathId);
+ return result;
+ }
+
+ auto guard = context.DbGuard();
+ context.MemChanges.GrabPath(context.SS, tablePath.Base()->PathId);
+ context.MemChanges.GrabNewTxState(context.SS, OperationId);
+
+ context.DbChanges.PersistPath(tablePath.Base()->PathId);
context.DbChanges.PersistTxState(OperationId);
Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId));
@@ -350,8 +457,12 @@ public:
Y_VERIFY(stream->AlterVersion != 0);
Y_VERIFY(stream->AlterData);
+ const auto txType = DropSnapshot
+ ? TTxState::TxAlterCdcStreamAtTableDropSnapshot
+ : TTxState::TxAlterCdcStreamAtTable;
+
Y_VERIFY(!context.SS->FindTx(OperationId));
- auto& txState = context.SS->CreateTx(OperationId, TTxState::TxAlterCdcStreamAtTable, tablePath.Base()->PathId);
+ auto& txState = context.SS->CreateTx(OperationId, txType, tablePath.Base()->PathId);
txState.State = TTxState::ConfigureParts;
tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter;
@@ -367,8 +478,9 @@ public:
return result;
}
- void AbortPropose(TOperationContext&) override {
- Y_FAIL("no AbortPropose for TAlterCdcStreamAtTable");
+ void AbortPropose(TOperationContext& context) override {
+ LOG_N("TAlterCdcStreamAtTable AbortPropose"
+ << ": opId# " << OperationId);
}
void AbortUnsafe(TTxId txId, TOperationContext& context) override {
@@ -378,6 +490,9 @@ public:
context.OnComplete.DoneOperation(OperationId);
}
+private:
+ const bool DropSnapshot;
+
}; // TAlterCdcStreamAtTable
} // anonymous
@@ -390,12 +505,12 @@ ISubOperationBase::TPtr CreateAlterCdcStreamImpl(TOperationId id, TTxState::ETxS
return MakeSubOperation<TAlterCdcStream>(id, state);
}
-ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, const TTxTransaction& tx) {
- return MakeSubOperation<TAlterCdcStreamAtTable>(id, tx);
+ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot) {
+ return MakeSubOperation<TAlterCdcStreamAtTable>(id, tx, dropSnapshot);
}
-ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, TTxState::ETxState state) {
- return MakeSubOperation<TAlterCdcStreamAtTable>(id, state);
+ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot) {
+ return MakeSubOperation<TAlterCdcStreamAtTable>(id, state, dropSnapshot);
}
TVector<ISubOperationBase::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) {
@@ -461,6 +576,10 @@ TVector<ISubOperationBase::TPtr> CreateAlterCdcStream(TOperationId opId, const T
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl);
outTx.MutableAlterCdcStream()->CopyFrom(op);
+ if (op.HasGetReady()) {
+ outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId());
+ }
+
result.push_back(CreateAlterCdcStreamImpl(NextPartId(opId, result), outTx));
}
@@ -468,7 +587,21 @@ TVector<ISubOperationBase::TPtr> CreateAlterCdcStream(TOperationId opId, const T
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamAtTable);
outTx.MutableAlterCdcStream()->CopyFrom(op);
- result.push_back(CreateAlterCdcStreamAtTable(NextPartId(opId, result), outTx));
+ if (op.HasGetReady()) {
+ outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId());
+ }
+
+ result.push_back(CreateAlterCdcStreamAtTable(NextPartId(opId, result), outTx, op.HasGetReady()));
+ }
+
+ if (op.HasGetReady()) {
+ auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropLock);
+ outTx.SetFailOnExist(true);
+ outTx.SetInternal(true);
+ outTx.MutableLockConfig()->SetName(tablePath.LeafName());
+ outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId());
+
+ result.push_back(DropLock(NextPartId(opId, result), outTx));
}
return result;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
index 7459536435..218c0c921b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
@@ -351,10 +351,12 @@ void NTableState::UpdatePartitioningForTableModification(TOperationId operationI
commonShardOp = TTxState::ConfigureParts;
} else if (txState.TxType == TTxState::TxCreateCdcStreamAtTable) {
commonShardOp = TTxState::ConfigureParts;
- } else if (txState.TxType == TTxState::TxCreateCdcStreamAtTableWithSnapshot) {
+ } else if (txState.TxType == TTxState::TxCreateCdcStreamAtTableWithInitialScan) {
commonShardOp = TTxState::ConfigureParts;
} else if (txState.TxType == TTxState::TxAlterCdcStreamAtTable) {
commonShardOp = TTxState::ConfigureParts;
+ } else if (txState.TxType == TTxState::TxAlterCdcStreamAtTableDropSnapshot) {
+ commonShardOp = TTxState::ConfigureParts;
} else if (txState.TxType == TTxState::TxDropCdcStreamAtTable) {
commonShardOp = TTxState::ConfigureParts;
} else {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
index e51b19a593..6ae047a6a1 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
@@ -1048,8 +1048,9 @@ class TConfigurePartsAtTable: public TSubOperationState {
static bool IsExpectedTxType(TTxState::ETxType txType) {
switch (txType) {
case TTxState::TxCreateCdcStreamAtTable:
- case TTxState::TxCreateCdcStreamAtTableWithSnapshot:
+ case TTxState::TxCreateCdcStreamAtTableWithInitialScan:
case TTxState::TxAlterCdcStreamAtTable:
+ case TTxState::TxAlterCdcStreamAtTableDropSnapshot:
case TTxState::TxDropCdcStreamAtTable:
return true;
default:
@@ -1126,8 +1127,9 @@ class TProposeAtTable: public TSubOperationState {
static bool IsExpectedTxType(TTxState::ETxType txType) {
switch (txType) {
case TTxState::TxCreateCdcStreamAtTable:
- case TTxState::TxCreateCdcStreamAtTableWithSnapshot:
+ case TTxState::TxCreateCdcStreamAtTableWithInitialScan:
case TTxState::TxAlterCdcStreamAtTable:
+ case TTxState::TxAlterCdcStreamAtTableDropSnapshot:
case TTxState::TxDropCdcStreamAtTable:
return true;
default:
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 4262bc92dc..1dc4a69f17 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -302,7 +302,7 @@ public:
}; // TConfigurePartsAtTable
-class TProposeAtTableWithSnapshot: public NCdcStreamState::TProposeAtTable {
+class TProposeAtTableWithInitialScan: public NCdcStreamState::TProposeAtTable {
public:
using NCdcStreamState::TProposeAtTable::TProposeAtTable;
@@ -318,7 +318,7 @@ public:
return true;
}
-}; // TProposeAtTableWithSnapshot
+}; // TProposeAtTableWithInitialScan
class TNewCdcStreamAtTable: public TSubOperation {
static TTxState::ETxState NextState() {
@@ -346,7 +346,7 @@ class TNewCdcStreamAtTable: public TSubOperation {
return MakeHolder<TConfigurePartsAtTable>(OperationId);
case TTxState::Propose:
if (InitialScan) {
- return MakeHolder<TProposeAtTableWithSnapshot>(OperationId);
+ return MakeHolder<TProposeAtTableWithInitialScan>(OperationId);
} else {
return MakeHolder<NCdcStreamState::TProposeAtTable>(OperationId);
}
@@ -444,6 +444,8 @@ public:
auto guard = context.DbGuard();
context.MemChanges.GrabPath(context.SS, tablePath.Base()->PathId);
context.MemChanges.GrabNewTxState(context.SS, OperationId);
+
+ context.DbChanges.PersistPath(tablePath.Base()->PathId);
context.DbChanges.PersistTxState(OperationId);
if (InitialScan) {
@@ -461,7 +463,7 @@ public:
Y_VERIFY(!table->AlterData);
const auto txType = InitialScan
- ? TTxState::TxCreateCdcStreamAtTableWithSnapshot
+ ? TTxState::TxCreateCdcStreamAtTableWithInitialScan
: TTxState::TxCreateCdcStreamAtTable;
Y_VERIFY(!context.SS->FindTx(OperationId));
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp
index 71c0f44c5e..1174884952 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp
@@ -135,9 +135,16 @@ public:
.IsAtLocalSchemeShard()
.IsResolved()
.NotUnderDeleting()
- .NotUnderOperation()
.IsCommonSensePath();
+ if (checks) {
+ if (dstPath.IsUnderOperation()) { // may be part of a consistent operation
+ checks.IsUnderTheSameOperation(OperationId.GetTxId());
+ } else {
+ checks.NotUnderOperation();
+ }
+ }
+
if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
if (dstPath.IsResolved()) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h
index 121871315d..188d045df2 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h
@@ -380,8 +380,8 @@ ISubOperationBase::TPtr CreateNewCdcStreamAtTable(TOperationId id, TTxState::ETx
TVector<ISubOperationBase::TPtr> CreateAlterCdcStream(TOperationId id, const TTxTransaction& tx, TOperationContext& context);
ISubOperationBase::TPtr CreateAlterCdcStreamImpl(TOperationId id, const TTxTransaction& tx);
ISubOperationBase::TPtr CreateAlterCdcStreamImpl(TOperationId id, TTxState::ETxState state);
-ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, const TTxTransaction& tx);
-ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, TTxState::ETxState state);
+ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot);
+ISubOperationBase::TPtr CreateAlterCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot);
// Drop
TVector<ISubOperationBase::TPtr> CreateDropCdcStream(TOperationId id, const TTxTransaction& tx, TOperationContext& context);
ISubOperationBase::TPtr CreateDropCdcStreamImpl(TOperationId id, const TTxTransaction& tx);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 895952147e..503bc848e7 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -1400,8 +1400,9 @@ TPathElement::EPathState TSchemeShard::CalcPathState(TTxState::ETxType txType, T
case TTxState::TxAlterColumnTable:
case TTxState::TxAlterCdcStream:
case TTxState::TxAlterCdcStreamAtTable:
+ case TTxState::TxAlterCdcStreamAtTableDropSnapshot:
case TTxState::TxCreateCdcStreamAtTable:
- case TTxState::TxCreateCdcStreamAtTableWithSnapshot:
+ case TTxState::TxCreateCdcStreamAtTableWithInitialScan:
case TTxState::TxDropCdcStreamAtTable:
case TTxState::TxAlterSequence:
case TTxState::TxAlterReplication:
@@ -2079,13 +2080,11 @@ void TSchemeShard::PersistSnapshotTable(NIceDb::TNiceDb& db, const TTxId snapsho
}
void TSchemeShard::PersistSnapshotStepId(NIceDb::TNiceDb& db, const TTxId snapshotId, const TStepId stepId) {
- db.Table<Schema::SnapshotSteps>().Key(snapshotId).Update(
- NIceDb::TUpdate<Schema::SnapshotSteps::StepId>(stepId));
+ db.Table<Schema::SnapshotSteps>().Key(snapshotId).Update(NIceDb::TUpdate<Schema::SnapshotSteps::StepId>(stepId));
}
void TSchemeShard::PersistDropSnapshot(NIceDb::TNiceDb& db, const TTxId snapshotId, const TPathId tableId) {
db.Table<Schema::SnapshotTables>().Key(snapshotId, tableId.OwnerId, tableId.LocalPathId).Delete();
-
db.Table<Schema::SnapshotSteps>().Key(snapshotId).Delete();
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
index d7a30ef4ef..b4982edabd 100644
--- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
+++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
@@ -116,8 +116,9 @@ struct TTxState {
item(TxDropBlobDepot, 70) \
item(TxUpdateMainTableOnIndexMove, 71) \
item(TxAllocatePQ, 72) \
- item(TxCreateCdcStreamAtTableWithSnapshot, 73) \
+ item(TxCreateCdcStreamAtTableWithInitialScan, 73) \
item(TxAlterExtSubDomainCreateHive, 74) \
+ item(TxAlterCdcStreamAtTableDropSnapshot, 75) \
// TX_STATE_TYPE_ENUM
@@ -328,7 +329,7 @@ struct TTxState {
return true;
case TxInitializeBuildIndex: //this is more like alter
case TxCreateCdcStreamAtTable:
- case TxCreateCdcStreamAtTableWithSnapshot:
+ case TxCreateCdcStreamAtTableWithInitialScan:
return false;
case TxCreateLock: //this is more like alter
case TxDropLock: //this is more like alter
@@ -378,6 +379,7 @@ struct TTxState {
case TxAlterSolomonVolume:
case TxAlterCdcStream:
case TxAlterCdcStreamAtTable:
+ case TxAlterCdcStreamAtTableDropSnapshot:
case TxAlterSequence:
case TxAlterReplication:
case TxAlterBlobDepot:
@@ -429,7 +431,7 @@ struct TTxState {
case TxFillIndex:
case TxCreateCdcStream:
case TxCreateCdcStreamAtTable:
- case TxCreateCdcStreamAtTableWithSnapshot:
+ case TxCreateCdcStreamAtTableWithInitialScan:
case TxCreateSequence:
case TxCreateReplication:
case TxCreateBlobDepot:
@@ -464,6 +466,7 @@ struct TTxState {
case TxAlterSolomonVolume:
case TxAlterCdcStream:
case TxAlterCdcStreamAtTable:
+ case TxAlterCdcStreamAtTableDropSnapshot:
case TxAlterSequence:
case TxAlterReplication:
case TxAlterBlobDepot:
@@ -518,7 +521,7 @@ struct TTxState {
case TxCreateTableIndex:
case TxCreateCdcStream:
case TxCreateCdcStreamAtTable:
- case TxCreateCdcStreamAtTableWithSnapshot:
+ case TxCreateCdcStreamAtTableWithInitialScan:
case TxCreateSequence:
case TxCreateReplication:
case TxCreateBlobDepot:
@@ -551,6 +554,7 @@ struct TTxState {
case TxAlterSolomonVolume:
case TxAlterCdcStream:
case TxAlterCdcStreamAtTable:
+ case TxAlterCdcStreamAtTableDropSnapshot:
case TxMoveTable:
case TxMoveTableIndex:
case TxAlterSequence:
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
index 42ee88dd1f..13f0fe2668 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
@@ -173,6 +173,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
}
env.TestWaitNotification(runtime, txId);
+ const auto lockTxId = txId;
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
NLs::PathExist,
@@ -180,6 +181,71 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateScan),
});
+
+ auto testAlterCdcStream = [&runtime](ui64 txId, const TString& parentPath, const TString& schema,
+ const TMaybe<ui64>& lockTxId, TEvSchemeShard::EStatus expectedStatus = NKikimrScheme::StatusAccepted)
+ {
+ auto request = AlterCdcStreamRequest(txId, parentPath, schema);
+ if (lockTxId) {
+ request->Record.MutableTransaction(0)->MutableLockGuard()->SetOwnerTxId(*lockTxId);
+ }
+
+ ForwardToTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor(), request);
+ TestModificationResult(runtime, txId, expectedStatus);
+ };
+
+ // without guard & lockTxId
+ testAlterCdcStream(++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamName: "Stream"
+ GetReady {
+ LockTxId: 0
+ }
+ )", {}, NKikimrScheme::StatusMultipleModifications);
+
+ // with guard, without lockTxId
+ testAlterCdcStream(++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamName: "Stream"
+ GetReady {
+ LockTxId: 0
+ }
+ )", lockTxId, NKikimrScheme::StatusMultipleModifications);
+
+ // without guard, with lockTxId
+ testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamName: "Stream"
+ GetReady {
+ LockTxId: %lu
+ }
+ )", lockTxId), {}, NKikimrScheme::StatusMultipleModifications);
+
+ // with guard & lockTxId
+ testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamName: "Stream"
+ GetReady {
+ LockTxId: %lu
+ }
+ )", lockTxId), lockTxId);
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::PathExist,
+ NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly),
+ NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
+ NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady),
+ });
+
+ // another try should fail
+ testAlterCdcStream(++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamName: "Stream"
+ GetReady {
+ LockTxId: %lu
+ }
+ )", lockTxId), {}, NKikimrScheme::StatusPreconditionFailed);
}
Y_UNIT_TEST(InitialScanShouldSucceed) {
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
index e94857120b..a4ddb9beb6 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
@@ -65,7 +65,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
CreateStream({}, true);
}
- Y_UNIT_TEST(AlterStream) {
+ Y_UNIT_TEST(DisableStream) {
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
{
@@ -116,6 +116,65 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
});
}
+ Y_UNIT_TEST(GetReadyStream) {
+ TTestWithReboots t;
+ t.GetTestEnvOptions().EnableChangefeedInitialScan(true);
+
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ {
+ TInactiveZone inactive(activeZone);
+ TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ State: ECdcStreamStateScan
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::PathExist,
+ NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly),
+ NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
+ NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateScan),
+ });
+ }
+
+ const auto lockTxId = t.TxId;
+ auto request = AlterCdcStreamRequest(++t.TxId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamName: "Stream"
+ GetReady {
+ LockTxId: %lu
+ }
+ )", lockTxId));
+ request->Record.MutableTransaction(0)->MutableLockGuard()->SetOwnerTxId(lockTxId);
+
+ t.TestEnv->ReliablePropose(runtime, request, {
+ NKikimrScheme::StatusAccepted,
+ NKikimrScheme::StatusMultipleModifications,
+ });
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::PathExist,
+ NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly),
+ NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
+ NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady),
+ });
+ });
+ }
+
Y_UNIT_TEST(DropStream) {
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {