diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-03-24 14:29:25 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-03-24 14:29:25 +0300 |
commit | 5d3a16dbb82b337e09db43c42aa12445e1f678c4 (patch) | |
tree | 0257ea3c73e345b50ecab7a008f9ebeb03d923ac | |
parent | 6b09dd4323839241524b076042c4ec1db82e329b (diff) | |
download | ydb-5d3a16dbb82b337e09db43c42aa12445e1f678c4.tar.gz |
Restrict schema operations for replicated tables
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp | 23 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_cdc_stream.cpp | 61 |
2 files changed, 71 insertions, 13 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp index e24350ae30..b1faeb20b1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp @@ -527,17 +527,19 @@ public: return result; } - if (path.Base()->GetAliveChildren() && alter.HasTTLSettings()) { + bool isReplicated = false; + if (path.Base()->GetAliveChildren()) { for (const auto& [_, childPathId] : path.Base()->GetChildren()) { Y_VERIFY(context.SS->PathsById.contains(childPathId)); auto childPath = context.SS->PathsById.at(childPathId); - if (!childPath->IsTableIndex() || childPath->Dropped()) { + if (!childPath->IsCdcStream() || childPath->Dropped()) { continue; } - Y_VERIFY(context.SS->Indexes.contains(childPathId)); - auto indexInfo = context.SS->Indexes.at(childPathId); + if (isReplicated = childPath->AsyncReplication.IsDefined()) { + break; + } } } @@ -555,6 +557,19 @@ public: return result; } + if (isReplicated) { + for (const auto& [id, column] : alterData->Columns) { + if (column.CreateVersion == alterData->AlterVersion) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, "Cannot add columns to replicated table"); + return result; + } + if (column.DeleteVersion == alterData->AlterVersion) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, "Cannot drop columns of replicated table"); + return result; + } + } + } + TBindingsRoomsChanges bindingChanges; if (context.SS->IsStorageConfigLogic(table)) { diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp index 6e92c829e3..95b5c025c6 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp @@ -156,11 +156,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )"); env.TestWaitNotification(runtime, txId); - // user attr TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( TableName: "Table" StreamDescription { - Name: "Stream1" + Name: "Stream" Mode: ECdcStreamModeKeysOnly Format: ECdcStreamFormatProto UserAttributes { Key: "key" Value: "value" } @@ -182,17 +181,30 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } }); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream1"), { + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { NLs::UserAttrsHas({ {"key", "value"}, }) }); + } + + Y_UNIT_TEST(ReplicationAttribute) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); - // async replication attr TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( TableName: "Table" StreamDescription { - Name: "Stream2" + Name: "Stream" Mode: ECdcStreamModeKeysOnly Format: ECdcStreamFormatProto UserAttributes { Key: "__async_replication" Value: "value" } @@ -207,7 +219,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"( TableName: "Table" StreamDescription { - Name: "Stream2" + Name: "Stream" Mode: ECdcStreamModeKeysOnly Format: ECdcStreamFormatProto UserAttributes { Key: "__async_replication" Value: "%s" } @@ -215,11 +227,42 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )", EscapeC(jsonString).c_str())); env.TestWaitNotification(runtime, txId); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream2"), { + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { NLs::UserAttrsHas({ {"__async_replication", jsonString}, }) }); + + // now it is forbidden to change the scheme + TestAlterTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "extra" Type: "Uint64" } + )", {NKikimrScheme::StatusPreconditionFailed}); + + TestAlterTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + DropColumns { Name: "value" } + )", {NKikimrScheme::StatusPreconditionFailed}); + + // drop stream + TestDropCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamName: "Stream" + )"); + env.TestWaitNotification(runtime, txId); + + // now it is allowed to change the scheme + TestAlterTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "extra" Type: "Uint64" } + )"); + env.TestWaitNotification(runtime, txId); + + TestAlterTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + DropColumns { Name: "value" } + )"); + env.TestWaitNotification(runtime, txId); } Y_UNIT_TEST(Negative) { @@ -1134,7 +1177,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) { // the table is locked now TestAlterTable(runtime, ++txId, "/MyRoot", R"( Name: "Table" - Columns { Name: "extra" Type: "Uint64"} + Columns { Name: "extra" Type: "Uint64" } )", {NKikimrScheme::StatusMultipleModifications}); TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( @@ -1156,7 +1199,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) { // the table is no longer locked TestAlterTable(runtime, ++txId, "/MyRoot", R"( Name: "Table" - Columns { Name: "extra" Type: "Uint64"} + Columns { Name: "extra" Type: "Uint64" } )"); env.TestWaitNotification(runtime, txId); |