aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-03-24 14:29:25 +0300
committerilnaz <ilnaz@ydb.tech>2023-03-24 14:29:25 +0300
commit5d3a16dbb82b337e09db43c42aa12445e1f678c4 (patch)
tree0257ea3c73e345b50ecab7a008f9ebeb03d923ac
parent6b09dd4323839241524b076042c4ec1db82e329b (diff)
downloadydb-5d3a16dbb82b337e09db43c42aa12445e1f678c4.tar.gz
Restrict schema operations for replicated tables
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp23
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream.cpp61
2 files changed, 71 insertions, 13 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp
index e24350ae30..b1faeb20b1 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp
@@ -527,17 +527,19 @@ public:
return result;
}
- if (path.Base()->GetAliveChildren() && alter.HasTTLSettings()) {
+ bool isReplicated = false;
+ if (path.Base()->GetAliveChildren()) {
for (const auto& [_, childPathId] : path.Base()->GetChildren()) {
Y_VERIFY(context.SS->PathsById.contains(childPathId));
auto childPath = context.SS->PathsById.at(childPathId);
- if (!childPath->IsTableIndex() || childPath->Dropped()) {
+ if (!childPath->IsCdcStream() || childPath->Dropped()) {
continue;
}
- Y_VERIFY(context.SS->Indexes.contains(childPathId));
- auto indexInfo = context.SS->Indexes.at(childPathId);
+ if (isReplicated = childPath->AsyncReplication.IsDefined()) {
+ break;
+ }
}
}
@@ -555,6 +557,19 @@ public:
return result;
}
+ if (isReplicated) {
+ for (const auto& [id, column] : alterData->Columns) {
+ if (column.CreateVersion == alterData->AlterVersion) {
+ result->SetError(NKikimrScheme::StatusPreconditionFailed, "Cannot add columns to replicated table");
+ return result;
+ }
+ if (column.DeleteVersion == alterData->AlterVersion) {
+ result->SetError(NKikimrScheme::StatusPreconditionFailed, "Cannot drop columns of replicated table");
+ return result;
+ }
+ }
+ }
+
TBindingsRoomsChanges bindingChanges;
if (context.SS->IsStorageConfigLogic(table)) {
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
index 6e92c829e3..95b5c025c6 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
@@ -156,11 +156,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
)");
env.TestWaitNotification(runtime, txId);
- // user attr
TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
TableName: "Table"
StreamDescription {
- Name: "Stream1"
+ Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
UserAttributes { Key: "key" Value: "value" }
@@ -182,17 +181,30 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
}
});
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream1"), {
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
NLs::UserAttrsHas({
{"key", "value"},
})
});
+ }
+
+ Y_UNIT_TEST(ReplicationAttribute) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
- // async replication attr
TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
TableName: "Table"
StreamDescription {
- Name: "Stream2"
+ Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
UserAttributes { Key: "__async_replication" Value: "value" }
@@ -207,7 +219,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"(
TableName: "Table"
StreamDescription {
- Name: "Stream2"
+ Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
UserAttributes { Key: "__async_replication" Value: "%s" }
@@ -215,11 +227,42 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
)", EscapeC(jsonString).c_str()));
env.TestWaitNotification(runtime, txId);
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream2"), {
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
NLs::UserAttrsHas({
{"__async_replication", jsonString},
})
});
+
+ // now it is forbidden to change the scheme
+ TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "extra" Type: "Uint64" }
+ )", {NKikimrScheme::StatusPreconditionFailed});
+
+ TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ DropColumns { Name: "value" }
+ )", {NKikimrScheme::StatusPreconditionFailed});
+
+ // drop stream
+ TestDropCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamName: "Stream"
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ // now it is allowed to change the scheme
+ TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "extra" Type: "Uint64" }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ DropColumns { Name: "value" }
+ )");
+ env.TestWaitNotification(runtime, txId);
}
Y_UNIT_TEST(Negative) {
@@ -1134,7 +1177,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) {
// the table is locked now
TestAlterTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
- Columns { Name: "extra" Type: "Uint64"}
+ Columns { Name: "extra" Type: "Uint64" }
)", {NKikimrScheme::StatusMultipleModifications});
TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
@@ -1156,7 +1199,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) {
// the table is no longer locked
TestAlterTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
- Columns { Name: "extra" Type: "Uint64"}
+ Columns { Name: "extra" Type: "Uint64" }
)");
env.TestWaitNotification(runtime, txId);