diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-07-18 19:43:49 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-18 19:43:49 +0300 |
commit | b4dd540e5f1c0122dda9ebc6bad10074f7193bd6 (patch) | |
tree | 6226fd4f98b0c4f9f119b9c2d6a8e2ce38cebe71 | |
parent | d1fc6c4593eb353405e4c000f96d83441e04e194 (diff) | |
download | ydb-b4dd540e5f1c0122dda9ebc6bad10074f7193bd6.tar.gz |
Allow streams on index table (#6827)
19 files changed, 514 insertions, 413 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index f13628c7ae..5dd0c004fc 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -922,10 +922,6 @@ message TCreateCdcStream { optional TCdcStreamDescription StreamDescription = 2; optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default optional uint32 TopicPartitions = 4; - oneof IndexMode { - google.protobuf.Empty AllIndexes = 5; // Create topic per each index - string IndexName = 6; - } } message TAlterCdcStream { @@ -1653,7 +1649,6 @@ message TIndexBuildControl { message TLockConfig { optional string Name = 1; - optional bool AllowIndexImplLock = 2; } message TLockGuard { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp index 4efa03b179..7b95b2b3ac 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp @@ -143,9 +143,12 @@ public: .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderOperation(); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -370,10 +373,13 @@ public: .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderDeleting() .NotUnderOperation(); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -476,10 +482,10 @@ private: } // anonymous std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks( - const TOperationId& opId, - const TPath& workingDirPath, - const TString& tableName, - const TString& streamName) + const TOperationId& opId, + const TPath& workingDirPath, + const TString& tableName, + const TString& streamName) { const auto tablePath = workingDirPath.Child(tableName); { @@ -492,9 +498,12 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks( .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderOperation(); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } + if (!checks) { return CreateReject(opId, checks.GetStatus(), checks.GetError()); } @@ -521,11 +530,11 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks( } void DoAlterStream( - const NKikimrSchemeOp::TAlterCdcStream& op, - const TOperationId& opId, - const TPath& workingDirPath, - const TPath& tablePath, - TVector<ISubOperation::TPtr>& result) + TVector<ISubOperation::TPtr>& result, + const NKikimrSchemeOp::TAlterCdcStream& op, + const TOperationId& opId, + const TPath& workingDirPath, + const TPath& tablePath) { { auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl); @@ -601,7 +610,7 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr TVector<ISubOperation::TPtr> result; - DoAlterStream(op, opId, workingDirPath, tablePath, result); + DoAlterStream(result, op, opId, workingDirPath, tablePath); if (op.HasGetReady()) { auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropLock); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.h b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.h index 198d5ae35c..6154ee05ed 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.h @@ -1,7 +1,7 @@ #pragma once -#include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths #include "schemeshard__operation_common.h" +#include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths #include "schemeshard__operation_part.h" #include "schemeshard_impl.h" @@ -17,10 +17,10 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks( const TString& streamName); void DoAlterStream( + TVector<ISubOperation::TPtr>& result, const NKikimrSchemeOp::TAlterCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, - const TPath& tablePath, - TVector<ISubOperation::TPtr>& result); + const TPath& tablePath); } // namespace NKikimr::NSchemesShard::NCdc diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp index 4c7d2f282c..69883a91a2 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp @@ -1,9 +1,8 @@ -#include "schemeshard__operation_part.h" +#include "schemeshard__operation_alter_cdc_stream.h" #include "schemeshard__operation_common.h" +#include "schemeshard__operation_part.h" #include "schemeshard_impl.h" -#include "schemeshard__operation_alter_cdc_stream.h" - #include <ydb/core/tx/schemeshard/backup/constants.h> #include <ydb/core/engine/mkql_proto.h> @@ -111,7 +110,7 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons TVector<ISubOperation::TPtr> result; - NCdc::DoAlterStream(alterCdcStreamOp, opId, workingDirPath, tablePath, result); + NCdc::DoAlterStream(result, alterCdcStreamOp, opId, workingDirPath, tablePath); if (cbOp.GetActionCase() == NKikimrSchemeOp::TAlterContinuousBackup::kTakeIncrementalBackup) { DoCreateIncBackupTable(opId, backupTablePath, schema, result); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index a615d6b68a..8ec0165f8b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -105,38 +105,22 @@ class TNewCdcStream: public TSubOperation { } } - TString BuildWorkingDir() const { - if (Transaction.GetCreateCdcStream().HasIndexName()) { - return Transaction.GetWorkingDir() + "/" - + Transaction.GetCreateCdcStream().GetIndexName() + "/indexImplTable"; - } else { - return Transaction.GetWorkingDir(); - } - } - public: using TSubOperation::TSubOperation; THolder<TProposeResponse> Propose(const TString& owner, TOperationContext& context) override { + const auto& workingDir = Transaction.GetWorkingDir(); const auto& op = Transaction.GetCreateCdcStream(); const auto& streamDesc = op.GetStreamDescription(); const auto& streamName = streamDesc.GetName(); const auto acceptExisted = !Transaction.GetFailOnExist(); - auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); - - if (op.HasAllIndexes()) { - result->SetError(NKikimrScheme::StatusInvalidParameter, - "Illigal part operation with all indexes flag"); - return result; - } - - const auto& workingDir = BuildWorkingDir(); - LOG_N("TNewCdcStream Propose" << ": opId# " << OperationId << ", stream# " << workingDir << "/" << streamName); + auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); + const auto tablePath = TPath::Resolve(workingDir, context.SS); { const auto checks = tablePath.Check(); @@ -146,15 +130,12 @@ public: .IsAtLocalSchemeShard() .IsResolved() .NotDeleted() + .IsTable() .NotAsyncReplicaTable() .NotUnderDeleting(); - if (op.HasIndexName() && op.GetIndexName()) { - checks.IsInsideTableIndexPath(); - } else { - checks - .IsTable() - .IsCommonSensePath(); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); } if (!checks) { @@ -529,35 +510,17 @@ public: } THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override { - auto workingDir = Transaction.GetWorkingDir(); + const auto& workingDir = Transaction.GetWorkingDir(); const auto& op = Transaction.GetCreateCdcStream(); - auto tableName = op.GetTableName(); + const auto& tableName = op.GetTableName(); const auto& streamName = op.GetStreamDescription().GetName(); - auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); - bool isIndexTable = false; - - if (op.HasAllIndexes()) { - result->SetError(NKikimrScheme::StatusInvalidParameter, - "Illigal part operation with all indexes flag"); - return result; - } - - if (op.HasIndexName()) { - if (!op.GetIndexName()) { - result->SetError(NKikimrScheme::StatusInvalidParameter, - "Unexpected empty index name"); - return result; - } - isIndexTable = true; - workingDir += ("/" + tableName + "/" + op.GetIndexName()); - tableName = "indexImplTable"; - } - LOG_N("TNewCdcStreamAtTable Propose" << ": opId# " << OperationId << ", stream# " << workingDir << "/" << tableName << "/" << streamName); + auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); + const auto workingDirPath = TPath::Resolve(workingDir, context.SS); { const auto checks = workingDirPath.Check(); @@ -569,9 +532,7 @@ public: .IsLikeDirectory() .NotUnderDeleting(); - if (isIndexTable) { - checks.IsInsideTableIndexPath(); - } else { + if (checks && !workingDirPath.IsTableIndex()) { checks.IsCommonSensePath(); } @@ -595,7 +556,7 @@ public: .NotUnderDeleting(); if (checks) { - if (!isIndexTable) { + if (!tablePath.IsInsideTableIndexPath()) { checks.IsCommonSensePath(); } if (InitialScan) { @@ -679,27 +640,34 @@ public: private: const bool InitialScan; + }; // TNewCdcStreamAtTable -void DoCreateLock(const TOperationId opId, const TPath& workingDirPath, const TPath& tablePath, bool allowIndexImplLock, - TVector<ISubOperation::TPtr>& result) +void DoCreateLock( + TVector<ISubOperation::TPtr>& result, + const TOperationId opId, + const TPath& workingDirPath, + const TPath& tablePath) { - auto outTx = TransactionTemplate(workingDirPath.PathString(), - NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock); + auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock); outTx.SetFailOnExist(false); outTx.SetInternal(true); - auto cfg = outTx.MutableLockConfig(); - cfg->SetName(tablePath.LeafName()); - cfg->SetAllowIndexImplLock(allowIndexImplLock); + outTx.MutableLockConfig()->SetName(tablePath.LeafName()); result.push_back(CreateLock(NextPartId(opId, result), outTx)); } } // anonymous -void DoCreatePqPart(const TOperationId& opId, const TPath& streamPath, const TString& streamName, - const TIntrusivePtr<TTableInfo> table, const NKikimrSchemeOp::TCreateCdcStream& op, - const TVector<TString>& boundaries, const bool acceptExisted, TVector<ISubOperation::TPtr>& result) +void DoCreatePqPart( + TVector<ISubOperation::TPtr>& result, + const NKikimrSchemeOp::TCreateCdcStream& op, + const TOperationId& opId, + const TPath& streamPath, + const TString& streamName, + TTableInfo::TCPtr table, + const TVector<TString>& boundaries, + const bool acceptExisted) { auto outTx = TransactionTemplate(streamPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup); outTx.SetFailOnExist(!acceptExisted); @@ -752,34 +720,37 @@ void DoCreatePqPart(const TOperationId& opId, const TPath& streamPath, const TSt result.push_back(CreateNewPQ(NextPartId(opId, result), outTx)); } -void FillModifySchemaForCdc(NKikimrSchemeOp::TModifyScheme& outTx, const NKikimrSchemeOp::TCreateCdcStream& op, - const TOperationId& opId, const TString& indexName, bool acceptExisted, bool initialScan) +static void FillModifySchemaForCdc( + NKikimrSchemeOp::TModifyScheme& outTx, + const NKikimrSchemeOp::TCreateCdcStream& op, + const TOperationId& opId, + bool acceptExisted, + bool initialScan) { outTx.SetFailOnExist(!acceptExisted); outTx.MutableCreateCdcStream()->CopyFrom(op); - if (indexName) { - outTx.MutableCreateCdcStream()->SetIndexName(indexName); - } else { - outTx.MutableCreateCdcStream()->ClearIndexMode(); - } - if (initialScan) { outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId())); } } -void DoCreateStream(const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath, - const bool acceptExisted, const bool initialScan, const TString& indexName, TVector<ISubOperation::TPtr>& result) +void DoCreateStream( + TVector<ISubOperation::TPtr>& result, + const NKikimrSchemeOp::TCreateCdcStream& op, + const TOperationId& opId, + const TPath& workingDirPath, + const TPath& tablePath, + const bool acceptExisted, + const bool initialScan) { { auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl); - FillModifySchemaForCdc(outTx, op, opId, indexName, acceptExisted, initialScan); + FillModifySchemaForCdc(outTx, op, opId, acceptExisted, initialScan); result.push_back(CreateNewCdcStreamImpl(NextPartId(opId, result), outTx)); } - { auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable); - FillModifySchemaForCdc(outTx, op, opId, indexName, acceptExisted, initialScan); + FillModifySchemaForCdc(outTx, op, opId, acceptExisted, initialScan); result.push_back(CreateNewCdcStreamAtTable(NextPartId(opId, result), outTx, initialScan)); } } @@ -826,10 +797,13 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderDeleting() .NotUnderOperation(); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } + if (!checks) { return CreateReject(opId, checks.GetStatus(), checks.GetError()); } @@ -837,19 +811,7 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat return nullptr; } -void CalcBoundaries(const TTableInfo& table, TVector<TString>& boundaries) { - const auto& partitions = table.GetPartitions(); - boundaries.reserve(partitions.size() - 1); - - for (ui32 i = 0; i < partitions.size(); ++i) { - const auto& partition = partitions.at(i); - if (i != partitions.size() - 1) { - boundaries.push_back(partition.EndOfRange); - } - } -} - -bool FillBoundaries(const TTableInfo& table, const ::NKikimrSchemeOp::TCreateCdcStream& op, TVector<TString>& boundaries, TString& errStr) { +bool FillBoundaries(const TTableInfo& table, const NKikimrSchemeOp::TCreateCdcStream& op, TVector<TString>& boundaries, TString& errStr) { if (op.HasTopicPartitions()) { const auto& keyColumns = table.KeyColumnIds; const auto& columns = table.Columns; @@ -862,8 +824,17 @@ bool FillBoundaries(const TTableInfo& table, const ::NKikimrSchemeOp::TCreateCdc return false; } } else { - CalcBoundaries(table, boundaries); + const auto& partitions = table.GetPartitions(); + boundaries.reserve(partitions.size() - 1); + + for (ui32 i = 0; i < partitions.size(); ++i) { + const auto& partition = partitions.at(i); + if (i != partitions.size() - 1) { + boundaries.push_back(partition.EndOfRange); + } + } } + return true; } @@ -921,7 +892,6 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran const auto& tableName = op.GetTableName(); const auto& streamDesc = op.GetStreamDescription(); const auto& streamName = streamDesc.GetName(); - const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS); const auto checksResult = DoNewStreamPathChecks(opId, workingDirPath, tableName, streamName, acceptExisted); @@ -971,76 +941,27 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran << "Initial scan is not supported yet")}; } - if (op.HasTopicPartitions()) { - if (op.GetTopicPartitions() <= 0) { - return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, "Topic partitions count must be greater than 0")}; - } - } - - std::vector<TString> candidates; - - if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kAllIndexes) { - candidates.reserve(tablePath->GetChildren().size()); - for (const auto& child : tablePath->GetChildren()) { - candidates.emplace_back(child.first); - } - } else if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kIndexName) { - auto it = tablePath->GetChildren().find(op.GetIndexName()); - if (it == tablePath->GetChildren().end()) { - return {CreateReject(opId, NKikimrScheme::StatusSchemeError, - "requested particular path hasn't been found")}; - } - candidates.emplace_back(it->first); + if (op.HasTopicPartitions() && op.GetTopicPartitions() <= 0) { + return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, "Topic partitions count must be greater than 0")}; } TVector<ISubOperation::TPtr> result; - for (const auto& name : candidates) { - const TPath indexPath = tablePath.Child(name); - if (!indexPath.IsTableIndex() || indexPath.IsDeleted()) { - continue; - } - - const TPath indexImplPath = indexPath.Child("indexImplTable"); - if (!indexImplPath) { - return {CreateReject(opId, NKikimrScheme::StatusSchemeError, - "indexImplTable hasn't been found")}; - } - - Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); - auto indexImplTable = context.SS->Tables.at(indexImplPath.Base()->PathId); - - const TPath indexStreamPath = indexImplPath.Child(streamName); - if (auto reject = RejectOnCdcChecks(opId, indexStreamPath, acceptExisted)) { - return {reject}; - } - - if (initialScan) { - DoCreateLock(opId, indexPath, indexImplPath, true, result); - } - - TVector<TString> boundaries; - if (!FillBoundaries(*indexImplTable, op, boundaries, errStr)) { - return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; - } - - DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, name, result); - DoCreatePqPart(opId, indexStreamPath, streamName, indexImplTable, op, boundaries, acceptExisted, result); - } - if (initialScan) { - DoCreateLock(opId, workingDirPath, tablePath, false, result); + DoCreateLock(result, opId, workingDirPath, tablePath); } Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); auto table = context.SS->Tables.at(tablePath.Base()->PathId); + TVector<TString> boundaries; if (!FillBoundaries(*table, op, boundaries, errStr)) { return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; } - DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, {}, result); - DoCreatePqPart(opId, streamPath, streamName, table, op, boundaries, acceptExisted, result); + DoCreateStream(result, op, opId, workingDirPath, tablePath, acceptExisted, initialScan); + DoCreatePqPart(result, op, opId, streamPath, streamName, table, boundaries, acceptExisted); + return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h index 11a921d841..635e57a28b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h @@ -1,7 +1,7 @@ #pragma once -#include "schemeshard__operation_part.h" #include "schemeshard__operation_common.h" +#include "schemeshard__operation_part.h" #include "schemeshard_impl.h" #include <ydb/core/engine/mkql_proto.h> @@ -22,23 +22,22 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoNewStreamPathChecks( bool acceptExisted); void DoCreateStream( + TVector<ISubOperation::TPtr>& result, const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath, const bool acceptExisted, - const bool initialScan, - const TString& indexName, - TVector<ISubOperation::TPtr>& result); + const bool initialScan); void DoCreatePqPart( + TVector<ISubOperation::TPtr>& result, + const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& streamPath, const TString& streamName, - const TIntrusivePtr<TTableInfo> table, - const NKikimrSchemeOp::TCreateCdcStream& op, + TTableInfo::TCPtr table, const TVector<TString>& boundaries, - const bool acceptExisted, - TVector<ISubOperation::TPtr>& result); + const bool acceptExisted); } // namespace NKikimr::NSchemesShard::NCdc diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp index 6bb2803161..ab2c187d24 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp @@ -68,8 +68,8 @@ TVector<ISubOperation::TPtr> CreateNewContinuousBackup(TOperationId opId, const TVector<ISubOperation::TPtr> result; - NCdc::DoCreateStream(createCdcStreamOp, opId, workingDirPath, tablePath, acceptExisted, false, {}, result); - NCdc::DoCreatePqPart(opId, streamPath, NBackup::CB_CDC_STREAM_NAME, table, createCdcStreamOp, boundaries, acceptExisted, result); + NCdc::DoCreateStream(result, createCdcStreamOp, opId, workingDirPath, tablePath, acceptExisted, false); + NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, NBackup::CB_CDC_STREAM_NAME, table, boundaries, acceptExisted); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp index bd7ad54009..c119f253f4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp @@ -127,9 +127,7 @@ public: .IsLikeDirectory() .FailOnRestrictedCreateInTempZone(); - if (op.GetAllowIndexImplLock()) { - checks.IsInsideTableIndexPath(); - } else { + if (checks && !parentPath.IsTableIndex()) { checks.IsCommonSensePath(); } @@ -151,7 +149,7 @@ public: .IsTable() .NotAsyncReplicaTable(); - if (!op.GetAllowIndexImplLock()) { + if (checks && !parentPath.IsTableIndex()) { checks.IsCommonSensePath(); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp index 07d9bd17e0..fac3fe3443 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp @@ -150,10 +150,13 @@ public: .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .IsUnderOperation() .IsUnderTheSameOperation(OperationId.GetTxId()); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -332,10 +335,13 @@ public: .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderDeleting() .NotUnderOperation(); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -438,10 +444,10 @@ private: } // anonymous std::variant<TStreamPaths, ISubOperation::TPtr> DoDropStreamPathChecks( - const TOperationId& opId, - const TPath& workingDirPath, - const TString& tableName, - const TString& streamName) + const TOperationId& opId, + const TPath& workingDirPath, + const TString& tableName, + const TString& streamName) { const auto tablePath = workingDirPath.Child(tableName); { @@ -454,10 +460,13 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoDropStreamPathChecks( .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderDeleting() .NotUnderOperation(); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } + if (!checks) { return CreateReject(opId, checks.GetStatus(), checks.GetError()); } @@ -485,10 +494,11 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoDropStreamPathChecks( } ISubOperation::TPtr DoDropStreamChecks( - const TOperationId& opId, - const TPath& tablePath, - const TTxId lockTxId, - TOperationContext& context) { + const TOperationId& opId, + const TPath& tablePath, + const TTxId lockTxId, + TOperationContext& context) +{ TString errStr; if (!context.SS->CheckLocks(tablePath.Base()->PathId, lockTxId, errStr)) { @@ -499,14 +509,14 @@ ISubOperation::TPtr DoDropStreamChecks( } void DoDropStream( - const NKikimrSchemeOp::TDropCdcStream& op, - const TOperationId& opId, - const TPath& workingDirPath, - const TPath& tablePath, - const TPath& streamPath, - const TTxId lockTxId, - TOperationContext& context, - TVector<ISubOperation::TPtr>& result) + TVector<ISubOperation::TPtr>& result, + const NKikimrSchemeOp::TDropCdcStream& op, + const TOperationId& opId, + const TPath& workingDirPath, + const TPath& tablePath, + const TPath& streamPath, + const TTxId lockTxId, + TOperationContext& context) { { auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamAtTable); @@ -615,7 +625,7 @@ TVector<ISubOperation::TPtr> CreateDropCdcStream(TOperationId opId, const TTxTra TVector<ISubOperation::TPtr> result; - DoDropStream(op, opId, workingDirPath, tablePath, streamPath, lockTxId, context, result); + DoDropStream(result, op, opId, workingDirPath, tablePath, streamPath, lockTxId, context); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.h b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.h index ec4720da71..12be710268 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.h @@ -1,8 +1,8 @@ #pragma once +#include "schemeshard__operation_common.h" #include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths #include "schemeshard__operation_part.h" -#include "schemeshard__operation_common.h" #include "schemeshard_impl.h" #include <ydb/core/engine/mkql_proto.h> @@ -23,13 +23,13 @@ ISubOperation::TPtr DoDropStreamChecks( TOperationContext& context); void DoDropStream( + TVector<ISubOperation::TPtr>& result, const NKikimrSchemeOp::TDropCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath, const TPath& streamPath, const TTxId lockTxId, - TOperationContext& context, - TVector<ISubOperation::TPtr>& result); + TOperationContext& context); } // namespace NKikimr::NSchemesShard::NCdc diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_continuous_backup.cpp index 9e7ec8ac7e..e0e882b84d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_continuous_backup.cpp @@ -1,8 +1,7 @@ -#include "schemeshard__operation_part.h" #include "schemeshard__operation_common.h" -#include "schemeshard_impl.h" - #include "schemeshard__operation_drop_cdc_stream.h" +#include "schemeshard__operation_part.h" +#include "schemeshard_impl.h" #include <ydb/core/tx/schemeshard/backup/constants.h> @@ -40,7 +39,7 @@ TVector<ISubOperation::TPtr> CreateDropContinuousBackup(TOperationId opId, const TVector<ISubOperation::TPtr> result; - NCdc::DoDropStream(dropCdcStreamOp, opId, workingDirPath, tablePath, streamPath, InvalidTxId, context, result); + NCdc::DoDropStream(result, dropCdcStreamOp, opId, workingDirPath, tablePath, streamPath, InvalidTxId, context); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp index fb268a8b06..a0d6ff3551 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp @@ -473,21 +473,22 @@ TVector<ISubOperation::TPtr> CreateDropIndex(TOperationId nextId, const TTxTrans result.push_back(CreateDropTableIndex(NextPartId(nextId, result), indexDropping)); } - for (const auto& items: indexPath.Base()->GetChildren()) { - Y_ABORT_UNLESS(context.SS->PathsById.contains(items.second)); - auto implPath = context.SS->PathsById.at(items.second); - if (implPath->Dropped()) { + for (const auto& [childName, childPathId] : indexPath.Base()->GetChildren()) { + TPath child = indexPath.Child(childName); + if (child.IsDeleted()) { continue; } - auto implTable = context.SS->PathsById.at(items.second); - Y_ABORT_UNLESS(implTable->IsTable()); + Y_ABORT_UNLESS(child.Base()->IsTable()); auto implTableDropping = TransactionTemplate(indexPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable); auto operation = implTableDropping.MutableDrop(); - operation->SetName(items.first); + operation->SetName(child.LeafName()); result.push_back(CreateDropTable(NextPartId(nextId, result), implTableDropping)); + if (auto reject = CascadeDropTableChildren(result, nextId, child)) { + return {reject}; + } } return result; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp index 51bf6f69a0..c0ef94bbda 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp @@ -443,104 +443,8 @@ TVector<ISubOperation::TPtr> CreateDropIndexedTable(TOperationId nextId, const T TVector<ISubOperation::TPtr> result; result.push_back(CreateDropTable(NextPartId(nextId, result), tx)); - - for (const auto& [childName, childPathId] : table.Base()->GetChildren()) { - TPath child = table.Child(childName); - { - TPath::TChecker checks = child.Check(); - checks - .NotEmpty() - .IsResolved(); - - if (checks) { - if (child.IsDeleted()) { - continue; - } - } - - if (child.IsTableIndex()) { - checks.IsTableIndex(); - } else if (child.IsCdcStream()) { - checks.IsCdcStream(); - } else if (child.IsSequence()) { - checks.IsSequence(); - } - - checks.NotDeleted() - .NotUnderDeleting() - .NotUnderOperation(); - - if (!checks) { - return {CreateReject(nextId, checks.GetStatus(), checks.GetError())}; - } - } - Y_ABORT_UNLESS(child.Base()->PathId == childPathId); - - if (child.IsSequence()) { - auto dropSequence = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropSequence); - dropSequence.MutableDrop()->SetName(ToString(child->Name)); - - result.push_back(CreateDropSequence(NextPartId(nextId, result), dropSequence)); - continue; - } else if (child.IsTableIndex()) { - auto dropIndex = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTableIndex); - dropIndex.MutableDrop()->SetName(ToString(child.Base()->Name)); - - result.push_back(CreateDropTableIndex(NextPartId(nextId, result), dropIndex)); - } else if (child.IsCdcStream()) { - auto dropStream = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl); - dropStream.MutableDrop()->SetName(ToString(child.Base()->Name)); - - result.push_back(CreateDropCdcStreamImpl(NextPartId(nextId, result), dropStream)); - } - - for (auto& [implName, implPathId] : child.Base()->GetChildren()) { - Y_ABORT_UNLESS(implName == "indexImplTable" - || implName == "streamImpl" - || implName == NTableIndex::NTableVectorKmeansTreeIndex::LevelTable - || implName == NTableIndex::NTableVectorKmeansTreeIndex::PostingTable - , "unexpected name %s", implName.c_str()); - - TPath implPath = child.Child(implName); - { - TPath::TChecker checks = implPath.Check(); - checks - .NotEmpty() - .IsResolved() - .NotDeleted() - .NotUnderDeleting() - .NotUnderOperation(); - - if (checks) { - if (implPath.Base()->IsTable()) { - checks - .IsTable() - .IsInsideTableIndexPath(); - } else if (implPath.Base()->IsPQGroup()) { - checks - .IsPQGroup() - .IsInsideCdcStreamPath(); - } - } - - if (!checks) { - return {CreateReject(nextId, checks.GetStatus(), checks.GetError())}; - } - } - Y_ABORT_UNLESS(implPath.Base()->PathId == implPathId); - - if (implPath.Base()->IsTable()) { - auto dropIndexTable = TransactionTemplate(child.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable); - dropIndexTable.MutableDrop()->SetName(ToString(implPath.Base()->Name)); - - result.push_back(CreateDropTable(NextPartId(nextId, result), dropIndexTable)); - } else if (implPath.Base()->IsPQGroup()) { - auto dropPQGroup = TransactionTemplate(child.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropPersQueueGroup); - dropPQGroup.MutableDrop()->SetName(ToString(implPath.Base()->Name)); - - result.push_back(CreateDropPQ(NextPartId(nextId, result), dropPQGroup)); - } - } + if (auto reject = CascadeDropTableChildren(result, nextId, table)) { + return {reject}; } return result; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp index c5026592a0..8dc9cf01b5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp @@ -119,9 +119,12 @@ public: .IsResolved() .NotDeleted() .NotUnderDeleting() - .IsCommonSensePath() .IsLikeDirectory(); + if (checks && !parentPath.IsTableIndex()) { + checks.IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -134,10 +137,12 @@ public: checks .IsAtLocalSchemeShard() .IsResolved() - .NotUnderDeleting() - .IsCommonSensePath(); + .NotUnderDeleting(); if (checks) { + if (!parentPath.IsTableIndex()) { + checks.IsCommonSensePath(); + } if (dstPath.IsUnderOperation()) { // may be part of a consistent operation checks.IsUnderTheSameOperation(OperationId.GetTxId()); } else { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp index e06c7c1cca..8e8eccc1d3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp @@ -1,5 +1,6 @@ #include "schemeshard__operation_part.h" #include "schemeshard_impl.h" +#include "schemeshard_path.h" namespace NKikimr::NSchemeShard { @@ -99,4 +100,110 @@ void TSubOperationState::IgnoreMessages(TString debugHint, TSet<ui32> mgsIds) { MsgToIgnore.swap(mgsIds); } +ISubOperation::TPtr CascadeDropTableChildren(TVector<ISubOperation::TPtr>& result, const TOperationId& id, const TPath& table) { + for (const auto& [childName, childPathId] : table.Base()->GetChildren()) { + TPath child = table.Child(childName); + { + TPath::TChecker checks = child.Check(); + checks + .NotEmpty() + .IsResolved(); + + if (checks) { + if (child.IsDeleted()) { + continue; + } + } + + if (child.IsTableIndex()) { + checks.IsTableIndex(); + } else if (child.IsCdcStream()) { + checks.IsCdcStream(); + } else if (child.IsSequence()) { + checks.IsSequence(); + } + + checks.NotDeleted() + .NotUnderDeleting() + .NotUnderOperation(); + + if (!checks) { + return CreateReject(id, checks.GetStatus(), checks.GetError()); + } + } + Y_ABORT_UNLESS(child.Base()->PathId == childPathId); + + if (child.IsSequence()) { + auto dropSequence = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropSequence); + dropSequence.MutableDrop()->SetName(ToString(child->Name)); + + result.push_back(CreateDropSequence(NextPartId(id, result), dropSequence)); + continue; + } else if (child.IsTableIndex()) { + auto dropIndex = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTableIndex); + dropIndex.MutableDrop()->SetName(ToString(child.Base()->Name)); + + result.push_back(CreateDropTableIndex(NextPartId(id, result), dropIndex)); + } else if (child.IsCdcStream()) { + auto dropStream = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl); + dropStream.MutableDrop()->SetName(ToString(child.Base()->Name)); + + result.push_back(CreateDropCdcStreamImpl(NextPartId(id, result), dropStream)); + } + + for (auto& [implName, implPathId] : child.Base()->GetChildren()) { + Y_ABORT_UNLESS(implName == "indexImplTable" + || implName == "streamImpl" + || implName == NTableIndex::NTableVectorKmeansTreeIndex::LevelTable + || implName == NTableIndex::NTableVectorKmeansTreeIndex::PostingTable + , "unexpected name %s", implName.c_str()); + + TPath implPath = child.Child(implName); + { + TPath::TChecker checks = implPath.Check(); + checks + .NotEmpty() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .NotUnderOperation(); + + if (checks) { + if (implPath.Base()->IsTable()) { + checks + .IsTable() + .IsInsideTableIndexPath(); + } else if (implPath.Base()->IsPQGroup()) { + checks + .IsPQGroup() + .IsInsideCdcStreamPath(); + } + } + + if (!checks) { + return CreateReject(id, checks.GetStatus(), checks.GetError()); + } + } + Y_ABORT_UNLESS(implPath.Base()->PathId == implPathId); + + if (implPath.Base()->IsTable()) { + auto dropIndexTable = TransactionTemplate(child.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable); + dropIndexTable.MutableDrop()->SetName(ToString(implPath.Base()->Name)); + + result.push_back(CreateDropTable(NextPartId(id, result), dropIndexTable)); + if (auto reject = CascadeDropTableChildren(result, id, implPath)) { + return reject; + } + } else if (implPath.Base()->IsPQGroup()) { + auto dropPQGroup = TransactionTemplate(child.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropPersQueueGroup); + dropPQGroup.MutableDrop()->SetName(ToString(implPath.Base()->Name)); + + result.push_back(CreateDropPQ(NextPartId(id, result), dropPQGroup)); + } + } + } + + return nullptr; +} + } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 386dcb2076..12d67f768e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -84,6 +84,7 @@ namespace NKikimr { namespace NSchemeShard { class TSchemeShard; +class TPath; struct TOperationContext { public: @@ -620,5 +621,8 @@ ISubOperation::TPtr CreateAlterResourcePool(TOperationId id, TTxState::ETxState ISubOperation::TPtr CreateDropResourcePool(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateDropResourcePool(TOperationId id, TTxState::ETxState state); +// returns Reject in case of error, nullptr otherwise +ISubOperation::TPtr CascadeDropTableChildren(TVector<ISubOperation::TPtr>& result, const TOperationId& id, const TPath& table); + } } diff --git a/ydb/core/tx/schemeshard/schemeshard_path.cpp b/ydb/core/tx/schemeshard/schemeshard_path.cpp index f45e6190eb..4d5d534b27 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path.cpp @@ -1596,13 +1596,6 @@ bool TPath::IsInsideCdcStreamPath() const { return false; } - ++item; - for (; item != Elements.rend(); ++item) { - if (!(*item)->IsDirectory() && !(*item)->IsSubDomainRoot()) { - return false; - } - } - return true; } diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp index a4e9f6a1b2..a2aea05c91 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp @@ -591,15 +591,6 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } )", {NKikimrScheme::StatusNameConflict}); - TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( - TableName: "indexImplTable" - StreamDescription { - Name: "Stream" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - } - )", {NKikimrScheme::StatusNameConflict}); - TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( TableName: "Table" StreamDescription { @@ -617,29 +608,6 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )"); env.TestWaitNotification(runtime, txId); - TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( - TableName: "Table" - StreamDescription { - Name: "StreamWithIndex" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - } - IndexName: "NotExistedIndex" - )", {NKikimrScheme::StatusSchemeError}); - - TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( - TableName: "Table" - StreamDescription { - Name: "StreamWithIndex" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - } - IndexName: "Index" - )"); - env.TestWaitNotification(runtime, txId); - - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/StreamWithIndex/streamImpl"), {NLs::PathExist}); - TestDropTable(runtime, ++txId, "/MyRoot", "Table"); env.TestWaitNotification(runtime, txId); @@ -1232,6 +1200,170 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } } + Y_UNIT_TEST(StreamOnIndexTable) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); + ui64 txId = 100; + + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "indexed" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Index" + KeyColumnNames: ["indexed"] + } + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/UnknownIndex", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )", {NKikimrScheme::StatusPathDoesNotExist}); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), { + NLs::PathExist, + }); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), { + NLs::PathExist, + }); + + TestAlterCdcStream(runtime, ++txId, "/MyRoot/Table/UnknownIndex", R"( + TableName: "indexImplTable" + StreamName: "Stream" + Disable {} + )", {NKikimrScheme::StatusPathDoesNotExist}); + + TestAlterCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TableName: "indexImplTable" + StreamName: "Stream" + Disable {} + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), { + NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateDisabled), + }); + + TestDropCdcStream(runtime, ++txId, "/MyRoot/Table/UnknownIndex", R"( + TableName: "indexImplTable" + StreamName: "Stream" + )", {NKikimrScheme::StatusPathDoesNotExist}); + + TestDropCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TableName: "indexImplTable" + StreamName: "Stream" + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), { + NLs::PathNotExist, + }); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), { + NLs::PathNotExist, + }); + } + + Y_UNIT_TEST(DropIndexWithStream) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); + ui64 txId = 100; + + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "indexed" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Index" + KeyColumnNames: ["indexed"] + } + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDropTableIndex(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + IndexName: "Index" + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), { + NLs::PathNotExist, + }); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), { + NLs::PathNotExist, + }); + } + + Y_UNIT_TEST(DropTableWithIndexWithStream) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); + ui64 txId = 100; + + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "indexed" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Index" + KeyColumnNames: ["indexed"] + } + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDropTable(runtime, ++txId, "/MyRoot", "Table"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), { + NLs::PathNotExist, + }); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), { + NLs::PathNotExist, + }); + } + } // TCdcStreamTests Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) { diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index 7444ec4dac..cd6f40640d 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -6,29 +6,9 @@ using namespace NSchemeShardUT_Private; -static const TString createTableProto = R"( - Name: "Table" - Columns { Name: "key" Type: "Uint64" } - Columns { Name: "value" Type: "Uint64" } - KeyColumnNames: ["key"] -)"; - -static const TString createTableWithIndexProto = R"( - TableDescription { - Name: "Table" - Columns { Name: "key" Type: "Uint64" } - Columns { Name: "value" Type: "Uint64" } - KeyColumnNames: ["key"] - } - IndexDescription { - Name: "SyncIndex" - KeyColumnNames: ["value"] - } -)"; - Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { template <typename T> - void CreateStream(const TMaybe<NKikimrSchemeOp::ECdcStreamState>& state = Nothing(), bool vt = false, bool tableWithIndex = false) { + void CreateStream(const TMaybe<NKikimrSchemeOp::ECdcStreamState>& state = Nothing(), bool vt = false, bool onIndex = false) { T t; t.GetTestEnvOptions().EnableChangefeedInitialScan(true); @@ -36,10 +16,26 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { { TInactiveZone inactive(activeZone); runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true; - if (tableWithIndex) { - TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", createTableWithIndexProto); + if (!onIndex) { + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); } else { - TestCreateTable(runtime, ++t.TxId, "/MyRoot", createTableProto); + TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "indexed" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Index" + KeyColumnNames: ["indexed"] + } + )"); } t.TestEnv->TestWaitNotification(runtime, t.TxId); } @@ -58,24 +54,19 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { const bool ok = google::protobuf::TextFormat::PrintToString(streamDesc, &strDesc); UNIT_ASSERT_C(ok, "protobuf serialization failed"); - TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", Sprintf(R"( - TableName: "Table" + const TString path = !onIndex ? "/MyRoot" : "/MyRoot/Table/Index"; + const TString tableName = !onIndex ? "Table": "indexImplTable"; + + TestCreateCdcStream(runtime, ++t.TxId, path, Sprintf(R"( + TableName: "%s" StreamDescription { %s } - AllIndexes {} - )", strDesc.c_str())); + )", tableName.c_str(), strDesc.c_str())); t.TestEnv->TestWaitNotification(runtime, t.TxId); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + TestDescribeResult(DescribePrivatePath(runtime, path + "/" + tableName + "/Stream"), { NLs::PathExist, NLs::StreamVirtualTimestamps(vt), }); - - if (tableWithIndex) { - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), { - NLs::PathExist, - NLs::StreamVirtualTimestamps(vt), - }); - } }); } @@ -83,15 +74,15 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { CreateStream<T>(); } - Y_UNIT_TEST_WITH_REBOOTS(CreateStreamTableWithIndex) { - CreateStream<T>(Nothing(), false, true); + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamOnIndexTable) { + CreateStream<T>({}, false, true); } Y_UNIT_TEST_WITH_REBOOTS(CreateStreamExplicitReady) { CreateStream<T>(NKikimrSchemeOp::ECdcStreamStateReady); } - Y_UNIT_TEST_WITH_REBOOTS(CreateStreamExplicitReadyTableWithIndex) { + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamOnIndexTableExplicitReady) { CreateStream<T>(NKikimrSchemeOp::ECdcStreamStateReady, false, true); } @@ -99,7 +90,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { CreateStream<T>(NKikimrSchemeOp::ECdcStreamStateScan); } - Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithInitialScanTableWithIndex) { + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamOnIndexTableWithInitialScan) { CreateStream<T>(NKikimrSchemeOp::ECdcStreamStateScan, false, true); } @@ -107,6 +98,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { CreateStream<T>({}, true); } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamOnIndexTableWithVirtualTimestamps) { + CreateStream<T>({}, true, true); + } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithAwsRegion) { T t; t.GetTestEnvOptions().EnableChangefeedDynamoDBStreamsFormat(true); @@ -293,21 +288,39 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { } template <typename T> - void DropStream(const TMaybe<NKikimrSchemeOp::ECdcStreamState>& state = Nothing()) { + void DropStream(const TMaybe<NKikimrSchemeOp::ECdcStreamState>& state = Nothing(), bool onIndex = false) { T t; t.GetTestEnvOptions().EnableChangefeedInitialScan(true); t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + const TString path = !onIndex ? "/MyRoot" : "/MyRoot/Table/Index"; + const TString tableName = !onIndex ? "Table": "indexImplTable"; + { TInactiveZone inactive(activeZone); runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true; - TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( - Name: "Table" - Columns { Name: "key" Type: "Uint64" } - Columns { Name: "value" Type: "Uint64" } - KeyColumnNames: ["key"] - )"); + if (!onIndex) { + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + } else { + TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "indexed" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Index" + KeyColumnNames: ["indexed"] + } + )"); + } t.TestEnv->TestWaitNotification(runtime, t.TxId); NKikimrSchemeOp::TCdcStreamDescription streamDesc; @@ -323,20 +336,20 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { const bool ok = google::protobuf::TextFormat::PrintToString(streamDesc, &strDesc); UNIT_ASSERT_C(ok, "protobuf serialization failed"); - TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", Sprintf(R"( - TableName: "Table" + TestCreateCdcStream(runtime, ++t.TxId, path, Sprintf(R"( + TableName: "%s" StreamDescription { %s } - )", strDesc.c_str())); + )", tableName.c_str(), strDesc.c_str())); t.TestEnv->TestWaitNotification(runtime, t.TxId); } - TestDropCdcStream(runtime, ++t.TxId, "/MyRoot", R"( - TableName: "Table" + TestDropCdcStream(runtime, ++t.TxId, path, Sprintf(R"( + TableName: "%s" StreamName: "Stream" - )"); + )", tableName.c_str())); t.TestEnv->TestWaitNotification(runtime, t.TxId); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {NLs::PathNotExist}); + TestDescribeResult(DescribePrivatePath(runtime, path + "/" + tableName + "/Stream"), {NLs::PathNotExist}); }); } @@ -344,14 +357,26 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { DropStream<T>(); } + Y_UNIT_TEST_WITH_REBOOTS(DropStreamOnIndexTable) { + DropStream<T>({}, true); + } + Y_UNIT_TEST_WITH_REBOOTS(DropStreamExplicitReady) { DropStream<T>(NKikimrSchemeOp::ECdcStreamStateReady); } + Y_UNIT_TEST_WITH_REBOOTS(DropStreamOnIndexTableExplicitReady) { + DropStream<T>(NKikimrSchemeOp::ECdcStreamStateReady, true); + } + Y_UNIT_TEST_WITH_REBOOTS(DropStreamCreatedWithInitialScan) { DropStream<T>(NKikimrSchemeOp::ECdcStreamStateScan); } + Y_UNIT_TEST_WITH_REBOOTS(DropStreamOnIndexTableCreatedWithInitialScan) { + DropStream<T>(NKikimrSchemeOp::ECdcStreamStateScan, true); + } + Y_UNIT_TEST_WITH_REBOOTS(CreateDropRecreate) { T t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { |