aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-11-25 22:57:58 +0300
committerGitHub <noreply@github.com>2024-11-25 19:57:58 +0000
commit76440978e610a982a5e1dec23514d1be54268e35 (patch)
treef2a6ce585aa87e778e299bcfef0aaeaa360a97b4
parenta328b3002af3e74977283814bdeeb064c3b83645 (diff)
downloadydb-76440978e610a982a5e1dec23514d1be54268e35.tar.gz
Consistency option (core part) (#11970)
-rw-r--r--ydb/core/protos/replication.proto14
-rw-r--r--ydb/core/tx/replication/controller/dst_creator.cpp93
-rw-r--r--ydb/core/tx/replication/controller/dst_creator.h27
-rw-r--r--ydb/core/tx/replication/controller/dst_creator_ut.cpp38
-rw-r--r--ydb/core/tx/replication/controller/stream_creator.cpp40
-rw-r--r--ydb/core/tx/replication/controller/stream_creator.h6
-rw-r--r--ydb/core/tx/replication/controller/stream_creator_ut.cpp75
-rw-r--r--ydb/core/tx/replication/controller/ut_stream_creator/ya.make20
-rw-r--r--ydb/core/tx/replication/controller/ya.make1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp4
-rw-r--r--ydb/core/tx/schemeshard/ut_replication/ut_replication.cpp68
11 files changed, 340 insertions, 46 deletions
diff --git a/ydb/core/protos/replication.proto b/ydb/core/protos/replication.proto
index 078f09d58d..64088cef07 100644
--- a/ydb/core/protos/replication.proto
+++ b/ydb/core/protos/replication.proto
@@ -69,7 +69,19 @@ message TReplicationConfig {
TTargetSpecific Specific = 5;
}
- optional bool InitialSync = 6;
+ reserved 6; // InitialSync
+
+ message TWeakConsistency {
+ }
+
+ message TStrongConsistency {
+ optional uint64 CommitIntervalMilliSeconds = 1;
+ }
+
+ oneof Consistency {
+ TWeakConsistency WeakConsistency = 7;
+ TStrongConsistency StrongConsistency = 8;
+ }
}
message TReplicationState {
diff --git a/ydb/core/tx/replication/controller/dst_creator.cpp b/ydb/core/tx/replication/controller/dst_creator.cpp
index 4f241702e0..2ae56f68c5 100644
--- a/ydb/core/tx/replication/controller/dst_creator.cpp
+++ b/ydb/core/tx/replication/controller/dst_creator.cpp
@@ -7,6 +7,7 @@
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/protos/console_config.pb.h>
+#include <ydb/core/protos/replication.pb.h>
#include <ydb/core/protos/schemeshard/operations.pb.h>
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
#include <ydb/core/tx/scheme_board/events.h>
@@ -226,10 +227,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
AllocateTxId();
}
- static void FillReplicationConfig(NKikimrSchemeOp::TTableReplicationConfig& replicationConfig) {
- // TODO: support other modes
- replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
- replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
+ void FillReplicationConfig(NKikimrSchemeOp::TTableReplicationConfig& replicationConfig) const {
+ NController::FillReplicationConfig(replicationConfig, Mode, Consistency);
}
void AllocateTxId() {
@@ -375,22 +374,7 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
return false;
}
- const auto& replicationConfig = got.GetReplicationConfig();
-
- switch (replicationConfig.GetMode()) {
- case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY:
- break;
- default:
- error = "Unsupported replication mode";
- return false;
- }
-
- switch (replicationConfig.GetConsistency()) {
- case NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK:
- break;
- default:
- error = TStringBuilder() << "Unsupported replication consistency"
- << ": " << static_cast<int>(replicationConfig.GetConsistency());
+ if (!CheckReplicationConfig(got.GetReplicationConfig(), Mode, Consistency, error)) {
return false;
}
@@ -623,7 +607,9 @@ public:
ui64 tid,
TReplication::ETargetKind kind,
const TString& srcPath,
- const TString& dstPath)
+ const TString& dstPath,
+ EReplicationMode mode,
+ EReplicaConsistency consistency)
: Parent(parent)
, SchemeShardId(schemeShardId)
, YdbProxy(proxy)
@@ -633,6 +619,8 @@ public:
, Kind(kind)
, SrcPath(srcPath)
, DstPath(dstPath)
+ , Mode(mode)
+ , Consistency(consistency)
, LogPrefix("DstCreator", ReplicationId, TargetId)
{
}
@@ -665,6 +653,8 @@ private:
const TReplication::ETargetKind Kind;
const TString SrcPath;
const TString DstPath;
+ const EReplicationMode Mode;
+ const EReplicaConsistency Consistency;
const TActorLogPrefix LogPrefix;
TPathId DomainKey;
@@ -680,17 +670,72 @@ private:
}; // TDstCreator
+static NKikimrSchemeOp::TTableReplicationConfig::EConsistency ConvertConsistency(EReplicaConsistency value) {
+ switch (value) {
+ case EReplicaConsistency::Weak:
+ return NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK;
+ case EReplicaConsistency::Strong:
+ return NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_STRONG;
+ }
+}
+
+static NKikimrSchemeOp::TTableReplicationConfig::EReplicationMode ConvertMode(EReplicationMode value) {
+ switch (value) {
+ case EReplicationMode::ReadOnly:
+ return NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY;
+ }
+}
+
+void FillReplicationConfig(
+ NKikimrSchemeOp::TTableReplicationConfig& out,
+ EReplicationMode mode,
+ EReplicaConsistency consistency
+) {
+ out.SetMode(ConvertMode(mode));
+ out.SetConsistency(ConvertConsistency(consistency));
+}
+
+bool CheckReplicationConfig(
+ const NKikimrSchemeOp::TTableReplicationConfig& in,
+ EReplicationMode mode,
+ EReplicaConsistency consistency,
+ TString& error
+) {
+ if (in.GetMode() != ConvertMode(mode)) {
+ error = TStringBuilder() << "Replication mode mismatch"
+ << ": expected: " << ConvertMode(mode)
+ << ", got: " << static_cast<int>(in.GetMode());
+ return false;
+ }
+
+ if (in.GetConsistency() != ConvertConsistency(consistency)) {
+ error = TStringBuilder() << "Replication consistency mismatch"
+ << ": expected: " << ConvertConsistency(consistency)
+ << ", got: " << static_cast<int>(in.GetConsistency());
+ return false;
+ }
+
+ return true;
+}
+
IActor* CreateDstCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx) {
const auto* target = replication->FindTarget(targetId);
Y_ABORT_UNLESS(target);
+
+ const auto consistency = replication->GetConfig().HasStrongConsistency()
+ ? EReplicaConsistency::Strong
+ : EReplicaConsistency::Weak;
+
return CreateDstCreator(ctx.SelfID, replication->GetSchemeShardId(), replication->GetYdbProxy(), replication->GetPathId(),
- replication->GetId(), target->GetId(), target->GetKind(), target->GetSrcPath(), target->GetDstPath());
+ replication->GetId(), target->GetId(), target->GetKind(), target->GetSrcPath(), target->GetDstPath(),
+ EReplicationMode::ReadOnly, consistency);
}
IActor* CreateDstCreator(const TActorId& parent, ui64 schemeShardId, const TActorId& proxy, const TPathId& pathId,
- ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath)
+ ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath,
+ EReplicationMode mode, EReplicaConsistency consistency)
{
- return new TDstCreator(parent, schemeShardId, proxy, pathId, rid, tid, kind, srcPath, dstPath);
+ return new TDstCreator(parent, schemeShardId, proxy, pathId, rid, tid, kind, srcPath, dstPath, mode, consistency);
}
}
diff --git a/ydb/core/tx/replication/controller/dst_creator.h b/ydb/core/tx/replication/controller/dst_creator.h
index 4027cf089f..5f6bdba2e8 100644
--- a/ydb/core/tx/replication/controller/dst_creator.h
+++ b/ydb/core/tx/replication/controller/dst_creator.h
@@ -2,10 +2,35 @@
#include "replication.h"
+namespace NKikimrSchemeOp {
+ class TTableReplicationConfig;
+}
+
namespace NKikimr::NReplication::NController {
+enum class EReplicationMode {
+ ReadOnly,
+};
+
+enum class EReplicaConsistency {
+ Weak,
+ Strong,
+};
+
+void FillReplicationConfig(
+ NKikimrSchemeOp::TTableReplicationConfig& out,
+ EReplicationMode mode,
+ EReplicaConsistency consistency);
+bool CheckReplicationConfig(
+ const NKikimrSchemeOp::TTableReplicationConfig& in,
+ EReplicationMode mode,
+ EReplicaConsistency consistency,
+ TString& error);
+
IActor* CreateDstCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx);
IActor* CreateDstCreator(const TActorId& parent, ui64 schemeShardId, const TActorId& proxy, const TPathId& pathId,
- ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath);
+ ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath,
+ EReplicationMode mode = EReplicationMode::ReadOnly,
+ EReplicaConsistency consistency = EReplicaConsistency::Weak);
}
diff --git a/ydb/core/tx/replication/controller/dst_creator_ut.cpp b/ydb/core/tx/replication/controller/dst_creator_ut.cpp
index 805cd2b1e3..12e94919f6 100644
--- a/ydb/core/tx/replication/controller/dst_creator_ut.cpp
+++ b/ydb/core/tx/replication/controller/dst_creator_ut.cpp
@@ -13,7 +13,12 @@ namespace NKikimr::NReplication::NController {
Y_UNIT_TEST_SUITE(DstCreator) {
using namespace NTestHelpers;
- void CheckTableReplica(const TTestTableDescription& tableDesc, const NKikimrSchemeOp::TTableDescription& replicatedDesc) {
+ void CheckTableReplica(
+ const TTestTableDescription& tableDesc,
+ const NKikimrSchemeOp::TTableDescription& replicatedDesc,
+ EReplicationMode mode = EReplicationMode::ReadOnly,
+ EReplicaConsistency consistency = EReplicaConsistency::Weak
+ ) {
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.KeyColumnNamesSize(), tableDesc.KeyColumns.size());
for (ui32 i = 0; i < replicatedDesc.KeyColumnNamesSize(); ++i) {
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.GetKeyColumnNames(i), tableDesc.KeyColumns[i]);
@@ -28,12 +33,15 @@ Y_UNIT_TEST_SUITE(DstCreator) {
UNIT_ASSERT(FindIfPtr(tableDesc.Columns, pred));
}
- const auto& replCfg = replicatedDesc.GetReplicationConfig();
- UNIT_ASSERT_VALUES_EQUAL(replCfg.GetMode(), NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
- UNIT_ASSERT_VALUES_EQUAL(replCfg.GetConsistency(), NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
+ TString error;
+ UNIT_ASSERT_C(CheckReplicationConfig(replicatedDesc.GetReplicationConfig(), mode, consistency, error), error);
}
- void Basic(const TString& replicatedPath) {
+ void Basic(
+ const TString& replicatedPath,
+ EReplicationMode mode = EReplicationMode::ReadOnly,
+ EReplicaConsistency consistency = EReplicaConsistency::Weak
+ ) {
TEnv env;
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
@@ -50,7 +58,7 @@ Y_UNIT_TEST_SUITE(DstCreator) {
env.CreateTable("/Root", *MakeTableDescription(tableDesc));
env.GetRuntime().Register(CreateDstCreator(
env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"),
- 1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", replicatedPath
+ 1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", replicatedPath, mode, consistency
));
auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender());
@@ -59,7 +67,7 @@ Y_UNIT_TEST_SUITE(DstCreator) {
auto desc = env.GetDescription(replicatedPath);
const auto& replicatedDesc = desc.GetPathDescription().GetTable();
- CheckTableReplica(tableDesc, replicatedDesc);
+ CheckTableReplica(tableDesc, replicatedDesc, mode, consistency);
}
Y_UNIT_TEST(Basic) {
@@ -70,6 +78,10 @@ Y_UNIT_TEST_SUITE(DstCreator) {
Basic("/Root/Dir/Replicated");
}
+ Y_UNIT_TEST(StrongConsistency) {
+ Basic("/Root/Replicated", EReplicationMode::ReadOnly, EReplicaConsistency::Strong);
+ }
+
void WithIndex(const TString& replicatedPath, NKikimrSchemeOp::EIndexType indexType) {
TEnv env(TFeatureFlags().SetEnableChangefeedsOnIndexTables(true));
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
@@ -370,7 +382,7 @@ Y_UNIT_TEST_SUITE(DstCreator) {
});
}
- Y_UNIT_TEST(UnsupportedReplicationMode) {
+ Y_UNIT_TEST(ReplicationModeMismatch) {
auto changeMode = [](const TTestTableDescription& desc) {
auto copy = desc;
copy.ReplicationConfig->Mode = TTestTableDescription::TReplicationConfig::MODE_NONE;
@@ -378,7 +390,7 @@ Y_UNIT_TEST_SUITE(DstCreator) {
return copy;
};
- ExistingDst(NKikimrScheme::StatusSchemeError, "Unsupported replication mode", changeMode, TTestTableDescription{
+ ExistingDst(NKikimrScheme::StatusSchemeError, "Replication mode mismatch", changeMode, TTestTableDescription{
.Name = "Table",
.KeyColumns = {"key"},
.Columns = {
@@ -388,20 +400,24 @@ Y_UNIT_TEST_SUITE(DstCreator) {
});
}
- Y_UNIT_TEST(UnsupportedReplicationConsistency) {
+ Y_UNIT_TEST(ReplicationConsistencyMismatch) {
auto changeConsistency = [](const TTestTableDescription& desc) {
auto copy = desc;
copy.ReplicationConfig->Consistency = TTestTableDescription::TReplicationConfig::CONSISTENCY_STRONG;
return copy;
};
- ExistingDst(NKikimrScheme::StatusSchemeError, "Unsupported replication consistency", changeConsistency, TTestTableDescription{
+ ExistingDst(NKikimrScheme::StatusSchemeError, "Replication consistency mismatch", changeConsistency, TTestTableDescription{
.Name = "Table",
.KeyColumns = {"key"},
.Columns = {
{.Name = "key", .Type = "Uint32"},
{.Name = "value", .Type = "Utf8"},
},
+ .ReplicationConfig = TTestTableDescription::TReplicationConfig{
+ .Mode = TTestTableDescription::TReplicationConfig::MODE_READ_ONLY,
+ .Consistency = TTestTableDescription::TReplicationConfig::CONSISTENCY_WEAK,
+ },
});
}
}
diff --git a/ydb/core/tx/replication/controller/stream_creator.cpp b/ydb/core/tx/replication/controller/stream_creator.cpp
index 7200b32db8..ce6fb2bcd2 100644
--- a/ydb/core/tx/replication/controller/stream_creator.cpp
+++ b/ydb/core/tx/replication/controller/stream_creator.cpp
@@ -19,13 +19,25 @@ namespace NKikimr::NReplication::NController {
class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
static NYdb::NTable::TChangefeedDescription MakeChangefeed(
- const TString& name, const TDuration& retentionPeriod, const NJson::TJsonMap& attrs)
+ const TString& name,
+ const TDuration& retentionPeriod,
+ const std::optional<TDuration>& resolvedTimestamps,
+ const NJson::TJsonMap& attrs)
{
using namespace NYdb::NTable;
- return TChangefeedDescription(name, EChangefeedMode::Updates, EChangefeedFormat::Json)
+
+ auto desc = TChangefeedDescription(name, EChangefeedMode::Updates, EChangefeedFormat::Json)
.WithRetentionPeriod(retentionPeriod)
.WithInitialScan()
.AddAttribute("__async_replication", NJson::WriteJson(attrs, false));
+
+ if (resolvedTimestamps) {
+ desc
+ .WithVirtualTimestamps()
+ .WithResolvedTimestamps(*resolvedTimestamps);
+ }
+
+ return desc;
}
void RequestPermission() {
@@ -161,17 +173,19 @@ public:
const TString& srcPath,
const TString& dstPath,
const TString& streamName,
- const TDuration& streamRetentionPeriod)
+ const TDuration& retentionPeriod,
+ const std::optional<TDuration>& resolvedTimestamps,
+ bool supportsTopicAutopartitioning)
: Parent(parent)
, YdbProxy(proxy)
, ReplicationId(rid)
, TargetId(tid)
, Kind(kind)
, SrcPath(srcPath)
- , Changefeed(MakeChangefeed(streamName, streamRetentionPeriod, NJson::TJsonMap{
+ , Changefeed(MakeChangefeed(streamName, retentionPeriod, resolvedTimestamps, NJson::TJsonMap{
{"path", dstPath},
{"id", ToString(rid)},
- {"supports_topic_autopartitioning", AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication()},
+ {"supports_topic_autopartitioning", supportsTopicAutopartitioning},
}))
, LogPrefix("StreamCreator", ReplicationId, TargetId)
{
@@ -202,17 +216,27 @@ private:
IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx) {
const auto* target = replication->FindTarget(targetId);
Y_ABORT_UNLESS(target);
+
+ const auto& config = replication->GetConfig();
+ const auto resolvedTimestamps = config.HasStrongConsistency()
+ ? std::make_optional(TDuration::MilliSeconds(config.GetStrongConsistency().GetCommitIntervalMilliSeconds()))
+ : std::nullopt;
+
return CreateStreamCreator(ctx.SelfID, replication->GetYdbProxy(),
replication->GetId(), target->GetId(), target->GetKind(),
target->GetSrcPath(), target->GetDstPath(), target->GetStreamName(),
- TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds()));
+ TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds()), resolvedTimestamps,
+ AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication());
}
IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath,
- const TString& streamName, const TDuration& streamRetentionPeriod)
+ const TString& streamName, const TDuration& retentionPeriod,
+ const std::optional<TDuration>& resolvedTimestamps,
+ bool supportsTopicAutopartitioning)
{
- return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, dstPath, streamName, streamRetentionPeriod);
+ return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, dstPath,
+ streamName, retentionPeriod, resolvedTimestamps, supportsTopicAutopartitioning);
}
}
diff --git a/ydb/core/tx/replication/controller/stream_creator.h b/ydb/core/tx/replication/controller/stream_creator.h
index 1eca930efb..c9ec2cc39a 100644
--- a/ydb/core/tx/replication/controller/stream_creator.h
+++ b/ydb/core/tx/replication/controller/stream_creator.h
@@ -2,11 +2,15 @@
#include "replication.h"
+#include <optional>
+
namespace NKikimr::NReplication::NController {
IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx);
IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath,
- const TString& streamName, const TDuration& streamRetentionPeriod);
+ const TString& streamName, const TDuration& streamRetentionPeriod,
+ const std::optional<TDuration>& resolvedTimestamps = std::nullopt,
+ bool supportsTopicAutopartitioning = false);
}
diff --git a/ydb/core/tx/replication/controller/stream_creator_ut.cpp b/ydb/core/tx/replication/controller/stream_creator_ut.cpp
new file mode 100644
index 0000000000..136a43364b
--- /dev/null
+++ b/ydb/core/tx/replication/controller/stream_creator_ut.cpp
@@ -0,0 +1,75 @@
+#include "dst_creator.h"
+#include "private_events.h"
+#include "stream_creator.h"
+
+#include <ydb/core/tx/replication/ut_helpers/test_env.h>
+#include <ydb/core/tx/replication/ut_helpers/test_table.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NReplication::NController {
+
+Y_UNIT_TEST_SUITE(StreamCreator) {
+ using namespace NTestHelpers;
+
+ void Basic(const std::optional<TDuration>& resolvedTimestamps = {}) {
+ TEnv env;
+ env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
+
+ const auto tableDesc = TTestTableDescription{
+ .Name = "Table",
+ .KeyColumns = {"key"},
+ .Columns = {
+ {.Name = "key", .Type = "Uint32"},
+ {.Name = "value", .Type = "Utf8"},
+ },
+ .ReplicationConfig = Nothing(),
+ };
+
+ env.CreateTable("/Root", *MakeTableDescription(tableDesc));
+
+ env.GetRuntime().Register(CreateDstCreator(
+ env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"),
+ 1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", "/Root/Replica"
+ ));
+ {
+ auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender());
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess);
+ }
+
+ env.GetRuntime().Register(CreateStreamCreator(
+ env.GetSender(), env.GetYdbProxy(), 1 /* rid */, 1 /* tid */,
+ TReplication::ETargetKind::Table, "/Root/Table", "/Root/Replica",
+ "Stream", TDuration::Hours(1), resolvedTimestamps
+ ));
+ {
+ auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvRequestCreateStream>(env.GetSender());
+ env.GetRuntime().Send(ev->Sender, env.GetSender(), new TEvPrivate::TEvAllowCreateStream());
+ }
+ {
+ auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateStreamResult>(env.GetSender());
+ UNIT_ASSERT(ev->Get()->IsSuccess());
+ }
+
+ auto desc = env.GetDescription("/Root/Table");
+
+ const auto& streams = desc.GetPathDescription().GetTable().GetCdcStreams();
+ UNIT_ASSERT_VALUES_EQUAL(streams.size(), 1);
+
+ const auto& stream = streams.at(0);
+ UNIT_ASSERT_VALUES_EQUAL(stream.GetMode(), NKikimrSchemeOp::ECdcStreamModeUpdate);
+ UNIT_ASSERT_VALUES_EQUAL(stream.GetFormat(), NKikimrSchemeOp::ECdcStreamFormatJson);
+ UNIT_ASSERT_VALUES_EQUAL(stream.GetVirtualTimestamps(), resolvedTimestamps.has_value());
+ UNIT_ASSERT_VALUES_EQUAL(stream.GetResolvedTimestampsIntervalMs(), resolvedTimestamps.value_or(TDuration::Zero()).MilliSeconds());
+ }
+
+ Y_UNIT_TEST(Basic) {
+ Basic();
+ }
+
+ Y_UNIT_TEST(WithResolvedTimestamps) {
+ Basic(TDuration::Seconds(10));
+ }
+}
+
+}
diff --git a/ydb/core/tx/replication/controller/ut_stream_creator/ya.make b/ydb/core/tx/replication/controller/ut_stream_creator/ya.make
new file mode 100644
index 0000000000..fb4abc7109
--- /dev/null
+++ b/ydb/core/tx/replication/controller/ut_stream_creator/ya.make
@@ -0,0 +1,20 @@
+UNITTEST_FOR(ydb/core/tx/replication/controller)
+
+FORK_SUBTESTS()
+
+SIZE(MEDIUM)
+
+TIMEOUT(600)
+
+PEERDIR(
+ ydb/core/tx/replication/ut_helpers
+ library/cpp/testing/unittest
+)
+
+SRCS(
+ stream_creator_ut.cpp
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/tx/replication/controller/ya.make b/ydb/core/tx/replication/controller/ya.make
index 5acb9bbacd..97a07bd54b 100644
--- a/ydb/core/tx/replication/controller/ya.make
+++ b/ydb/core/tx/replication/controller/ya.make
@@ -62,5 +62,6 @@ END()
RECURSE_FOR_TESTS(
ut_dst_creator
+ ut_stream_creator
ut_target_discoverer
)
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp
index 75b03b39be..b8a2a038ea 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp
@@ -339,6 +339,10 @@ public:
desc.MutableConfig()->MutableSrcConnectionParams()->MutableOAuthToken()->SetToken(BUILTIN_ACL_ROOT);
}
+ if (desc.GetConfig().GetConsistencyCase() == NKikimrReplication::TReplicationConfig::CONSISTENCY_NOT_SET) {
+ desc.MutableConfig()->MutableWeakConsistency();
+ }
+
desc.MutableState()->MutableStandBy();
auto replication = TReplicationInfo::Create(std::move(desc));
context.SS->Replications[path->PathId] = replication;
diff --git a/ydb/core/tx/schemeshard/ut_replication/ut_replication.cpp b/ydb/core/tx/schemeshard/ut_replication/ut_replication.cpp
index f29efff076..be3922e8a0 100644
--- a/ydb/core/tx/schemeshard/ut_replication/ut_replication.cpp
+++ b/ydb/core/tx/schemeshard/ut_replication/ut_replication.cpp
@@ -168,6 +168,74 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
UNIT_ASSERT_VALUES_UNEQUAL("root@builtin", params.GetOAuthToken().GetToken());
}
+ Y_UNIT_TEST(ConsistencyMode) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().InitYdbDriver(true));
+ ui64 txId = 100;
+
+ SetupLogging(runtime);
+
+ TestCreateReplication(runtime, ++txId, "/MyRoot", R"(
+ Name: "Replication1"
+ Config {
+ Specific {
+ Targets {
+ SrcPath: "/MyRoot1/Table"
+ DstPath: "/MyRoot2/Table"
+ }
+ }
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+ {
+ const auto desc = DescribePath(runtime, "/MyRoot/Replication1");
+ const auto& config = desc.GetPathDescription().GetReplicationDescription().GetConfig();
+ UNIT_ASSERT(config.HasWeakConsistency());
+ }
+
+ TestCreateReplication(runtime, ++txId, "/MyRoot", R"(
+ Name: "Replication2"
+ Config {
+ Specific {
+ Targets {
+ SrcPath: "/MyRoot1/Table"
+ DstPath: "/MyRoot2/Table"
+ }
+ }
+ WeakConsistency {
+ }
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+ {
+ const auto desc = DescribePath(runtime, "/MyRoot/Replication2");
+ const auto& config = desc.GetPathDescription().GetReplicationDescription().GetConfig();
+ UNIT_ASSERT(config.HasWeakConsistency());
+ }
+
+ TestCreateReplication(runtime, ++txId, "/MyRoot", R"(
+ Name: "Replication3"
+ Config {
+ Specific {
+ Targets {
+ SrcPath: "/MyRoot1/Table"
+ DstPath: "/MyRoot2/Table"
+ }
+ }
+ StrongConsistency {
+ CommitIntervalMilliSeconds: 10000
+ }
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+ {
+ const auto desc = DescribePath(runtime, "/MyRoot/Replication3");
+ const auto& config = desc.GetPathDescription().GetReplicationDescription().GetConfig();
+ UNIT_ASSERT(config.HasStrongConsistency());
+ UNIT_ASSERT_VALUES_EQUAL(config.GetStrongConsistency().GetCommitIntervalMilliSeconds(), 10000);
+ }
+ }
+
Y_UNIT_TEST(Alter) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().InitYdbDriver(true));