aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-07-18 22:27:04 +0300
committerGitHub <noreply@github.com>2024-07-18 19:27:04 +0000
commitcabd6c51842839f2d56dc5eb344fa8b46d969366 (patch)
treebc3c5405cd7c50de37f691bf036cc091e40cb1cf
parent96c9c9b02f126e535f516946f8d46a96c922109d (diff)
downloadydb-cabd6c51842839f2d56dc5eb344fa8b46d969366.tar.gz
Allow streams on sync-index table (#6842)
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path.cpp13
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path.h4
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp32
4 files changed, 43 insertions, 14 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 8ec0165f8b..b759660e81 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -800,8 +800,12 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat
.NotUnderDeleting()
.NotUnderOperation();
- if (checks && !tablePath.IsInsideTableIndexPath()) {
- checks.IsCommonSensePath();
+ if (checks) {
+ if (!tablePath.IsInsideTableIndexPath()) {
+ checks.IsCommonSensePath();
+ } else if (!tablePath.Parent().IsTableIndex(NKikimrSchemeOp::EIndexTypeGlobal)) {
+ return CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, "Cannot add changefeed to index table");
+ }
}
if (!checks) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_path.cpp b/ydb/core/tx/schemeshard/schemeshard_path.cpp
index 4d5d534b27..0704fbcb5c 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path.cpp
@@ -1599,10 +1599,19 @@ bool TPath::IsInsideCdcStreamPath() const {
return true;
}
-bool TPath::IsTableIndex() const {
+bool TPath::IsTableIndex(const TMaybe<NKikimrSchemeOp::EIndexType>& type) const {
Y_ABORT_UNLESS(IsResolved());
- return Base()->IsTableIndex();
+ if (!Base()->IsTableIndex()) {
+ return false;
+ }
+
+ if (!type.Defined()) {
+ return true;
+ }
+
+ Y_ABORT_UNLESS(SS->Indexes.contains(Base()->PathId));
+ return SS->Indexes.at(Base()->PathId)->Type == *type;
}
bool TPath::IsBackupTable() const {
diff --git a/ydb/core/tx/schemeshard/schemeshard_path.h b/ydb/core/tx/schemeshard/schemeshard_path.h
index a1474fef71..4e6aec639d 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path.h
+++ b/ydb/core/tx/schemeshard/schemeshard_path.h
@@ -5,6 +5,8 @@
#include <ydb/core/protos/flat_tx_scheme.pb.h>
+#include <util/generic/maybe.h>
+
namespace NKikimr::NSchemeShard {
class TSchemeShard;
@@ -159,7 +161,7 @@ public:
bool AtLocalSchemeShardPath() const;
bool IsInsideTableIndexPath() const;
bool IsInsideCdcStreamPath() const;
- bool IsTableIndex() const;
+ bool IsTableIndex(const TMaybe<NKikimrSchemeOp::EIndexType>& type = {}) const;
bool IsBackupTable() const;
bool IsAsyncReplicaTable() const;
bool IsCdcStream() const;
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
index a2aea05c91..256ef3cdcf 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
@@ -1213,9 +1213,14 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
KeyColumnNames: ["key"]
}
IndexDescription {
- Name: "Index"
+ Name: "SyncIndex"
KeyColumnNames: ["indexed"]
}
+ IndexDescription {
+ Name: "AsyncIndex"
+ KeyColumnNames: ["indexed"]
+ Type: EIndexTypeGlobalAsync
+ }
)");
env.TestWaitNotification(runtime, txId);
@@ -1228,7 +1233,16 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
}
)", {NKikimrScheme::StatusPathDoesNotExist});
- TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/AsyncIndex", R"(
+ TableName: "indexImplTable"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )", {NKikimrScheme::StatusPreconditionFailed});
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/SyncIndex", R"(
TableName: "indexImplTable"
StreamDescription {
Name: "Stream"
@@ -1238,10 +1252,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
)");
env.TestWaitNotification(runtime, txId);
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), {
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {
NLs::PathExist,
});
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), {
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), {
NLs::PathExist,
});
@@ -1251,14 +1265,14 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
Disable {}
)", {NKikimrScheme::StatusPathDoesNotExist});
- TestAlterCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
+ TestAlterCdcStream(runtime, ++txId, "/MyRoot/Table/SyncIndex", R"(
TableName: "indexImplTable"
StreamName: "Stream"
Disable {}
)");
env.TestWaitNotification(runtime, txId);
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), {
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {
NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateDisabled),
});
@@ -1267,16 +1281,16 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamName: "Stream"
)", {NKikimrScheme::StatusPathDoesNotExist});
- TestDropCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
+ TestDropCdcStream(runtime, ++txId, "/MyRoot/Table/SyncIndex", R"(
TableName: "indexImplTable"
StreamName: "Stream"
)");
env.TestWaitNotification(runtime, txId);
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), {
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {
NLs::PathNotExist,
});
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), {
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), {
NLs::PathNotExist,
});
}