diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-11-25 22:57:58 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-25 19:57:58 +0000 |
commit | 76440978e610a982a5e1dec23514d1be54268e35 (patch) | |
tree | f2a6ce585aa87e778e299bcfef0aaeaa360a97b4 | |
parent | a328b3002af3e74977283814bdeeb064c3b83645 (diff) | |
download | ydb-76440978e610a982a5e1dec23514d1be54268e35.tar.gz |
Consistency option (core part) (#11970)
11 files changed, 340 insertions, 46 deletions
diff --git a/ydb/core/protos/replication.proto b/ydb/core/protos/replication.proto index 078f09d58d..64088cef07 100644 --- a/ydb/core/protos/replication.proto +++ b/ydb/core/protos/replication.proto @@ -69,7 +69,19 @@ message TReplicationConfig { TTargetSpecific Specific = 5; } - optional bool InitialSync = 6; + reserved 6; // InitialSync + + message TWeakConsistency { + } + + message TStrongConsistency { + optional uint64 CommitIntervalMilliSeconds = 1; + } + + oneof Consistency { + TWeakConsistency WeakConsistency = 7; + TStrongConsistency StrongConsistency = 8; + } } message TReplicationState { diff --git a/ydb/core/tx/replication/controller/dst_creator.cpp b/ydb/core/tx/replication/controller/dst_creator.cpp index 4f241702e0..2ae56f68c5 100644 --- a/ydb/core/tx/replication/controller/dst_creator.cpp +++ b/ydb/core/tx/replication/controller/dst_creator.cpp @@ -7,6 +7,7 @@ #include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/cms/console/configs_dispatcher.h> #include <ydb/core/protos/console_config.pb.h> +#include <ydb/core/protos/replication.pb.h> #include <ydb/core/protos/schemeshard/operations.pb.h> #include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h> #include <ydb/core/tx/scheme_board/events.h> @@ -226,10 +227,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> { AllocateTxId(); } - static void FillReplicationConfig(NKikimrSchemeOp::TTableReplicationConfig& replicationConfig) { - // TODO: support other modes - replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY); - replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK); + void FillReplicationConfig(NKikimrSchemeOp::TTableReplicationConfig& replicationConfig) const { + NController::FillReplicationConfig(replicationConfig, Mode, Consistency); } void AllocateTxId() { @@ -375,22 +374,7 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> { return false; } - const auto& replicationConfig = got.GetReplicationConfig(); - - switch (replicationConfig.GetMode()) { - case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY: - break; - default: - error = "Unsupported replication mode"; - return false; - } - - switch (replicationConfig.GetConsistency()) { - case NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK: - break; - default: - error = TStringBuilder() << "Unsupported replication consistency" - << ": " << static_cast<int>(replicationConfig.GetConsistency()); + if (!CheckReplicationConfig(got.GetReplicationConfig(), Mode, Consistency, error)) { return false; } @@ -623,7 +607,9 @@ public: ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, - const TString& dstPath) + const TString& dstPath, + EReplicationMode mode, + EReplicaConsistency consistency) : Parent(parent) , SchemeShardId(schemeShardId) , YdbProxy(proxy) @@ -633,6 +619,8 @@ public: , Kind(kind) , SrcPath(srcPath) , DstPath(dstPath) + , Mode(mode) + , Consistency(consistency) , LogPrefix("DstCreator", ReplicationId, TargetId) { } @@ -665,6 +653,8 @@ private: const TReplication::ETargetKind Kind; const TString SrcPath; const TString DstPath; + const EReplicationMode Mode; + const EReplicaConsistency Consistency; const TActorLogPrefix LogPrefix; TPathId DomainKey; @@ -680,17 +670,72 @@ private: }; // TDstCreator +static NKikimrSchemeOp::TTableReplicationConfig::EConsistency ConvertConsistency(EReplicaConsistency value) { + switch (value) { + case EReplicaConsistency::Weak: + return NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK; + case EReplicaConsistency::Strong: + return NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_STRONG; + } +} + +static NKikimrSchemeOp::TTableReplicationConfig::EReplicationMode ConvertMode(EReplicationMode value) { + switch (value) { + case EReplicationMode::ReadOnly: + return NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY; + } +} + +void FillReplicationConfig( + NKikimrSchemeOp::TTableReplicationConfig& out, + EReplicationMode mode, + EReplicaConsistency consistency +) { + out.SetMode(ConvertMode(mode)); + out.SetConsistency(ConvertConsistency(consistency)); +} + +bool CheckReplicationConfig( + const NKikimrSchemeOp::TTableReplicationConfig& in, + EReplicationMode mode, + EReplicaConsistency consistency, + TString& error +) { + if (in.GetMode() != ConvertMode(mode)) { + error = TStringBuilder() << "Replication mode mismatch" + << ": expected: " << ConvertMode(mode) + << ", got: " << static_cast<int>(in.GetMode()); + return false; + } + + if (in.GetConsistency() != ConvertConsistency(consistency)) { + error = TStringBuilder() << "Replication consistency mismatch" + << ": expected: " << ConvertConsistency(consistency) + << ", got: " << static_cast<int>(in.GetConsistency()); + return false; + } + + return true; +} + IActor* CreateDstCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx) { const auto* target = replication->FindTarget(targetId); Y_ABORT_UNLESS(target); + + const auto consistency = replication->GetConfig().HasStrongConsistency() + ? EReplicaConsistency::Strong + : EReplicaConsistency::Weak; + return CreateDstCreator(ctx.SelfID, replication->GetSchemeShardId(), replication->GetYdbProxy(), replication->GetPathId(), - replication->GetId(), target->GetId(), target->GetKind(), target->GetSrcPath(), target->GetDstPath()); + replication->GetId(), target->GetId(), target->GetKind(), target->GetSrcPath(), target->GetDstPath(), + EReplicationMode::ReadOnly, consistency); } IActor* CreateDstCreator(const TActorId& parent, ui64 schemeShardId, const TActorId& proxy, const TPathId& pathId, - ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath) + ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath, + EReplicationMode mode, EReplicaConsistency consistency) { - return new TDstCreator(parent, schemeShardId, proxy, pathId, rid, tid, kind, srcPath, dstPath); + return new TDstCreator(parent, schemeShardId, proxy, pathId, rid, tid, kind, srcPath, dstPath, mode, consistency); } } diff --git a/ydb/core/tx/replication/controller/dst_creator.h b/ydb/core/tx/replication/controller/dst_creator.h index 4027cf089f..5f6bdba2e8 100644 --- a/ydb/core/tx/replication/controller/dst_creator.h +++ b/ydb/core/tx/replication/controller/dst_creator.h @@ -2,10 +2,35 @@ #include "replication.h" +namespace NKikimrSchemeOp { + class TTableReplicationConfig; +} + namespace NKikimr::NReplication::NController { +enum class EReplicationMode { + ReadOnly, +}; + +enum class EReplicaConsistency { + Weak, + Strong, +}; + +void FillReplicationConfig( + NKikimrSchemeOp::TTableReplicationConfig& out, + EReplicationMode mode, + EReplicaConsistency consistency); +bool CheckReplicationConfig( + const NKikimrSchemeOp::TTableReplicationConfig& in, + EReplicationMode mode, + EReplicaConsistency consistency, + TString& error); + IActor* CreateDstCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx); IActor* CreateDstCreator(const TActorId& parent, ui64 schemeShardId, const TActorId& proxy, const TPathId& pathId, - ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath); + ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath, + EReplicationMode mode = EReplicationMode::ReadOnly, + EReplicaConsistency consistency = EReplicaConsistency::Weak); } diff --git a/ydb/core/tx/replication/controller/dst_creator_ut.cpp b/ydb/core/tx/replication/controller/dst_creator_ut.cpp index 805cd2b1e3..12e94919f6 100644 --- a/ydb/core/tx/replication/controller/dst_creator_ut.cpp +++ b/ydb/core/tx/replication/controller/dst_creator_ut.cpp @@ -13,7 +13,12 @@ namespace NKikimr::NReplication::NController { Y_UNIT_TEST_SUITE(DstCreator) { using namespace NTestHelpers; - void CheckTableReplica(const TTestTableDescription& tableDesc, const NKikimrSchemeOp::TTableDescription& replicatedDesc) { + void CheckTableReplica( + const TTestTableDescription& tableDesc, + const NKikimrSchemeOp::TTableDescription& replicatedDesc, + EReplicationMode mode = EReplicationMode::ReadOnly, + EReplicaConsistency consistency = EReplicaConsistency::Weak + ) { UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.KeyColumnNamesSize(), tableDesc.KeyColumns.size()); for (ui32 i = 0; i < replicatedDesc.KeyColumnNamesSize(); ++i) { UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.GetKeyColumnNames(i), tableDesc.KeyColumns[i]); @@ -28,12 +33,15 @@ Y_UNIT_TEST_SUITE(DstCreator) { UNIT_ASSERT(FindIfPtr(tableDesc.Columns, pred)); } - const auto& replCfg = replicatedDesc.GetReplicationConfig(); - UNIT_ASSERT_VALUES_EQUAL(replCfg.GetMode(), NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY); - UNIT_ASSERT_VALUES_EQUAL(replCfg.GetConsistency(), NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK); + TString error; + UNIT_ASSERT_C(CheckReplicationConfig(replicatedDesc.GetReplicationConfig(), mode, consistency, error), error); } - void Basic(const TString& replicatedPath) { + void Basic( + const TString& replicatedPath, + EReplicationMode mode = EReplicationMode::ReadOnly, + EReplicaConsistency consistency = EReplicaConsistency::Weak + ) { TEnv env; env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE); @@ -50,7 +58,7 @@ Y_UNIT_TEST_SUITE(DstCreator) { env.CreateTable("/Root", *MakeTableDescription(tableDesc)); env.GetRuntime().Register(CreateDstCreator( env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"), - 1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", replicatedPath + 1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", replicatedPath, mode, consistency )); auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender()); @@ -59,7 +67,7 @@ Y_UNIT_TEST_SUITE(DstCreator) { auto desc = env.GetDescription(replicatedPath); const auto& replicatedDesc = desc.GetPathDescription().GetTable(); - CheckTableReplica(tableDesc, replicatedDesc); + CheckTableReplica(tableDesc, replicatedDesc, mode, consistency); } Y_UNIT_TEST(Basic) { @@ -70,6 +78,10 @@ Y_UNIT_TEST_SUITE(DstCreator) { Basic("/Root/Dir/Replicated"); } + Y_UNIT_TEST(StrongConsistency) { + Basic("/Root/Replicated", EReplicationMode::ReadOnly, EReplicaConsistency::Strong); + } + void WithIndex(const TString& replicatedPath, NKikimrSchemeOp::EIndexType indexType) { TEnv env(TFeatureFlags().SetEnableChangefeedsOnIndexTables(true)); env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE); @@ -370,7 +382,7 @@ Y_UNIT_TEST_SUITE(DstCreator) { }); } - Y_UNIT_TEST(UnsupportedReplicationMode) { + Y_UNIT_TEST(ReplicationModeMismatch) { auto changeMode = [](const TTestTableDescription& desc) { auto copy = desc; copy.ReplicationConfig->Mode = TTestTableDescription::TReplicationConfig::MODE_NONE; @@ -378,7 +390,7 @@ Y_UNIT_TEST_SUITE(DstCreator) { return copy; }; - ExistingDst(NKikimrScheme::StatusSchemeError, "Unsupported replication mode", changeMode, TTestTableDescription{ + ExistingDst(NKikimrScheme::StatusSchemeError, "Replication mode mismatch", changeMode, TTestTableDescription{ .Name = "Table", .KeyColumns = {"key"}, .Columns = { @@ -388,20 +400,24 @@ Y_UNIT_TEST_SUITE(DstCreator) { }); } - Y_UNIT_TEST(UnsupportedReplicationConsistency) { + Y_UNIT_TEST(ReplicationConsistencyMismatch) { auto changeConsistency = [](const TTestTableDescription& desc) { auto copy = desc; copy.ReplicationConfig->Consistency = TTestTableDescription::TReplicationConfig::CONSISTENCY_STRONG; return copy; }; - ExistingDst(NKikimrScheme::StatusSchemeError, "Unsupported replication consistency", changeConsistency, TTestTableDescription{ + ExistingDst(NKikimrScheme::StatusSchemeError, "Replication consistency mismatch", changeConsistency, TTestTableDescription{ .Name = "Table", .KeyColumns = {"key"}, .Columns = { {.Name = "key", .Type = "Uint32"}, {.Name = "value", .Type = "Utf8"}, }, + .ReplicationConfig = TTestTableDescription::TReplicationConfig{ + .Mode = TTestTableDescription::TReplicationConfig::MODE_READ_ONLY, + .Consistency = TTestTableDescription::TReplicationConfig::CONSISTENCY_WEAK, + }, }); } } diff --git a/ydb/core/tx/replication/controller/stream_creator.cpp b/ydb/core/tx/replication/controller/stream_creator.cpp index 7200b32db8..ce6fb2bcd2 100644 --- a/ydb/core/tx/replication/controller/stream_creator.cpp +++ b/ydb/core/tx/replication/controller/stream_creator.cpp @@ -19,13 +19,25 @@ namespace NKikimr::NReplication::NController { class TStreamCreator: public TActorBootstrapped<TStreamCreator> { static NYdb::NTable::TChangefeedDescription MakeChangefeed( - const TString& name, const TDuration& retentionPeriod, const NJson::TJsonMap& attrs) + const TString& name, + const TDuration& retentionPeriod, + const std::optional<TDuration>& resolvedTimestamps, + const NJson::TJsonMap& attrs) { using namespace NYdb::NTable; - return TChangefeedDescription(name, EChangefeedMode::Updates, EChangefeedFormat::Json) + + auto desc = TChangefeedDescription(name, EChangefeedMode::Updates, EChangefeedFormat::Json) .WithRetentionPeriod(retentionPeriod) .WithInitialScan() .AddAttribute("__async_replication", NJson::WriteJson(attrs, false)); + + if (resolvedTimestamps) { + desc + .WithVirtualTimestamps() + .WithResolvedTimestamps(*resolvedTimestamps); + } + + return desc; } void RequestPermission() { @@ -161,17 +173,19 @@ public: const TString& srcPath, const TString& dstPath, const TString& streamName, - const TDuration& streamRetentionPeriod) + const TDuration& retentionPeriod, + const std::optional<TDuration>& resolvedTimestamps, + bool supportsTopicAutopartitioning) : Parent(parent) , YdbProxy(proxy) , ReplicationId(rid) , TargetId(tid) , Kind(kind) , SrcPath(srcPath) - , Changefeed(MakeChangefeed(streamName, streamRetentionPeriod, NJson::TJsonMap{ + , Changefeed(MakeChangefeed(streamName, retentionPeriod, resolvedTimestamps, NJson::TJsonMap{ {"path", dstPath}, {"id", ToString(rid)}, - {"supports_topic_autopartitioning", AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication()}, + {"supports_topic_autopartitioning", supportsTopicAutopartitioning}, })) , LogPrefix("StreamCreator", ReplicationId, TargetId) { @@ -202,17 +216,27 @@ private: IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx) { const auto* target = replication->FindTarget(targetId); Y_ABORT_UNLESS(target); + + const auto& config = replication->GetConfig(); + const auto resolvedTimestamps = config.HasStrongConsistency() + ? std::make_optional(TDuration::MilliSeconds(config.GetStrongConsistency().GetCommitIntervalMilliSeconds())) + : std::nullopt; + return CreateStreamCreator(ctx.SelfID, replication->GetYdbProxy(), replication->GetId(), target->GetId(), target->GetKind(), target->GetSrcPath(), target->GetDstPath(), target->GetStreamName(), - TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds())); + TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds()), resolvedTimestamps, + AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication()); } IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath, - const TString& streamName, const TDuration& streamRetentionPeriod) + const TString& streamName, const TDuration& retentionPeriod, + const std::optional<TDuration>& resolvedTimestamps, + bool supportsTopicAutopartitioning) { - return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, dstPath, streamName, streamRetentionPeriod); + return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, dstPath, + streamName, retentionPeriod, resolvedTimestamps, supportsTopicAutopartitioning); } } diff --git a/ydb/core/tx/replication/controller/stream_creator.h b/ydb/core/tx/replication/controller/stream_creator.h index 1eca930efb..c9ec2cc39a 100644 --- a/ydb/core/tx/replication/controller/stream_creator.h +++ b/ydb/core/tx/replication/controller/stream_creator.h @@ -2,11 +2,15 @@ #include "replication.h" +#include <optional> + namespace NKikimr::NReplication::NController { IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx); IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath, - const TString& streamName, const TDuration& streamRetentionPeriod); + const TString& streamName, const TDuration& streamRetentionPeriod, + const std::optional<TDuration>& resolvedTimestamps = std::nullopt, + bool supportsTopicAutopartitioning = false); } diff --git a/ydb/core/tx/replication/controller/stream_creator_ut.cpp b/ydb/core/tx/replication/controller/stream_creator_ut.cpp new file mode 100644 index 0000000000..136a43364b --- /dev/null +++ b/ydb/core/tx/replication/controller/stream_creator_ut.cpp @@ -0,0 +1,75 @@ +#include "dst_creator.h" +#include "private_events.h" +#include "stream_creator.h" + +#include <ydb/core/tx/replication/ut_helpers/test_env.h> +#include <ydb/core/tx/replication/ut_helpers/test_table.h> + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NReplication::NController { + +Y_UNIT_TEST_SUITE(StreamCreator) { + using namespace NTestHelpers; + + void Basic(const std::optional<TDuration>& resolvedTimestamps = {}) { + TEnv env; + env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE); + + const auto tableDesc = TTestTableDescription{ + .Name = "Table", + .KeyColumns = {"key"}, + .Columns = { + {.Name = "key", .Type = "Uint32"}, + {.Name = "value", .Type = "Utf8"}, + }, + .ReplicationConfig = Nothing(), + }; + + env.CreateTable("/Root", *MakeTableDescription(tableDesc)); + + env.GetRuntime().Register(CreateDstCreator( + env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"), + 1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", "/Root/Replica" + )); + { + auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender()); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess); + } + + env.GetRuntime().Register(CreateStreamCreator( + env.GetSender(), env.GetYdbProxy(), 1 /* rid */, 1 /* tid */, + TReplication::ETargetKind::Table, "/Root/Table", "/Root/Replica", + "Stream", TDuration::Hours(1), resolvedTimestamps + )); + { + auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvRequestCreateStream>(env.GetSender()); + env.GetRuntime().Send(ev->Sender, env.GetSender(), new TEvPrivate::TEvAllowCreateStream()); + } + { + auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateStreamResult>(env.GetSender()); + UNIT_ASSERT(ev->Get()->IsSuccess()); + } + + auto desc = env.GetDescription("/Root/Table"); + + const auto& streams = desc.GetPathDescription().GetTable().GetCdcStreams(); + UNIT_ASSERT_VALUES_EQUAL(streams.size(), 1); + + const auto& stream = streams.at(0); + UNIT_ASSERT_VALUES_EQUAL(stream.GetMode(), NKikimrSchemeOp::ECdcStreamModeUpdate); + UNIT_ASSERT_VALUES_EQUAL(stream.GetFormat(), NKikimrSchemeOp::ECdcStreamFormatJson); + UNIT_ASSERT_VALUES_EQUAL(stream.GetVirtualTimestamps(), resolvedTimestamps.has_value()); + UNIT_ASSERT_VALUES_EQUAL(stream.GetResolvedTimestampsIntervalMs(), resolvedTimestamps.value_or(TDuration::Zero()).MilliSeconds()); + } + + Y_UNIT_TEST(Basic) { + Basic(); + } + + Y_UNIT_TEST(WithResolvedTimestamps) { + Basic(TDuration::Seconds(10)); + } +} + +} diff --git a/ydb/core/tx/replication/controller/ut_stream_creator/ya.make b/ydb/core/tx/replication/controller/ut_stream_creator/ya.make new file mode 100644 index 0000000000..fb4abc7109 --- /dev/null +++ b/ydb/core/tx/replication/controller/ut_stream_creator/ya.make @@ -0,0 +1,20 @@ +UNITTEST_FOR(ydb/core/tx/replication/controller) + +FORK_SUBTESTS() + +SIZE(MEDIUM) + +TIMEOUT(600) + +PEERDIR( + ydb/core/tx/replication/ut_helpers + library/cpp/testing/unittest +) + +SRCS( + stream_creator_ut.cpp +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/replication/controller/ya.make b/ydb/core/tx/replication/controller/ya.make index 5acb9bbacd..97a07bd54b 100644 --- a/ydb/core/tx/replication/controller/ya.make +++ b/ydb/core/tx/replication/controller/ya.make @@ -62,5 +62,6 @@ END() RECURSE_FOR_TESTS( ut_dst_creator + ut_stream_creator ut_target_discoverer ) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp index 75b03b39be..b8a2a038ea 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp @@ -339,6 +339,10 @@ public: desc.MutableConfig()->MutableSrcConnectionParams()->MutableOAuthToken()->SetToken(BUILTIN_ACL_ROOT); } + if (desc.GetConfig().GetConsistencyCase() == NKikimrReplication::TReplicationConfig::CONSISTENCY_NOT_SET) { + desc.MutableConfig()->MutableWeakConsistency(); + } + desc.MutableState()->MutableStandBy(); auto replication = TReplicationInfo::Create(std::move(desc)); context.SS->Replications[path->PathId] = replication; diff --git a/ydb/core/tx/schemeshard/ut_replication/ut_replication.cpp b/ydb/core/tx/schemeshard/ut_replication/ut_replication.cpp index f29efff076..be3922e8a0 100644 --- a/ydb/core/tx/schemeshard/ut_replication/ut_replication.cpp +++ b/ydb/core/tx/schemeshard/ut_replication/ut_replication.cpp @@ -168,6 +168,74 @@ Y_UNIT_TEST_SUITE(TReplicationTests) { UNIT_ASSERT_VALUES_UNEQUAL("root@builtin", params.GetOAuthToken().GetToken()); } + Y_UNIT_TEST(ConsistencyMode) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().InitYdbDriver(true)); + ui64 txId = 100; + + SetupLogging(runtime); + + TestCreateReplication(runtime, ++txId, "/MyRoot", R"( + Name: "Replication1" + Config { + Specific { + Targets { + SrcPath: "/MyRoot1/Table" + DstPath: "/MyRoot2/Table" + } + } + } + )"); + env.TestWaitNotification(runtime, txId); + { + const auto desc = DescribePath(runtime, "/MyRoot/Replication1"); + const auto& config = desc.GetPathDescription().GetReplicationDescription().GetConfig(); + UNIT_ASSERT(config.HasWeakConsistency()); + } + + TestCreateReplication(runtime, ++txId, "/MyRoot", R"( + Name: "Replication2" + Config { + Specific { + Targets { + SrcPath: "/MyRoot1/Table" + DstPath: "/MyRoot2/Table" + } + } + WeakConsistency { + } + } + )"); + env.TestWaitNotification(runtime, txId); + { + const auto desc = DescribePath(runtime, "/MyRoot/Replication2"); + const auto& config = desc.GetPathDescription().GetReplicationDescription().GetConfig(); + UNIT_ASSERT(config.HasWeakConsistency()); + } + + TestCreateReplication(runtime, ++txId, "/MyRoot", R"( + Name: "Replication3" + Config { + Specific { + Targets { + SrcPath: "/MyRoot1/Table" + DstPath: "/MyRoot2/Table" + } + } + StrongConsistency { + CommitIntervalMilliSeconds: 10000 + } + } + )"); + env.TestWaitNotification(runtime, txId); + { + const auto desc = DescribePath(runtime, "/MyRoot/Replication3"); + const auto& config = desc.GetPathDescription().GetReplicationDescription().GetConfig(); + UNIT_ASSERT(config.HasStrongConsistency()); + UNIT_ASSERT_VALUES_EQUAL(config.GetStrongConsistency().GetCommitIntervalMilliSeconds(), 10000); + } + } + Y_UNIT_TEST(Alter) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().InitYdbDriver(true)); |