aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-07-18 19:43:49 +0300
committerGitHub <noreply@github.com>2024-07-18 19:43:49 +0300
commitb4dd540e5f1c0122dda9ebc6bad10074f7193bd6 (patch)
tree6226fd4f98b0c4f9f119b9c2d6a8e2ce38cebe71
parentd1fc6c4593eb353405e4c000f96d83441e04e194 (diff)
downloadydb-b4dd540e5f1c0122dda9ebc6bad10074f7193bd6.tar.gz
Allow streams on index table (#6827)
-rw-r--r--ydb/core/protos/flat_scheme_op.proto5
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp35
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.h6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp7
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp213
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h15
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp50
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.h6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_continuous_backup.cpp7
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp15
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp100
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp11
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_part.cpp107
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_part.h4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path.cpp7
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp196
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp133
19 files changed, 514 insertions, 413 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index f13628c7ae..5dd0c004fc 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -922,10 +922,6 @@ message TCreateCdcStream {
optional TCdcStreamDescription StreamDescription = 2;
optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default
optional uint32 TopicPartitions = 4;
- oneof IndexMode {
- google.protobuf.Empty AllIndexes = 5; // Create topic per each index
- string IndexName = 6;
- }
}
message TAlterCdcStream {
@@ -1653,7 +1649,6 @@ message TIndexBuildControl {
message TLockConfig {
optional string Name = 1;
- optional bool AllowIndexImplLock = 2;
}
message TLockGuard {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
index 4efa03b179..7b95b2b3ac 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
@@ -143,9 +143,12 @@ public:
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
- .IsCommonSensePath()
.NotUnderOperation();
+ if (checks && !tablePath.IsInsideTableIndexPath()) {
+ checks.IsCommonSensePath();
+ }
+
if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
return result;
@@ -370,10 +373,13 @@ public:
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
- .IsCommonSensePath()
.NotUnderDeleting()
.NotUnderOperation();
+ if (checks && !tablePath.IsInsideTableIndexPath()) {
+ checks.IsCommonSensePath();
+ }
+
if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
return result;
@@ -476,10 +482,10 @@ private:
} // anonymous
std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
- const TOperationId& opId,
- const TPath& workingDirPath,
- const TString& tableName,
- const TString& streamName)
+ const TOperationId& opId,
+ const TPath& workingDirPath,
+ const TString& tableName,
+ const TString& streamName)
{
const auto tablePath = workingDirPath.Child(tableName);
{
@@ -492,9 +498,12 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
- .IsCommonSensePath()
.NotUnderOperation();
+ if (checks && !tablePath.IsInsideTableIndexPath()) {
+ checks.IsCommonSensePath();
+ }
+
if (!checks) {
return CreateReject(opId, checks.GetStatus(), checks.GetError());
}
@@ -521,11 +530,11 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
}
void DoAlterStream(
- const NKikimrSchemeOp::TAlterCdcStream& op,
- const TOperationId& opId,
- const TPath& workingDirPath,
- const TPath& tablePath,
- TVector<ISubOperation::TPtr>& result)
+ TVector<ISubOperation::TPtr>& result,
+ const NKikimrSchemeOp::TAlterCdcStream& op,
+ const TOperationId& opId,
+ const TPath& workingDirPath,
+ const TPath& tablePath)
{
{
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl);
@@ -601,7 +610,7 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr
TVector<ISubOperation::TPtr> result;
- DoAlterStream(op, opId, workingDirPath, tablePath, result);
+ DoAlterStream(result, op, opId, workingDirPath, tablePath);
if (op.HasGetReady()) {
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropLock);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.h b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.h
index 198d5ae35c..6154ee05ed 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.h
@@ -1,7 +1,7 @@
#pragma once
-#include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths
#include "schemeshard__operation_common.h"
+#include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths
#include "schemeshard__operation_part.h"
#include "schemeshard_impl.h"
@@ -17,10 +17,10 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
const TString& streamName);
void DoAlterStream(
+ TVector<ISubOperation::TPtr>& result,
const NKikimrSchemeOp::TAlterCdcStream& op,
const TOperationId& opId,
const TPath& workingDirPath,
- const TPath& tablePath,
- TVector<ISubOperation::TPtr>& result);
+ const TPath& tablePath);
} // namespace NKikimr::NSchemesShard::NCdc
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp
index 4c7d2f282c..69883a91a2 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp
@@ -1,9 +1,8 @@
-#include "schemeshard__operation_part.h"
+#include "schemeshard__operation_alter_cdc_stream.h"
#include "schemeshard__operation_common.h"
+#include "schemeshard__operation_part.h"
#include "schemeshard_impl.h"
-#include "schemeshard__operation_alter_cdc_stream.h"
-
#include <ydb/core/tx/schemeshard/backup/constants.h>
#include <ydb/core/engine/mkql_proto.h>
@@ -111,7 +110,7 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons
TVector<ISubOperation::TPtr> result;
- NCdc::DoAlterStream(alterCdcStreamOp, opId, workingDirPath, tablePath, result);
+ NCdc::DoAlterStream(result, alterCdcStreamOp, opId, workingDirPath, tablePath);
if (cbOp.GetActionCase() == NKikimrSchemeOp::TAlterContinuousBackup::kTakeIncrementalBackup) {
DoCreateIncBackupTable(opId, backupTablePath, schema, result);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index a615d6b68a..8ec0165f8b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -105,38 +105,22 @@ class TNewCdcStream: public TSubOperation {
}
}
- TString BuildWorkingDir() const {
- if (Transaction.GetCreateCdcStream().HasIndexName()) {
- return Transaction.GetWorkingDir() + "/"
- + Transaction.GetCreateCdcStream().GetIndexName() + "/indexImplTable";
- } else {
- return Transaction.GetWorkingDir();
- }
- }
-
public:
using TSubOperation::TSubOperation;
THolder<TProposeResponse> Propose(const TString& owner, TOperationContext& context) override {
+ const auto& workingDir = Transaction.GetWorkingDir();
const auto& op = Transaction.GetCreateCdcStream();
const auto& streamDesc = op.GetStreamDescription();
const auto& streamName = streamDesc.GetName();
const auto acceptExisted = !Transaction.GetFailOnExist();
- auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID());
-
- if (op.HasAllIndexes()) {
- result->SetError(NKikimrScheme::StatusInvalidParameter,
- "Illigal part operation with all indexes flag");
- return result;
- }
-
- const auto& workingDir = BuildWorkingDir();
-
LOG_N("TNewCdcStream Propose"
<< ": opId# " << OperationId
<< ", stream# " << workingDir << "/" << streamName);
+ auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID());
+
const auto tablePath = TPath::Resolve(workingDir, context.SS);
{
const auto checks = tablePath.Check();
@@ -146,15 +130,12 @@ public:
.IsAtLocalSchemeShard()
.IsResolved()
.NotDeleted()
+ .IsTable()
.NotAsyncReplicaTable()
.NotUnderDeleting();
- if (op.HasIndexName() && op.GetIndexName()) {
- checks.IsInsideTableIndexPath();
- } else {
- checks
- .IsTable()
- .IsCommonSensePath();
+ if (checks && !tablePath.IsInsideTableIndexPath()) {
+ checks.IsCommonSensePath();
}
if (!checks) {
@@ -529,35 +510,17 @@ public:
}
THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override {
- auto workingDir = Transaction.GetWorkingDir();
+ const auto& workingDir = Transaction.GetWorkingDir();
const auto& op = Transaction.GetCreateCdcStream();
- auto tableName = op.GetTableName();
+ const auto& tableName = op.GetTableName();
const auto& streamName = op.GetStreamDescription().GetName();
- auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID());
- bool isIndexTable = false;
-
- if (op.HasAllIndexes()) {
- result->SetError(NKikimrScheme::StatusInvalidParameter,
- "Illigal part operation with all indexes flag");
- return result;
- }
-
- if (op.HasIndexName()) {
- if (!op.GetIndexName()) {
- result->SetError(NKikimrScheme::StatusInvalidParameter,
- "Unexpected empty index name");
- return result;
- }
- isIndexTable = true;
- workingDir += ("/" + tableName + "/" + op.GetIndexName());
- tableName = "indexImplTable";
- }
-
LOG_N("TNewCdcStreamAtTable Propose"
<< ": opId# " << OperationId
<< ", stream# " << workingDir << "/" << tableName << "/" << streamName);
+ auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID());
+
const auto workingDirPath = TPath::Resolve(workingDir, context.SS);
{
const auto checks = workingDirPath.Check();
@@ -569,9 +532,7 @@ public:
.IsLikeDirectory()
.NotUnderDeleting();
- if (isIndexTable) {
- checks.IsInsideTableIndexPath();
- } else {
+ if (checks && !workingDirPath.IsTableIndex()) {
checks.IsCommonSensePath();
}
@@ -595,7 +556,7 @@ public:
.NotUnderDeleting();
if (checks) {
- if (!isIndexTable) {
+ if (!tablePath.IsInsideTableIndexPath()) {
checks.IsCommonSensePath();
}
if (InitialScan) {
@@ -679,27 +640,34 @@ public:
private:
const bool InitialScan;
+
}; // TNewCdcStreamAtTable
-void DoCreateLock(const TOperationId opId, const TPath& workingDirPath, const TPath& tablePath, bool allowIndexImplLock,
- TVector<ISubOperation::TPtr>& result)
+void DoCreateLock(
+ TVector<ISubOperation::TPtr>& result,
+ const TOperationId opId,
+ const TPath& workingDirPath,
+ const TPath& tablePath)
{
- auto outTx = TransactionTemplate(workingDirPath.PathString(),
- NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock);
+ auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock);
outTx.SetFailOnExist(false);
outTx.SetInternal(true);
- auto cfg = outTx.MutableLockConfig();
- cfg->SetName(tablePath.LeafName());
- cfg->SetAllowIndexImplLock(allowIndexImplLock);
+ outTx.MutableLockConfig()->SetName(tablePath.LeafName());
result.push_back(CreateLock(NextPartId(opId, result), outTx));
}
} // anonymous
-void DoCreatePqPart(const TOperationId& opId, const TPath& streamPath, const TString& streamName,
- const TIntrusivePtr<TTableInfo> table, const NKikimrSchemeOp::TCreateCdcStream& op,
- const TVector<TString>& boundaries, const bool acceptExisted, TVector<ISubOperation::TPtr>& result)
+void DoCreatePqPart(
+ TVector<ISubOperation::TPtr>& result,
+ const NKikimrSchemeOp::TCreateCdcStream& op,
+ const TOperationId& opId,
+ const TPath& streamPath,
+ const TString& streamName,
+ TTableInfo::TCPtr table,
+ const TVector<TString>& boundaries,
+ const bool acceptExisted)
{
auto outTx = TransactionTemplate(streamPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup);
outTx.SetFailOnExist(!acceptExisted);
@@ -752,34 +720,37 @@ void DoCreatePqPart(const TOperationId& opId, const TPath& streamPath, const TSt
result.push_back(CreateNewPQ(NextPartId(opId, result), outTx));
}
-void FillModifySchemaForCdc(NKikimrSchemeOp::TModifyScheme& outTx, const NKikimrSchemeOp::TCreateCdcStream& op,
- const TOperationId& opId, const TString& indexName, bool acceptExisted, bool initialScan)
+static void FillModifySchemaForCdc(
+ NKikimrSchemeOp::TModifyScheme& outTx,
+ const NKikimrSchemeOp::TCreateCdcStream& op,
+ const TOperationId& opId,
+ bool acceptExisted,
+ bool initialScan)
{
outTx.SetFailOnExist(!acceptExisted);
outTx.MutableCreateCdcStream()->CopyFrom(op);
- if (indexName) {
- outTx.MutableCreateCdcStream()->SetIndexName(indexName);
- } else {
- outTx.MutableCreateCdcStream()->ClearIndexMode();
- }
-
if (initialScan) {
outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId()));
}
}
-void DoCreateStream(const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath,
- const bool acceptExisted, const bool initialScan, const TString& indexName, TVector<ISubOperation::TPtr>& result)
+void DoCreateStream(
+ TVector<ISubOperation::TPtr>& result,
+ const NKikimrSchemeOp::TCreateCdcStream& op,
+ const TOperationId& opId,
+ const TPath& workingDirPath,
+ const TPath& tablePath,
+ const bool acceptExisted,
+ const bool initialScan)
{
{
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl);
- FillModifySchemaForCdc(outTx, op, opId, indexName, acceptExisted, initialScan);
+ FillModifySchemaForCdc(outTx, op, opId, acceptExisted, initialScan);
result.push_back(CreateNewCdcStreamImpl(NextPartId(opId, result), outTx));
}
-
{
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable);
- FillModifySchemaForCdc(outTx, op, opId, indexName, acceptExisted, initialScan);
+ FillModifySchemaForCdc(outTx, op, opId, acceptExisted, initialScan);
result.push_back(CreateNewCdcStreamAtTable(NextPartId(opId, result), outTx, initialScan));
}
}
@@ -826,10 +797,13 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
- .IsCommonSensePath()
.NotUnderDeleting()
.NotUnderOperation();
+ if (checks && !tablePath.IsInsideTableIndexPath()) {
+ checks.IsCommonSensePath();
+ }
+
if (!checks) {
return CreateReject(opId, checks.GetStatus(), checks.GetError());
}
@@ -837,19 +811,7 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat
return nullptr;
}
-void CalcBoundaries(const TTableInfo& table, TVector<TString>& boundaries) {
- const auto& partitions = table.GetPartitions();
- boundaries.reserve(partitions.size() - 1);
-
- for (ui32 i = 0; i < partitions.size(); ++i) {
- const auto& partition = partitions.at(i);
- if (i != partitions.size() - 1) {
- boundaries.push_back(partition.EndOfRange);
- }
- }
-}
-
-bool FillBoundaries(const TTableInfo& table, const ::NKikimrSchemeOp::TCreateCdcStream& op, TVector<TString>& boundaries, TString& errStr) {
+bool FillBoundaries(const TTableInfo& table, const NKikimrSchemeOp::TCreateCdcStream& op, TVector<TString>& boundaries, TString& errStr) {
if (op.HasTopicPartitions()) {
const auto& keyColumns = table.KeyColumnIds;
const auto& columns = table.Columns;
@@ -862,8 +824,17 @@ bool FillBoundaries(const TTableInfo& table, const ::NKikimrSchemeOp::TCreateCdc
return false;
}
} else {
- CalcBoundaries(table, boundaries);
+ const auto& partitions = table.GetPartitions();
+ boundaries.reserve(partitions.size() - 1);
+
+ for (ui32 i = 0; i < partitions.size(); ++i) {
+ const auto& partition = partitions.at(i);
+ if (i != partitions.size() - 1) {
+ boundaries.push_back(partition.EndOfRange);
+ }
+ }
}
+
return true;
}
@@ -921,7 +892,6 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
const auto& tableName = op.GetTableName();
const auto& streamDesc = op.GetStreamDescription();
const auto& streamName = streamDesc.GetName();
-
const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS);
const auto checksResult = DoNewStreamPathChecks(opId, workingDirPath, tableName, streamName, acceptExisted);
@@ -971,76 +941,27 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
<< "Initial scan is not supported yet")};
}
- if (op.HasTopicPartitions()) {
- if (op.GetTopicPartitions() <= 0) {
- return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, "Topic partitions count must be greater than 0")};
- }
- }
-
- std::vector<TString> candidates;
-
- if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kAllIndexes) {
- candidates.reserve(tablePath->GetChildren().size());
- for (const auto& child : tablePath->GetChildren()) {
- candidates.emplace_back(child.first);
- }
- } else if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kIndexName) {
- auto it = tablePath->GetChildren().find(op.GetIndexName());
- if (it == tablePath->GetChildren().end()) {
- return {CreateReject(opId, NKikimrScheme::StatusSchemeError,
- "requested particular path hasn't been found")};
- }
- candidates.emplace_back(it->first);
+ if (op.HasTopicPartitions() && op.GetTopicPartitions() <= 0) {
+ return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, "Topic partitions count must be greater than 0")};
}
TVector<ISubOperation::TPtr> result;
- for (const auto& name : candidates) {
- const TPath indexPath = tablePath.Child(name);
- if (!indexPath.IsTableIndex() || indexPath.IsDeleted()) {
- continue;
- }
-
- const TPath indexImplPath = indexPath.Child("indexImplTable");
- if (!indexImplPath) {
- return {CreateReject(opId, NKikimrScheme::StatusSchemeError,
- "indexImplTable hasn't been found")};
- }
-
- Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId));
- auto indexImplTable = context.SS->Tables.at(indexImplPath.Base()->PathId);
-
- const TPath indexStreamPath = indexImplPath.Child(streamName);
- if (auto reject = RejectOnCdcChecks(opId, indexStreamPath, acceptExisted)) {
- return {reject};
- }
-
- if (initialScan) {
- DoCreateLock(opId, indexPath, indexImplPath, true, result);
- }
-
- TVector<TString> boundaries;
- if (!FillBoundaries(*indexImplTable, op, boundaries, errStr)) {
- return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)};
- }
-
- DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, name, result);
- DoCreatePqPart(opId, indexStreamPath, streamName, indexImplTable, op, boundaries, acceptExisted, result);
- }
-
if (initialScan) {
- DoCreateLock(opId, workingDirPath, tablePath, false, result);
+ DoCreateLock(result, opId, workingDirPath, tablePath);
}
Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId));
auto table = context.SS->Tables.at(tablePath.Base()->PathId);
+
TVector<TString> boundaries;
if (!FillBoundaries(*table, op, boundaries, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)};
}
- DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, {}, result);
- DoCreatePqPart(opId, streamPath, streamName, table, op, boundaries, acceptExisted, result);
+ DoCreateStream(result, op, opId, workingDirPath, tablePath, acceptExisted, initialScan);
+ DoCreatePqPart(result, op, opId, streamPath, streamName, table, boundaries, acceptExisted);
+
return result;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h
index 11a921d841..635e57a28b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h
@@ -1,7 +1,7 @@
#pragma once
-#include "schemeshard__operation_part.h"
#include "schemeshard__operation_common.h"
+#include "schemeshard__operation_part.h"
#include "schemeshard_impl.h"
#include <ydb/core/engine/mkql_proto.h>
@@ -22,23 +22,22 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoNewStreamPathChecks(
bool acceptExisted);
void DoCreateStream(
+ TVector<ISubOperation::TPtr>& result,
const NKikimrSchemeOp::TCreateCdcStream& op,
const TOperationId& opId,
const TPath& workingDirPath,
const TPath& tablePath,
const bool acceptExisted,
- const bool initialScan,
- const TString& indexName,
- TVector<ISubOperation::TPtr>& result);
+ const bool initialScan);
void DoCreatePqPart(
+ TVector<ISubOperation::TPtr>& result,
+ const NKikimrSchemeOp::TCreateCdcStream& op,
const TOperationId& opId,
const TPath& streamPath,
const TString& streamName,
- const TIntrusivePtr<TTableInfo> table,
- const NKikimrSchemeOp::TCreateCdcStream& op,
+ TTableInfo::TCPtr table,
const TVector<TString>& boundaries,
- const bool acceptExisted,
- TVector<ISubOperation::TPtr>& result);
+ const bool acceptExisted);
} // namespace NKikimr::NSchemesShard::NCdc
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp
index 6bb2803161..ab2c187d24 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp
@@ -68,8 +68,8 @@ TVector<ISubOperation::TPtr> CreateNewContinuousBackup(TOperationId opId, const
TVector<ISubOperation::TPtr> result;
- NCdc::DoCreateStream(createCdcStreamOp, opId, workingDirPath, tablePath, acceptExisted, false, {}, result);
- NCdc::DoCreatePqPart(opId, streamPath, NBackup::CB_CDC_STREAM_NAME, table, createCdcStreamOp, boundaries, acceptExisted, result);
+ NCdc::DoCreateStream(result, createCdcStreamOp, opId, workingDirPath, tablePath, acceptExisted, false);
+ NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, NBackup::CB_CDC_STREAM_NAME, table, boundaries, acceptExisted);
return result;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp
index bd7ad54009..c119f253f4 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp
@@ -127,9 +127,7 @@ public:
.IsLikeDirectory()
.FailOnRestrictedCreateInTempZone();
- if (op.GetAllowIndexImplLock()) {
- checks.IsInsideTableIndexPath();
- } else {
+ if (checks && !parentPath.IsTableIndex()) {
checks.IsCommonSensePath();
}
@@ -151,7 +149,7 @@ public:
.IsTable()
.NotAsyncReplicaTable();
- if (!op.GetAllowIndexImplLock()) {
+ if (checks && !parentPath.IsTableIndex()) {
checks.IsCommonSensePath();
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
index 07d9bd17e0..fac3fe3443 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
@@ -150,10 +150,13 @@ public:
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
- .IsCommonSensePath()
.IsUnderOperation()
.IsUnderTheSameOperation(OperationId.GetTxId());
+ if (checks && !tablePath.IsInsideTableIndexPath()) {
+ checks.IsCommonSensePath();
+ }
+
if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
return result;
@@ -332,10 +335,13 @@ public:
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
- .IsCommonSensePath()
.NotUnderDeleting()
.NotUnderOperation();
+ if (checks && !tablePath.IsInsideTableIndexPath()) {
+ checks.IsCommonSensePath();
+ }
+
if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
return result;
@@ -438,10 +444,10 @@ private:
} // anonymous
std::variant<TStreamPaths, ISubOperation::TPtr> DoDropStreamPathChecks(
- const TOperationId& opId,
- const TPath& workingDirPath,
- const TString& tableName,
- const TString& streamName)
+ const TOperationId& opId,
+ const TPath& workingDirPath,
+ const TString& tableName,
+ const TString& streamName)
{
const auto tablePath = workingDirPath.Child(tableName);
{
@@ -454,10 +460,13 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoDropStreamPathChecks(
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
- .IsCommonSensePath()
.NotUnderDeleting()
.NotUnderOperation();
+ if (checks && !tablePath.IsInsideTableIndexPath()) {
+ checks.IsCommonSensePath();
+ }
+
if (!checks) {
return CreateReject(opId, checks.GetStatus(), checks.GetError());
}
@@ -485,10 +494,11 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoDropStreamPathChecks(
}
ISubOperation::TPtr DoDropStreamChecks(
- const TOperationId& opId,
- const TPath& tablePath,
- const TTxId lockTxId,
- TOperationContext& context) {
+ const TOperationId& opId,
+ const TPath& tablePath,
+ const TTxId lockTxId,
+ TOperationContext& context)
+{
TString errStr;
if (!context.SS->CheckLocks(tablePath.Base()->PathId, lockTxId, errStr)) {
@@ -499,14 +509,14 @@ ISubOperation::TPtr DoDropStreamChecks(
}
void DoDropStream(
- const NKikimrSchemeOp::TDropCdcStream& op,
- const TOperationId& opId,
- const TPath& workingDirPath,
- const TPath& tablePath,
- const TPath& streamPath,
- const TTxId lockTxId,
- TOperationContext& context,
- TVector<ISubOperation::TPtr>& result)
+ TVector<ISubOperation::TPtr>& result,
+ const NKikimrSchemeOp::TDropCdcStream& op,
+ const TOperationId& opId,
+ const TPath& workingDirPath,
+ const TPath& tablePath,
+ const TPath& streamPath,
+ const TTxId lockTxId,
+ TOperationContext& context)
{
{
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamAtTable);
@@ -615,7 +625,7 @@ TVector<ISubOperation::TPtr> CreateDropCdcStream(TOperationId opId, const TTxTra
TVector<ISubOperation::TPtr> result;
- DoDropStream(op, opId, workingDirPath, tablePath, streamPath, lockTxId, context, result);
+ DoDropStream(result, op, opId, workingDirPath, tablePath, streamPath, lockTxId, context);
return result;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.h b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.h
index ec4720da71..12be710268 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.h
@@ -1,8 +1,8 @@
#pragma once
+#include "schemeshard__operation_common.h"
#include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths
#include "schemeshard__operation_part.h"
-#include "schemeshard__operation_common.h"
#include "schemeshard_impl.h"
#include <ydb/core/engine/mkql_proto.h>
@@ -23,13 +23,13 @@ ISubOperation::TPtr DoDropStreamChecks(
TOperationContext& context);
void DoDropStream(
+ TVector<ISubOperation::TPtr>& result,
const NKikimrSchemeOp::TDropCdcStream& op,
const TOperationId& opId,
const TPath& workingDirPath,
const TPath& tablePath,
const TPath& streamPath,
const TTxId lockTxId,
- TOperationContext& context,
- TVector<ISubOperation::TPtr>& result);
+ TOperationContext& context);
} // namespace NKikimr::NSchemesShard::NCdc
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_continuous_backup.cpp
index 9e7ec8ac7e..e0e882b84d 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_continuous_backup.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_continuous_backup.cpp
@@ -1,8 +1,7 @@
-#include "schemeshard__operation_part.h"
#include "schemeshard__operation_common.h"
-#include "schemeshard_impl.h"
-
#include "schemeshard__operation_drop_cdc_stream.h"
+#include "schemeshard__operation_part.h"
+#include "schemeshard_impl.h"
#include <ydb/core/tx/schemeshard/backup/constants.h>
@@ -40,7 +39,7 @@ TVector<ISubOperation::TPtr> CreateDropContinuousBackup(TOperationId opId, const
TVector<ISubOperation::TPtr> result;
- NCdc::DoDropStream(dropCdcStreamOp, opId, workingDirPath, tablePath, streamPath, InvalidTxId, context, result);
+ NCdc::DoDropStream(result, dropCdcStreamOp, opId, workingDirPath, tablePath, streamPath, InvalidTxId, context);
return result;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp
index fb268a8b06..a0d6ff3551 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp
@@ -473,21 +473,22 @@ TVector<ISubOperation::TPtr> CreateDropIndex(TOperationId nextId, const TTxTrans
result.push_back(CreateDropTableIndex(NextPartId(nextId, result), indexDropping));
}
- for (const auto& items: indexPath.Base()->GetChildren()) {
- Y_ABORT_UNLESS(context.SS->PathsById.contains(items.second));
- auto implPath = context.SS->PathsById.at(items.second);
- if (implPath->Dropped()) {
+ for (const auto& [childName, childPathId] : indexPath.Base()->GetChildren()) {
+ TPath child = indexPath.Child(childName);
+ if (child.IsDeleted()) {
continue;
}
- auto implTable = context.SS->PathsById.at(items.second);
- Y_ABORT_UNLESS(implTable->IsTable());
+ Y_ABORT_UNLESS(child.Base()->IsTable());
auto implTableDropping = TransactionTemplate(indexPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable);
auto operation = implTableDropping.MutableDrop();
- operation->SetName(items.first);
+ operation->SetName(child.LeafName());
result.push_back(CreateDropTable(NextPartId(nextId, result), implTableDropping));
+ if (auto reject = CascadeDropTableChildren(result, nextId, child)) {
+ return {reject};
+ }
}
return result;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp
index 51bf6f69a0..c0ef94bbda 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp
@@ -443,104 +443,8 @@ TVector<ISubOperation::TPtr> CreateDropIndexedTable(TOperationId nextId, const T
TVector<ISubOperation::TPtr> result;
result.push_back(CreateDropTable(NextPartId(nextId, result), tx));
-
- for (const auto& [childName, childPathId] : table.Base()->GetChildren()) {
- TPath child = table.Child(childName);
- {
- TPath::TChecker checks = child.Check();
- checks
- .NotEmpty()
- .IsResolved();
-
- if (checks) {
- if (child.IsDeleted()) {
- continue;
- }
- }
-
- if (child.IsTableIndex()) {
- checks.IsTableIndex();
- } else if (child.IsCdcStream()) {
- checks.IsCdcStream();
- } else if (child.IsSequence()) {
- checks.IsSequence();
- }
-
- checks.NotDeleted()
- .NotUnderDeleting()
- .NotUnderOperation();
-
- if (!checks) {
- return {CreateReject(nextId, checks.GetStatus(), checks.GetError())};
- }
- }
- Y_ABORT_UNLESS(child.Base()->PathId == childPathId);
-
- if (child.IsSequence()) {
- auto dropSequence = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropSequence);
- dropSequence.MutableDrop()->SetName(ToString(child->Name));
-
- result.push_back(CreateDropSequence(NextPartId(nextId, result), dropSequence));
- continue;
- } else if (child.IsTableIndex()) {
- auto dropIndex = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTableIndex);
- dropIndex.MutableDrop()->SetName(ToString(child.Base()->Name));
-
- result.push_back(CreateDropTableIndex(NextPartId(nextId, result), dropIndex));
- } else if (child.IsCdcStream()) {
- auto dropStream = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl);
- dropStream.MutableDrop()->SetName(ToString(child.Base()->Name));
-
- result.push_back(CreateDropCdcStreamImpl(NextPartId(nextId, result), dropStream));
- }
-
- for (auto& [implName, implPathId] : child.Base()->GetChildren()) {
- Y_ABORT_UNLESS(implName == "indexImplTable"
- || implName == "streamImpl"
- || implName == NTableIndex::NTableVectorKmeansTreeIndex::LevelTable
- || implName == NTableIndex::NTableVectorKmeansTreeIndex::PostingTable
- , "unexpected name %s", implName.c_str());
-
- TPath implPath = child.Child(implName);
- {
- TPath::TChecker checks = implPath.Check();
- checks
- .NotEmpty()
- .IsResolved()
- .NotDeleted()
- .NotUnderDeleting()
- .NotUnderOperation();
-
- if (checks) {
- if (implPath.Base()->IsTable()) {
- checks
- .IsTable()
- .IsInsideTableIndexPath();
- } else if (implPath.Base()->IsPQGroup()) {
- checks
- .IsPQGroup()
- .IsInsideCdcStreamPath();
- }
- }
-
- if (!checks) {
- return {CreateReject(nextId, checks.GetStatus(), checks.GetError())};
- }
- }
- Y_ABORT_UNLESS(implPath.Base()->PathId == implPathId);
-
- if (implPath.Base()->IsTable()) {
- auto dropIndexTable = TransactionTemplate(child.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable);
- dropIndexTable.MutableDrop()->SetName(ToString(implPath.Base()->Name));
-
- result.push_back(CreateDropTable(NextPartId(nextId, result), dropIndexTable));
- } else if (implPath.Base()->IsPQGroup()) {
- auto dropPQGroup = TransactionTemplate(child.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropPersQueueGroup);
- dropPQGroup.MutableDrop()->SetName(ToString(implPath.Base()->Name));
-
- result.push_back(CreateDropPQ(NextPartId(nextId, result), dropPQGroup));
- }
- }
+ if (auto reject = CascadeDropTableChildren(result, nextId, table)) {
+ return {reject};
}
return result;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp
index c5026592a0..8dc9cf01b5 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp
@@ -119,9 +119,12 @@ public:
.IsResolved()
.NotDeleted()
.NotUnderDeleting()
- .IsCommonSensePath()
.IsLikeDirectory();
+ if (checks && !parentPath.IsTableIndex()) {
+ checks.IsCommonSensePath();
+ }
+
if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
return result;
@@ -134,10 +137,12 @@ public:
checks
.IsAtLocalSchemeShard()
.IsResolved()
- .NotUnderDeleting()
- .IsCommonSensePath();
+ .NotUnderDeleting();
if (checks) {
+ if (!parentPath.IsTableIndex()) {
+ checks.IsCommonSensePath();
+ }
if (dstPath.IsUnderOperation()) { // may be part of a consistent operation
checks.IsUnderTheSameOperation(OperationId.GetTxId());
} else {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp
index e06c7c1cca..8e8eccc1d3 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp
@@ -1,5 +1,6 @@
#include "schemeshard__operation_part.h"
#include "schemeshard_impl.h"
+#include "schemeshard_path.h"
namespace NKikimr::NSchemeShard {
@@ -99,4 +100,110 @@ void TSubOperationState::IgnoreMessages(TString debugHint, TSet<ui32> mgsIds) {
MsgToIgnore.swap(mgsIds);
}
+ISubOperation::TPtr CascadeDropTableChildren(TVector<ISubOperation::TPtr>& result, const TOperationId& id, const TPath& table) {
+ for (const auto& [childName, childPathId] : table.Base()->GetChildren()) {
+ TPath child = table.Child(childName);
+ {
+ TPath::TChecker checks = child.Check();
+ checks
+ .NotEmpty()
+ .IsResolved();
+
+ if (checks) {
+ if (child.IsDeleted()) {
+ continue;
+ }
+ }
+
+ if (child.IsTableIndex()) {
+ checks.IsTableIndex();
+ } else if (child.IsCdcStream()) {
+ checks.IsCdcStream();
+ } else if (child.IsSequence()) {
+ checks.IsSequence();
+ }
+
+ checks.NotDeleted()
+ .NotUnderDeleting()
+ .NotUnderOperation();
+
+ if (!checks) {
+ return CreateReject(id, checks.GetStatus(), checks.GetError());
+ }
+ }
+ Y_ABORT_UNLESS(child.Base()->PathId == childPathId);
+
+ if (child.IsSequence()) {
+ auto dropSequence = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropSequence);
+ dropSequence.MutableDrop()->SetName(ToString(child->Name));
+
+ result.push_back(CreateDropSequence(NextPartId(id, result), dropSequence));
+ continue;
+ } else if (child.IsTableIndex()) {
+ auto dropIndex = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTableIndex);
+ dropIndex.MutableDrop()->SetName(ToString(child.Base()->Name));
+
+ result.push_back(CreateDropTableIndex(NextPartId(id, result), dropIndex));
+ } else if (child.IsCdcStream()) {
+ auto dropStream = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl);
+ dropStream.MutableDrop()->SetName(ToString(child.Base()->Name));
+
+ result.push_back(CreateDropCdcStreamImpl(NextPartId(id, result), dropStream));
+ }
+
+ for (auto& [implName, implPathId] : child.Base()->GetChildren()) {
+ Y_ABORT_UNLESS(implName == "indexImplTable"
+ || implName == "streamImpl"
+ || implName == NTableIndex::NTableVectorKmeansTreeIndex::LevelTable
+ || implName == NTableIndex::NTableVectorKmeansTreeIndex::PostingTable
+ , "unexpected name %s", implName.c_str());
+
+ TPath implPath = child.Child(implName);
+ {
+ TPath::TChecker checks = implPath.Check();
+ checks
+ .NotEmpty()
+ .IsResolved()
+ .NotDeleted()
+ .NotUnderDeleting()
+ .NotUnderOperation();
+
+ if (checks) {
+ if (implPath.Base()->IsTable()) {
+ checks
+ .IsTable()
+ .IsInsideTableIndexPath();
+ } else if (implPath.Base()->IsPQGroup()) {
+ checks
+ .IsPQGroup()
+ .IsInsideCdcStreamPath();
+ }
+ }
+
+ if (!checks) {
+ return CreateReject(id, checks.GetStatus(), checks.GetError());
+ }
+ }
+ Y_ABORT_UNLESS(implPath.Base()->PathId == implPathId);
+
+ if (implPath.Base()->IsTable()) {
+ auto dropIndexTable = TransactionTemplate(child.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable);
+ dropIndexTable.MutableDrop()->SetName(ToString(implPath.Base()->Name));
+
+ result.push_back(CreateDropTable(NextPartId(id, result), dropIndexTable));
+ if (auto reject = CascadeDropTableChildren(result, id, implPath)) {
+ return reject;
+ }
+ } else if (implPath.Base()->IsPQGroup()) {
+ auto dropPQGroup = TransactionTemplate(child.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropPersQueueGroup);
+ dropPQGroup.MutableDrop()->SetName(ToString(implPath.Base()->Name));
+
+ result.push_back(CreateDropPQ(NextPartId(id, result), dropPQGroup));
+ }
+ }
+ }
+
+ return nullptr;
+}
+
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h
index 386dcb2076..12d67f768e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h
@@ -84,6 +84,7 @@ namespace NKikimr {
namespace NSchemeShard {
class TSchemeShard;
+class TPath;
struct TOperationContext {
public:
@@ -620,5 +621,8 @@ ISubOperation::TPtr CreateAlterResourcePool(TOperationId id, TTxState::ETxState
ISubOperation::TPtr CreateDropResourcePool(TOperationId id, const TTxTransaction& tx);
ISubOperation::TPtr CreateDropResourcePool(TOperationId id, TTxState::ETxState state);
+// returns Reject in case of error, nullptr otherwise
+ISubOperation::TPtr CascadeDropTableChildren(TVector<ISubOperation::TPtr>& result, const TOperationId& id, const TPath& table);
+
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_path.cpp b/ydb/core/tx/schemeshard/schemeshard_path.cpp
index f45e6190eb..4d5d534b27 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path.cpp
@@ -1596,13 +1596,6 @@ bool TPath::IsInsideCdcStreamPath() const {
return false;
}
- ++item;
- for (; item != Elements.rend(); ++item) {
- if (!(*item)->IsDirectory() && !(*item)->IsSubDomainRoot()) {
- return false;
- }
- }
-
return true;
}
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
index a4e9f6a1b2..a2aea05c91 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
@@ -591,15 +591,6 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
}
)", {NKikimrScheme::StatusNameConflict});
- TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
- TableName: "indexImplTable"
- StreamDescription {
- Name: "Stream"
- Mode: ECdcStreamModeKeysOnly
- Format: ECdcStreamFormatProto
- }
- )", {NKikimrScheme::StatusNameConflict});
-
TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
TableName: "Table"
StreamDescription {
@@ -617,29 +608,6 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
)");
env.TestWaitNotification(runtime, txId);
- TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
- TableName: "Table"
- StreamDescription {
- Name: "StreamWithIndex"
- Mode: ECdcStreamModeKeysOnly
- Format: ECdcStreamFormatProto
- }
- IndexName: "NotExistedIndex"
- )", {NKikimrScheme::StatusSchemeError});
-
- TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
- TableName: "Table"
- StreamDescription {
- Name: "StreamWithIndex"
- Mode: ECdcStreamModeKeysOnly
- Format: ECdcStreamFormatProto
- }
- IndexName: "Index"
- )");
- env.TestWaitNotification(runtime, txId);
-
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/StreamWithIndex/streamImpl"), {NLs::PathExist});
-
TestDropTable(runtime, ++txId, "/MyRoot", "Table");
env.TestWaitNotification(runtime, txId);
@@ -1232,6 +1200,170 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
}
}
+ Y_UNIT_TEST(StreamOnIndexTable) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
+ ui64 txId = 100;
+
+ TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+ TableDescription {
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "indexed" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ }
+ IndexDescription {
+ Name: "Index"
+ KeyColumnNames: ["indexed"]
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/UnknownIndex", R"(
+ TableName: "indexImplTable"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )", {NKikimrScheme::StatusPathDoesNotExist});
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
+ TableName: "indexImplTable"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), {
+ NLs::PathExist,
+ });
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), {
+ NLs::PathExist,
+ });
+
+ TestAlterCdcStream(runtime, ++txId, "/MyRoot/Table/UnknownIndex", R"(
+ TableName: "indexImplTable"
+ StreamName: "Stream"
+ Disable {}
+ )", {NKikimrScheme::StatusPathDoesNotExist});
+
+ TestAlterCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
+ TableName: "indexImplTable"
+ StreamName: "Stream"
+ Disable {}
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), {
+ NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateDisabled),
+ });
+
+ TestDropCdcStream(runtime, ++txId, "/MyRoot/Table/UnknownIndex", R"(
+ TableName: "indexImplTable"
+ StreamName: "Stream"
+ )", {NKikimrScheme::StatusPathDoesNotExist});
+
+ TestDropCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
+ TableName: "indexImplTable"
+ StreamName: "Stream"
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), {
+ NLs::PathNotExist,
+ });
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), {
+ NLs::PathNotExist,
+ });
+ }
+
+ Y_UNIT_TEST(DropIndexWithStream) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
+ ui64 txId = 100;
+
+ TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+ TableDescription {
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "indexed" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ }
+ IndexDescription {
+ Name: "Index"
+ KeyColumnNames: ["indexed"]
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
+ TableName: "indexImplTable"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDropTableIndex(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ IndexName: "Index"
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), {
+ NLs::PathNotExist,
+ });
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), {
+ NLs::PathNotExist,
+ });
+ }
+
+ Y_UNIT_TEST(DropTableWithIndexWithStream) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
+ ui64 txId = 100;
+
+ TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+ TableDescription {
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "indexed" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ }
+ IndexDescription {
+ Name: "Index"
+ KeyColumnNames: ["indexed"]
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
+ TableName: "indexImplTable"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDropTable(runtime, ++txId, "/MyRoot", "Table");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), {
+ NLs::PathNotExist,
+ });
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), {
+ NLs::PathNotExist,
+ });
+ }
+
} // TCdcStreamTests
Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) {
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
index 7444ec4dac..cd6f40640d 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
@@ -6,29 +6,9 @@
using namespace NSchemeShardUT_Private;
-static const TString createTableProto = R"(
- Name: "Table"
- Columns { Name: "key" Type: "Uint64" }
- Columns { Name: "value" Type: "Uint64" }
- KeyColumnNames: ["key"]
-)";
-
-static const TString createTableWithIndexProto = R"(
- TableDescription {
- Name: "Table"
- Columns { Name: "key" Type: "Uint64" }
- Columns { Name: "value" Type: "Uint64" }
- KeyColumnNames: ["key"]
- }
- IndexDescription {
- Name: "SyncIndex"
- KeyColumnNames: ["value"]
- }
-)";
-
Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
template <typename T>
- void CreateStream(const TMaybe<NKikimrSchemeOp::ECdcStreamState>& state = Nothing(), bool vt = false, bool tableWithIndex = false) {
+ void CreateStream(const TMaybe<NKikimrSchemeOp::ECdcStreamState>& state = Nothing(), bool vt = false, bool onIndex = false) {
T t;
t.GetTestEnvOptions().EnableChangefeedInitialScan(true);
@@ -36,10 +16,26 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
{
TInactiveZone inactive(activeZone);
runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true;
- if (tableWithIndex) {
- TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", createTableWithIndexProto);
+ if (!onIndex) {
+ TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
} else {
- TestCreateTable(runtime, ++t.TxId, "/MyRoot", createTableProto);
+ TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", R"(
+ TableDescription {
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "indexed" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ }
+ IndexDescription {
+ Name: "Index"
+ KeyColumnNames: ["indexed"]
+ }
+ )");
}
t.TestEnv->TestWaitNotification(runtime, t.TxId);
}
@@ -58,24 +54,19 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
const bool ok = google::protobuf::TextFormat::PrintToString(streamDesc, &strDesc);
UNIT_ASSERT_C(ok, "protobuf serialization failed");
- TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", Sprintf(R"(
- TableName: "Table"
+ const TString path = !onIndex ? "/MyRoot" : "/MyRoot/Table/Index";
+ const TString tableName = !onIndex ? "Table": "indexImplTable";
+
+ TestCreateCdcStream(runtime, ++t.TxId, path, Sprintf(R"(
+ TableName: "%s"
StreamDescription { %s }
- AllIndexes {}
- )", strDesc.c_str()));
+ )", tableName.c_str(), strDesc.c_str()));
t.TestEnv->TestWaitNotification(runtime, t.TxId);
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ TestDescribeResult(DescribePrivatePath(runtime, path + "/" + tableName + "/Stream"), {
NLs::PathExist,
NLs::StreamVirtualTimestamps(vt),
});
-
- if (tableWithIndex) {
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {
- NLs::PathExist,
- NLs::StreamVirtualTimestamps(vt),
- });
- }
});
}
@@ -83,15 +74,15 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
CreateStream<T>();
}
- Y_UNIT_TEST_WITH_REBOOTS(CreateStreamTableWithIndex) {
- CreateStream<T>(Nothing(), false, true);
+ Y_UNIT_TEST_WITH_REBOOTS(CreateStreamOnIndexTable) {
+ CreateStream<T>({}, false, true);
}
Y_UNIT_TEST_WITH_REBOOTS(CreateStreamExplicitReady) {
CreateStream<T>(NKikimrSchemeOp::ECdcStreamStateReady);
}
- Y_UNIT_TEST_WITH_REBOOTS(CreateStreamExplicitReadyTableWithIndex) {
+ Y_UNIT_TEST_WITH_REBOOTS(CreateStreamOnIndexTableExplicitReady) {
CreateStream<T>(NKikimrSchemeOp::ECdcStreamStateReady, false, true);
}
@@ -99,7 +90,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
CreateStream<T>(NKikimrSchemeOp::ECdcStreamStateScan);
}
- Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithInitialScanTableWithIndex) {
+ Y_UNIT_TEST_WITH_REBOOTS(CreateStreamOnIndexTableWithInitialScan) {
CreateStream<T>(NKikimrSchemeOp::ECdcStreamStateScan, false, true);
}
@@ -107,6 +98,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
CreateStream<T>({}, true);
}
+ Y_UNIT_TEST_WITH_REBOOTS(CreateStreamOnIndexTableWithVirtualTimestamps) {
+ CreateStream<T>({}, true, true);
+ }
+
Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithAwsRegion) {
T t;
t.GetTestEnvOptions().EnableChangefeedDynamoDBStreamsFormat(true);
@@ -293,21 +288,39 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
}
template <typename T>
- void DropStream(const TMaybe<NKikimrSchemeOp::ECdcStreamState>& state = Nothing()) {
+ void DropStream(const TMaybe<NKikimrSchemeOp::ECdcStreamState>& state = Nothing(), bool onIndex = false) {
T t;
t.GetTestEnvOptions().EnableChangefeedInitialScan(true);
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ const TString path = !onIndex ? "/MyRoot" : "/MyRoot/Table/Index";
+ const TString tableName = !onIndex ? "Table": "indexImplTable";
+
{
TInactiveZone inactive(activeZone);
runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true;
- TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
- Name: "Table"
- Columns { Name: "key" Type: "Uint64" }
- Columns { Name: "value" Type: "Uint64" }
- KeyColumnNames: ["key"]
- )");
+ if (!onIndex) {
+ TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ } else {
+ TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", R"(
+ TableDescription {
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "indexed" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ }
+ IndexDescription {
+ Name: "Index"
+ KeyColumnNames: ["indexed"]
+ }
+ )");
+ }
t.TestEnv->TestWaitNotification(runtime, t.TxId);
NKikimrSchemeOp::TCdcStreamDescription streamDesc;
@@ -323,20 +336,20 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
const bool ok = google::protobuf::TextFormat::PrintToString(streamDesc, &strDesc);
UNIT_ASSERT_C(ok, "protobuf serialization failed");
- TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", Sprintf(R"(
- TableName: "Table"
+ TestCreateCdcStream(runtime, ++t.TxId, path, Sprintf(R"(
+ TableName: "%s"
StreamDescription { %s }
- )", strDesc.c_str()));
+ )", tableName.c_str(), strDesc.c_str()));
t.TestEnv->TestWaitNotification(runtime, t.TxId);
}
- TestDropCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
- TableName: "Table"
+ TestDropCdcStream(runtime, ++t.TxId, path, Sprintf(R"(
+ TableName: "%s"
StreamName: "Stream"
- )");
+ )", tableName.c_str()));
t.TestEnv->TestWaitNotification(runtime, t.TxId);
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {NLs::PathNotExist});
+ TestDescribeResult(DescribePrivatePath(runtime, path + "/" + tableName + "/Stream"), {NLs::PathNotExist});
});
}
@@ -344,14 +357,26 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
DropStream<T>();
}
+ Y_UNIT_TEST_WITH_REBOOTS(DropStreamOnIndexTable) {
+ DropStream<T>({}, true);
+ }
+
Y_UNIT_TEST_WITH_REBOOTS(DropStreamExplicitReady) {
DropStream<T>(NKikimrSchemeOp::ECdcStreamStateReady);
}
+ Y_UNIT_TEST_WITH_REBOOTS(DropStreamOnIndexTableExplicitReady) {
+ DropStream<T>(NKikimrSchemeOp::ECdcStreamStateReady, true);
+ }
+
Y_UNIT_TEST_WITH_REBOOTS(DropStreamCreatedWithInitialScan) {
DropStream<T>(NKikimrSchemeOp::ECdcStreamStateScan);
}
+ Y_UNIT_TEST_WITH_REBOOTS(DropStreamOnIndexTableCreatedWithInitialScan) {
+ DropStream<T>(NKikimrSchemeOp::ECdcStreamStateScan, true);
+ }
+
Y_UNIT_TEST_WITH_REBOOTS(CreateDropRecreate) {
T t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {