aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-04-18 12:48:11 +0300
committerilnaz <ilnaz@ydb.tech>2023-04-18 12:48:11 +0300
commit9d3e934b2ffe7d708002cd135a742e564199c52b (patch)
tree2eb78c25557d72a408cce08ff23e14a0c7ecb3e0
parent4dcb651d85a03d0f29eb5d15305efbb0f736a72b (diff)
downloadydb-9d3e934b2ffe7d708002cd135a742e564199c52b.tar.gz
ReplicationConfig for replicated tables
-rw-r--r--ydb/core/protos/flat_scheme_op.proto11
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp13
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp11
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.cpp25
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_schema.h8
-rw-r--r--ydb/core/tx/schemeshard/ut_replication.cpp79
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema16
10 files changed, 164 insertions, 10 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index e4c1743c1b3..87455da8b22 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -325,6 +325,15 @@ message TTTLSettings {
optional string UseTiering = 3;
}
+message TTableReplicationConfig {
+ enum EReplicationMode {
+ REPLICATION_MODE_NONE = 0;
+ REPLICATION_MODE_READ_ONLY = 1;
+ }
+
+ optional EReplicationMode Mode = 1;
+}
+
message TTableDescription {
optional string Name = 1;
optional uint64 Id_Deprecated = 2; // LocalPathId, deprecated
@@ -360,6 +369,8 @@ message TTableDescription {
repeated TCdcStreamDescription CdcStreams = 38;
repeated TSequenceDescription Sequences = 39;
+
+ optional TTableReplicationConfig ReplicationConfig = 40;
}
message TCompressionOptions {
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index 0f2a1dfd7ae..04abd5df592 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -302,7 +302,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
return true;
}
- typedef std::tuple<TPathId, ui32, ui64, TString, TString, TString, ui64, TString, bool> TTableRec;
+ typedef std::tuple<TPathId, ui32, ui64, TString, TString, TString, ui64, TString, bool, TString> TTableRec;
typedef TDeque<TTableRec> TTableRows;
template <typename SchemaTable, typename TRowSet>
@@ -315,7 +315,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
rowSet.template GetValueOrDefault<typename SchemaTable::AlterTable>(),
rowSet.template GetValueOrDefault<typename SchemaTable::PartitioningVersion>(0),
rowSet.template GetValueOrDefault<typename SchemaTable::TTLSettings>(),
- rowSet.template GetValueOrDefault<typename SchemaTable::IsBackup>(false)
+ rowSet.template GetValueOrDefault<typename SchemaTable::IsBackup>(false),
+ rowSet.template GetValueOrDefault<typename SchemaTable::ReplicationConfig>()
);
}
@@ -1760,12 +1761,16 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
tableInfo->PartitioningVersion = std::get<6>(rec);
- TString ttlSettings = std::get<7>(rec);
- if (ttlSettings) {
+ if (const auto ttlSettings = std::get<7>(rec)) {
bool parseOk = ParseFromStringNoSizeLimit(tableInfo->MutableTTLSettings(), ttlSettings);
Y_VERIFY(parseOk);
}
+ if (const auto replicationConfig = std::get<9>(rec)) {
+ bool parseOk = ParseFromStringNoSizeLimit(tableInfo->MutableReplicationConfig(), replicationConfig);
+ Y_VERIFY(parseOk);
+ }
+
tableInfo->IsBackup = std::get<8>(rec);
Self->Tables[pathId] = tableInfo;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp
index 2d7505d56e7..e324f138df6 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp
@@ -446,6 +446,9 @@ public:
schema.ClearTTLSettings();
}
+ // replication config is not copied
+ schema.ClearReplicationConfig();
+
NKikimrSchemeOp::TPartitionConfig compilationPartitionConfig;
if (!TPartitionConfigMerger::ApplyChanges(compilationPartitionConfig, srcTableInfo->PartitionConfig(), schema.GetPartitionConfig(), AppData(), errStr)
|| !TPartitionConfigMerger::VerifyCreateParams(compilationPartitionConfig, AppData(), IsShadowDataAllowed(), errStr)) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 5d6d365ee55..5ef47bb957b 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -2361,6 +2361,11 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId
Y_PROTOBUF_SUPPRESS_NODISCARD tableInfo->TTLSettings().SerializeToString(&ttlSettings);
}
+ TString replicationConfig;
+ if (tableInfo->HasReplicationConfig()) {
+ Y_PROTOBUF_SUPPRESS_NODISCARD tableInfo->ReplicationConfig().SerializeToString(&replicationConfig);
+ }
+
if (pathId.OwnerId == TabletID()) {
db.Table<Schema::Tables>().Key(pathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::Tables::NextColId>(tableInfo->NextColumnId),
@@ -2369,7 +2374,8 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId
NIceDb::TUpdate<Schema::Tables::AlterTable>(TString()),
NIceDb::TUpdate<Schema::Tables::AlterTableFull>(TString()),
NIceDb::TUpdate<Schema::Tables::TTLSettings>(ttlSettings),
- NIceDb::TUpdate<Schema::Tables::IsBackup>(tableInfo->IsBackup));
+ NIceDb::TUpdate<Schema::Tables::IsBackup>(tableInfo->IsBackup),
+ NIceDb::TUpdate<Schema::Tables::ReplicationConfig>(replicationConfig));
} else {
db.Table<Schema::MigratedTables>().Key(pathId.OwnerId, pathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::MigratedTables::NextColId>(tableInfo->NextColumnId),
@@ -2378,7 +2384,8 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId
NIceDb::TUpdate<Schema::MigratedTables::AlterTable>(TString()),
NIceDb::TUpdate<Schema::MigratedTables::AlterTableFull>(TString()),
NIceDb::TUpdate<Schema::MigratedTables::TTLSettings>(ttlSettings),
- NIceDb::TUpdate<Schema::MigratedTables::IsBackup>(tableInfo->IsBackup));
+ NIceDb::TUpdate<Schema::MigratedTables::IsBackup>(tableInfo->IsBackup),
+ NIceDb::TUpdate<Schema::MigratedTables::ReplicationConfig>(replicationConfig));
}
for (auto col : tableInfo->Columns) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
index 00f45139c61..8fa7b71d1be 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
@@ -228,6 +228,26 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData(
alterData->TableDescriptionFull->MutableTTLSettings()->CopyFrom(ttl);
}
+ if (op.HasReplicationConfig()) {
+ const auto& cfg = op.GetReplicationConfig();
+
+ if (source) {
+ errStr = "Cannot alter replication config";
+ return nullptr;
+ }
+
+ switch (cfg.GetMode()) {
+ case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE:
+ case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY:
+ break;
+ default:
+ errStr = "Unknown replication mode";
+ return nullptr;
+ }
+
+ alterData->TableDescriptionFull->MutableReplicationConfig()->CopyFrom(cfg);
+ }
+
alterData->IsBackup = op.GetIsBackup();
if (source && op.KeyColumnNamesSize() == 0)
@@ -1258,6 +1278,11 @@ void TTableInfo::FinishAlter() {
MutableTTLSettings().Swap(AlterData->TableDescriptionFull->MutableTTLSettings());
}
+ // Apply replication config
+ if (AlterData->TableDescriptionFull.Defined() && AlterData->TableDescriptionFull->HasReplicationConfig()) {
+ MutableReplicationConfig().Swap(AlterData->TableDescriptionFull->MutableReplicationConfig());
+ }
+
// Force FillDescription to regenerate TableDescription
ResetDescriptionCache();
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index b1fc4f9a377..ad5ae6a118a 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -415,6 +415,10 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> {
const NKikimrSchemeOp::TPartitionConfig& PartitionConfig() const { return TableDescription.GetPartitionConfig(); }
NKikimrSchemeOp::TPartitionConfig& MutablePartitionConfig() { return *TableDescription.MutablePartitionConfig(); }
+ bool HasReplicationConfig() { return TableDescription.HasReplicationConfig(); }
+ const NKikimrSchemeOp::TTableReplicationConfig& ReplicationConfig() { return TableDescription.GetReplicationConfig(); }
+ NKikimrSchemeOp::TTableReplicationConfig& MutableReplicationConfig() { return *TableDescription.MutableReplicationConfig(); }
+
bool HasTTLSettings() const { return TableDescription.HasTTLSettings(); }
const NKikimrSchemeOp::TTTLSettings& TTLSettings() const { return TableDescription.GetTTLSettings(); }
bool IsTTLEnabled() const { return HasTTLSettings() && TTLSettings().HasEnabled(); }
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index 31c7c5246e3..124092d7061 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -1091,6 +1091,10 @@ void TSchemeShard::DescribeTable(const TTableInfo::TPtr tableInfo, const NScheme
entry->MutableTTLSettings()->CopyFrom(tableInfo->TTLSettings());
}
+ if (tableInfo->HasReplicationConfig()) {
+ entry->MutableReplicationConfig()->CopyFrom(tableInfo->ReplicationConfig());
+ }
+
entry->SetIsBackup(tableInfo->IsBackup);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index 9736e6ff837..723ecda8b77 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -117,6 +117,7 @@ struct Schema : NIceDb::Schema {
struct PartitioningVersion : Column<7, NScheme::NTypeIds::Uint64> {};
struct TTLSettings : Column<8, NScheme::NTypeIds::String> {};
struct IsBackup : Column<9, NScheme::NTypeIds::Bool> {};
+ struct ReplicationConfig : Column<10, NScheme::NTypeIds::String> {};
using TKey = TableKey<TabId>;
using TColumns = TableColumns<
@@ -128,7 +129,8 @@ struct Schema : NIceDb::Schema {
AlterTableFull,
PartitioningVersion,
TTLSettings,
- IsBackup
+ IsBackup,
+ ReplicationConfig
>;
};
@@ -144,6 +146,7 @@ struct Schema : NIceDb::Schema {
struct PartitioningVersion : Column<8, NScheme::NTypeIds::Uint64> {};
struct TTLSettings : Column<9, NScheme::NTypeIds::String> {};
struct IsBackup : Column<10, NScheme::NTypeIds::Bool> {};
+ struct ReplicationConfig : Column<11, NScheme::NTypeIds::String> {};
using TKey = TableKey<OwnerPathId, LocalPathId>;
using TColumns = TableColumns<
@@ -156,7 +159,8 @@ struct Schema : NIceDb::Schema {
AlterTableFull,
PartitioningVersion,
TTLSettings,
- IsBackup
+ IsBackup,
+ ReplicationConfig
>;
};
diff --git a/ydb/core/tx/schemeshard/ut_replication.cpp b/ydb/core/tx/schemeshard/ut_replication.cpp
index cb581c9c1db..8107c059cdb 100644
--- a/ydb/core/tx/schemeshard/ut_replication.cpp
+++ b/ydb/core/tx/schemeshard/ut_replication.cpp
@@ -136,4 +136,83 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
}
}
+ void CreateReplicatedTable(NKikimrSchemeOp::TTableReplicationConfig::EReplicationMode mode) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ ReplicationConfig {
+ Mode: %s
+ }
+ )", NKikimrSchemeOp::TTableReplicationConfig::EReplicationMode_Name(mode).c_str()));
+ env.TestWaitNotification(runtime, txId);
+
+ const auto desc = DescribePath(runtime, "/MyRoot/Table");
+ const auto& table = desc.GetPathDescription().GetTable();
+ UNIT_ASSERT(table.HasReplicationConfig());
+ UNIT_ASSERT_EQUAL(table.GetReplicationConfig().GetMode(), mode);
+ }
+
+ Y_UNIT_TEST(CreateReplicatedTable) {
+ CreateReplicatedTable(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE);
+ CreateReplicatedTable(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
+ }
+
+ Y_UNIT_TEST(AlterReplicationConfig) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ ReplicationConfig {
+ Mode: REPLICATION_MODE_READ_ONLY
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ DropColumns { Name: "value" }
+ ReplicationConfig {
+ Mode: REPLICATION_MODE_NONE
+ }
+ )", {NKikimrScheme::StatusInvalidParameter});
+ }
+
+ Y_UNIT_TEST(CopyReplicatedTable) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ ReplicationConfig {
+ Mode: REPLICATION_MODE_READ_ONLY
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "CopyTable"
+ CopyFromTable: "/MyRoot/Table"
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ const auto desc = DescribePath(runtime, "/MyRoot/CopyTable");
+ const auto& table = desc.GetPathDescription().GetTable();
+ UNIT_ASSERT(!table.HasReplicationConfig());
+ }
+
} // TReplicationTests
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
index 8a93d30cc32..c20f1d8ce52 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
@@ -341,6 +341,11 @@
"ColumnId": 9,
"ColumnName": "IsBackup",
"ColumnType": "Bool"
+ },
+ {
+ "ColumnId": 10,
+ "ColumnName": "ReplicationConfig",
+ "ColumnType": "String"
}
],
"ColumnsDropped": [],
@@ -355,7 +360,8 @@
6,
7,
8,
- 9
+ 9,
+ 10
],
"RoomID": 0,
"Codec": 0,
@@ -3450,6 +3456,11 @@
"ColumnId": 10,
"ColumnName": "IsBackup",
"ColumnType": "Bool"
+ },
+ {
+ "ColumnId": 11,
+ "ColumnName": "ReplicationConfig",
+ "ColumnType": "String"
}
],
"ColumnsDropped": [],
@@ -3465,7 +3476,8 @@
7,
8,
9,
- 10
+ 10,
+ 11
],
"RoomID": 0,
"Codec": 0,