diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-04-18 12:48:11 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-04-18 12:48:11 +0300 |
commit | 9d3e934b2ffe7d708002cd135a742e564199c52b (patch) | |
tree | 2eb78c25557d72a408cce08ff23e14a0c7ecb3e0 | |
parent | 4dcb651d85a03d0f29eb5d15305efbb0f736a72b (diff) | |
download | ydb-9d3e934b2ffe7d708002cd135a742e564199c52b.tar.gz |
ReplicationConfig for replicated tables
10 files changed, 164 insertions, 10 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index e4c1743c1b3..87455da8b22 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -325,6 +325,15 @@ message TTTLSettings { optional string UseTiering = 3; } +message TTableReplicationConfig { + enum EReplicationMode { + REPLICATION_MODE_NONE = 0; + REPLICATION_MODE_READ_ONLY = 1; + } + + optional EReplicationMode Mode = 1; +} + message TTableDescription { optional string Name = 1; optional uint64 Id_Deprecated = 2; // LocalPathId, deprecated @@ -360,6 +369,8 @@ message TTableDescription { repeated TCdcStreamDescription CdcStreams = 38; repeated TSequenceDescription Sequences = 39; + + optional TTableReplicationConfig ReplicationConfig = 40; } message TCompressionOptions { diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 0f2a1dfd7ae..04abd5df592 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -302,7 +302,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { return true; } - typedef std::tuple<TPathId, ui32, ui64, TString, TString, TString, ui64, TString, bool> TTableRec; + typedef std::tuple<TPathId, ui32, ui64, TString, TString, TString, ui64, TString, bool, TString> TTableRec; typedef TDeque<TTableRec> TTableRows; template <typename SchemaTable, typename TRowSet> @@ -315,7 +315,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { rowSet.template GetValueOrDefault<typename SchemaTable::AlterTable>(), rowSet.template GetValueOrDefault<typename SchemaTable::PartitioningVersion>(0), rowSet.template GetValueOrDefault<typename SchemaTable::TTLSettings>(), - rowSet.template GetValueOrDefault<typename SchemaTable::IsBackup>(false) + rowSet.template GetValueOrDefault<typename SchemaTable::IsBackup>(false), + rowSet.template GetValueOrDefault<typename SchemaTable::ReplicationConfig>() ); } @@ -1760,12 +1761,16 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { tableInfo->PartitioningVersion = std::get<6>(rec); - TString ttlSettings = std::get<7>(rec); - if (ttlSettings) { + if (const auto ttlSettings = std::get<7>(rec)) { bool parseOk = ParseFromStringNoSizeLimit(tableInfo->MutableTTLSettings(), ttlSettings); Y_VERIFY(parseOk); } + if (const auto replicationConfig = std::get<9>(rec)) { + bool parseOk = ParseFromStringNoSizeLimit(tableInfo->MutableReplicationConfig(), replicationConfig); + Y_VERIFY(parseOk); + } + tableInfo->IsBackup = std::get<8>(rec); Self->Tables[pathId] = tableInfo; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp index 2d7505d56e7..e324f138df6 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp @@ -446,6 +446,9 @@ public: schema.ClearTTLSettings(); } + // replication config is not copied + schema.ClearReplicationConfig(); + NKikimrSchemeOp::TPartitionConfig compilationPartitionConfig; if (!TPartitionConfigMerger::ApplyChanges(compilationPartitionConfig, srcTableInfo->PartitionConfig(), schema.GetPartitionConfig(), AppData(), errStr) || !TPartitionConfigMerger::VerifyCreateParams(compilationPartitionConfig, AppData(), IsShadowDataAllowed(), errStr)) { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 5d6d365ee55..5ef47bb957b 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -2361,6 +2361,11 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId Y_PROTOBUF_SUPPRESS_NODISCARD tableInfo->TTLSettings().SerializeToString(&ttlSettings); } + TString replicationConfig; + if (tableInfo->HasReplicationConfig()) { + Y_PROTOBUF_SUPPRESS_NODISCARD tableInfo->ReplicationConfig().SerializeToString(&replicationConfig); + } + if (pathId.OwnerId == TabletID()) { db.Table<Schema::Tables>().Key(pathId.LocalPathId).Update( NIceDb::TUpdate<Schema::Tables::NextColId>(tableInfo->NextColumnId), @@ -2369,7 +2374,8 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId NIceDb::TUpdate<Schema::Tables::AlterTable>(TString()), NIceDb::TUpdate<Schema::Tables::AlterTableFull>(TString()), NIceDb::TUpdate<Schema::Tables::TTLSettings>(ttlSettings), - NIceDb::TUpdate<Schema::Tables::IsBackup>(tableInfo->IsBackup)); + NIceDb::TUpdate<Schema::Tables::IsBackup>(tableInfo->IsBackup), + NIceDb::TUpdate<Schema::Tables::ReplicationConfig>(replicationConfig)); } else { db.Table<Schema::MigratedTables>().Key(pathId.OwnerId, pathId.LocalPathId).Update( NIceDb::TUpdate<Schema::MigratedTables::NextColId>(tableInfo->NextColumnId), @@ -2378,7 +2384,8 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId NIceDb::TUpdate<Schema::MigratedTables::AlterTable>(TString()), NIceDb::TUpdate<Schema::MigratedTables::AlterTableFull>(TString()), NIceDb::TUpdate<Schema::MigratedTables::TTLSettings>(ttlSettings), - NIceDb::TUpdate<Schema::MigratedTables::IsBackup>(tableInfo->IsBackup)); + NIceDb::TUpdate<Schema::MigratedTables::IsBackup>(tableInfo->IsBackup), + NIceDb::TUpdate<Schema::MigratedTables::ReplicationConfig>(replicationConfig)); } for (auto col : tableInfo->Columns) { diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 00f45139c61..8fa7b71d1be 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -228,6 +228,26 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData( alterData->TableDescriptionFull->MutableTTLSettings()->CopyFrom(ttl); } + if (op.HasReplicationConfig()) { + const auto& cfg = op.GetReplicationConfig(); + + if (source) { + errStr = "Cannot alter replication config"; + return nullptr; + } + + switch (cfg.GetMode()) { + case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE: + case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY: + break; + default: + errStr = "Unknown replication mode"; + return nullptr; + } + + alterData->TableDescriptionFull->MutableReplicationConfig()->CopyFrom(cfg); + } + alterData->IsBackup = op.GetIsBackup(); if (source && op.KeyColumnNamesSize() == 0) @@ -1258,6 +1278,11 @@ void TTableInfo::FinishAlter() { MutableTTLSettings().Swap(AlterData->TableDescriptionFull->MutableTTLSettings()); } + // Apply replication config + if (AlterData->TableDescriptionFull.Defined() && AlterData->TableDescriptionFull->HasReplicationConfig()) { + MutableReplicationConfig().Swap(AlterData->TableDescriptionFull->MutableReplicationConfig()); + } + // Force FillDescription to regenerate TableDescription ResetDescriptionCache(); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index b1fc4f9a377..ad5ae6a118a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -415,6 +415,10 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> { const NKikimrSchemeOp::TPartitionConfig& PartitionConfig() const { return TableDescription.GetPartitionConfig(); } NKikimrSchemeOp::TPartitionConfig& MutablePartitionConfig() { return *TableDescription.MutablePartitionConfig(); } + bool HasReplicationConfig() { return TableDescription.HasReplicationConfig(); } + const NKikimrSchemeOp::TTableReplicationConfig& ReplicationConfig() { return TableDescription.GetReplicationConfig(); } + NKikimrSchemeOp::TTableReplicationConfig& MutableReplicationConfig() { return *TableDescription.MutableReplicationConfig(); } + bool HasTTLSettings() const { return TableDescription.HasTTLSettings(); } const NKikimrSchemeOp::TTTLSettings& TTLSettings() const { return TableDescription.GetTTLSettings(); } bool IsTTLEnabled() const { return HasTTLSettings() && TTLSettings().HasEnabled(); } diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 31c7c5246e3..124092d7061 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -1091,6 +1091,10 @@ void TSchemeShard::DescribeTable(const TTableInfo::TPtr tableInfo, const NScheme entry->MutableTTLSettings()->CopyFrom(tableInfo->TTLSettings()); } + if (tableInfo->HasReplicationConfig()) { + entry->MutableReplicationConfig()->CopyFrom(tableInfo->ReplicationConfig()); + } + entry->SetIsBackup(tableInfo->IsBackup); } diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 9736e6ff837..723ecda8b77 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -117,6 +117,7 @@ struct Schema : NIceDb::Schema { struct PartitioningVersion : Column<7, NScheme::NTypeIds::Uint64> {}; struct TTLSettings : Column<8, NScheme::NTypeIds::String> {}; struct IsBackup : Column<9, NScheme::NTypeIds::Bool> {}; + struct ReplicationConfig : Column<10, NScheme::NTypeIds::String> {}; using TKey = TableKey<TabId>; using TColumns = TableColumns< @@ -128,7 +129,8 @@ struct Schema : NIceDb::Schema { AlterTableFull, PartitioningVersion, TTLSettings, - IsBackup + IsBackup, + ReplicationConfig >; }; @@ -144,6 +146,7 @@ struct Schema : NIceDb::Schema { struct PartitioningVersion : Column<8, NScheme::NTypeIds::Uint64> {}; struct TTLSettings : Column<9, NScheme::NTypeIds::String> {}; struct IsBackup : Column<10, NScheme::NTypeIds::Bool> {}; + struct ReplicationConfig : Column<11, NScheme::NTypeIds::String> {}; using TKey = TableKey<OwnerPathId, LocalPathId>; using TColumns = TableColumns< @@ -156,7 +159,8 @@ struct Schema : NIceDb::Schema { AlterTableFull, PartitioningVersion, TTLSettings, - IsBackup + IsBackup, + ReplicationConfig >; }; diff --git a/ydb/core/tx/schemeshard/ut_replication.cpp b/ydb/core/tx/schemeshard/ut_replication.cpp index cb581c9c1db..8107c059cdb 100644 --- a/ydb/core/tx/schemeshard/ut_replication.cpp +++ b/ydb/core/tx/schemeshard/ut_replication.cpp @@ -136,4 +136,83 @@ Y_UNIT_TEST_SUITE(TReplicationTests) { } } + void CreateReplicatedTable(NKikimrSchemeOp::TTableReplicationConfig::EReplicationMode mode) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", Sprintf(R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + ReplicationConfig { + Mode: %s + } + )", NKikimrSchemeOp::TTableReplicationConfig::EReplicationMode_Name(mode).c_str())); + env.TestWaitNotification(runtime, txId); + + const auto desc = DescribePath(runtime, "/MyRoot/Table"); + const auto& table = desc.GetPathDescription().GetTable(); + UNIT_ASSERT(table.HasReplicationConfig()); + UNIT_ASSERT_EQUAL(table.GetReplicationConfig().GetMode(), mode); + } + + Y_UNIT_TEST(CreateReplicatedTable) { + CreateReplicatedTable(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE); + CreateReplicatedTable(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY); + } + + Y_UNIT_TEST(AlterReplicationConfig) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + ReplicationConfig { + Mode: REPLICATION_MODE_READ_ONLY + } + )"); + env.TestWaitNotification(runtime, txId); + + TestAlterTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + DropColumns { Name: "value" } + ReplicationConfig { + Mode: REPLICATION_MODE_NONE + } + )", {NKikimrScheme::StatusInvalidParameter}); + } + + Y_UNIT_TEST(CopyReplicatedTable) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + ReplicationConfig { + Mode: REPLICATION_MODE_READ_ONLY + } + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "CopyTable" + CopyFromTable: "/MyRoot/Table" + )"); + env.TestWaitNotification(runtime, txId); + + const auto desc = DescribePath(runtime, "/MyRoot/CopyTable"); + const auto& table = desc.GetPathDescription().GetTable(); + UNIT_ASSERT(!table.HasReplicationConfig()); + } + } // TReplicationTests diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 8a93d30cc32..c20f1d8ce52 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -341,6 +341,11 @@ "ColumnId": 9, "ColumnName": "IsBackup", "ColumnType": "Bool" + }, + { + "ColumnId": 10, + "ColumnName": "ReplicationConfig", + "ColumnType": "String" } ], "ColumnsDropped": [], @@ -355,7 +360,8 @@ 6, 7, 8, - 9 + 9, + 10 ], "RoomID": 0, "Codec": 0, @@ -3450,6 +3456,11 @@ "ColumnId": 10, "ColumnName": "IsBackup", "ColumnType": "Bool" + }, + { + "ColumnId": 11, + "ColumnName": "ReplicationConfig", + "ColumnType": "String" } ], "ColumnsDropped": [], @@ -3465,7 +3476,8 @@ 7, 8, 9, - 10 + 10, + 11 ], "RoomID": 0, "Codec": 0, |