summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp6
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp59
-rw-r--r--ydb/tests/functional/transfer/replication.cpp65
-rw-r--r--ydb/tests/functional/transfer/transfer.cpp (renamed from ydb/tests/functional/transfer/main.cpp)518
-rw-r--r--ydb/tests/functional/transfer/utils.h545
-rw-r--r--ydb/tests/functional/transfer/ya.make7
6 files changed, 738 insertions, 462 deletions
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index 90d42a56391..11cf4998af4 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -2513,6 +2513,12 @@ public:
auto& state = *op.MutableState();
state.MutableDone()->SetFailoverMode(
static_cast<NKikimrReplication::TReplicationState::TDone::EFailoverMode>(done->FailoverMode));
+ } else if (const auto& paused = settings.Settings.StatePaused) {
+ auto& state = *op.MutableState();
+ state.MutablePaused();
+ } else if (const auto& standBy = settings.Settings.StateStandBy) {
+ auto& state = *op.MutableState();
+ state.MutableStandBy();
}
if (settings.Settings.ConnectionString || settings.Settings.Endpoint || settings.Settings.Database ||
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index 6ee68fa8086..7cf744d0fa2 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -7642,6 +7642,35 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
--!syntax_v1
ALTER ASYNC REPLICATION `/Root/replication`
SET (
+ STATE = "Paused"
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ // alter state and config
+ {
+ auto query = R"(
+ --!syntax_v1
+ ALTER ASYNC REPLICATION `/Root/replication`
+ SET (
+ STATE = "StandBy"
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+
+ // alter state and config
+ {
+ auto query = R"(
+ --!syntax_v1
+ ALTER ASYNC REPLICATION `/Root/replication`
+ SET (
STATE = "DONE",
FAILOVER_MODE = "FORCE",
CONNECTION_STRING = "grpc://localhost:2135/?database=/Root"
@@ -7676,7 +7705,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
break;
}
- UNIT_ASSERT_C(i, "Alter timeout");
+ //UNIT_ASSERT_C(i, "Alter timeout");
Sleep(TDuration::Seconds(1));
}
}
@@ -9014,6 +9043,34 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
--!syntax_v1
ALTER TRANSFER `/Root/transfer`
SET (
+ STATE = "Paused"
+ );
+ )";
+
+ const auto result = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ // alter state and config
+ {
+ auto query = R"(
+ --!syntax_v1
+ ALTER TRANSFER `/Root/transfer`
+ SET (
+ STATE = "StandBy"
+ );
+ )";
+
+ const auto result = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ // alter state and config
+ {
+ auto query = R"(
+ --!syntax_v1
+ ALTER TRANSFER `/Root/transfer`
+ SET (
STATE = "DONE",
CONNECTION_STRING = "grpc://localhost:2135/?database=/Root"
);
diff --git a/ydb/tests/functional/transfer/replication.cpp b/ydb/tests/functional/transfer/replication.cpp
new file mode 100644
index 00000000000..36792f9df43
--- /dev/null
+++ b/ydb/tests/functional/transfer/replication.cpp
@@ -0,0 +1,65 @@
+#include "utils.h"
+
+using namespace NReplicationTest;
+
+Y_UNIT_TEST_SUITE(Replication)
+{
+ Y_UNIT_TEST(PauseAndResumeReplication)
+ {
+ MainTestCase testCase;
+ testCase.CreateSourceTable(R"(
+ CREATE TABLE `%s` (
+ Key Uint64 NOT NULL,
+ Message Utf8,
+ PRIMARY KEY (Key)
+ );
+ )");
+
+ testCase.CreateReplication();
+
+ testCase.ExecuteSourceTableQuery("INSERT INTO `%s` (`Key`, `Message`) VALUES (1, 'Message-1');");
+
+ testCase.CheckResult({{
+ _C("Message", TString("Message-1"))
+ }});
+
+ testCase.CheckReplicationState(TReplicationDescription::EState::Running);
+
+ Cerr << "State: Paused" << Endl << Flush;
+
+ testCase.PauseReplication();
+
+ Sleep(TDuration::Seconds(1));
+ testCase.CheckReplicationState(TReplicationDescription::EState::Paused);
+
+ testCase.ExecuteSourceTableQuery("INSERT INTO `%s` (`Key`, `Message`) VALUES (2, 'Message-2');");
+
+ // Replication is paused. New messages aren`t added to the table.
+ Sleep(TDuration::Seconds(3));
+ testCase.CheckResult({{
+ _C("Message", TString("Message-1"))
+ }});
+
+ Cerr << "State: StandBy" << Endl << Flush;
+
+ testCase.ResumeReplication();
+
+ // Replication is resumed. New messages are added to the table.
+ testCase.CheckReplicationState(TReplicationDescription::EState::Running);
+ testCase.CheckResult({{
+ _C("Message", TString("Message-1"))
+ }, {
+ _C("Message", TString("Message-2")),
+ }});
+
+ // More cycles for pause/resume
+ testCase.PauseReplication();
+ testCase.CheckReplicationState(TReplicationDescription::EState::Paused);
+
+ testCase.ResumeReplication();
+ testCase.CheckReplicationState(TReplicationDescription::EState::Running);
+
+ testCase.DropReplicatopn();
+ }
+}
+
diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/transfer.cpp
index abf1dae7d70..eb090031dfb 100644
--- a/ydb/tests/functional/transfer/main.cpp
+++ b/ydb/tests/functional/transfer/transfer.cpp
@@ -1,449 +1,6 @@
-#include <util/system/env.h>
-#include <library/cpp/testing/unittest/registar.h>
+#include "utils.h"
-#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h>
-#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/query/client.h>
-#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
-#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h>
-#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/draft/ydb_scripting.h>
-#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/draft/ydb_replication.h>
-
-#include <library/cpp/threading/local_executor/local_executor.h>
-
-using namespace NYdb;
-using namespace NYdb::NQuery;
-using namespace NYdb::NTopic;
-using namespace NYdb::NReplication;
-
-namespace {
-
-volatile size_t TestCaseCounter = RandomNumber<size_t>();
-
-struct IChecker {
- virtual void Assert(const TString& msg, const ::Ydb::Value& value) = 0;
- virtual ~IChecker() = default;
-};
-
-template<typename T>
-struct Checker : public IChecker {
- Checker(T&& expected)
- : Expected(std::move(expected))
- {}
-
- void Assert(const TString& msg, const ::Ydb::Value& value) override {
- UNIT_ASSERT_VALUES_EQUAL_C(Get(value), Expected, msg);
- }
-
- T Get(const ::Ydb::Value& value);
-
- T Expected;
-};
-
-template<>
-bool Checker<bool>::Get(const ::Ydb::Value& value) {
- return value.bool_value();
-}
-
-template<>
-ui32 Checker<ui32>::Get(const ::Ydb::Value& value) {
- return value.uint32_value();
-}
-
-template<>
-ui64 Checker<ui64>::Get(const ::Ydb::Value& value) {
- return value.uint64_value();
-}
-
-template<>
-double Checker<double>::Get(const ::Ydb::Value& value) {
- return value.double_value();
-}
-
-template<>
-TString Checker<TString>::Get(const ::Ydb::Value& value) {
- return value.text_value();
-}
-
-template<>
-TInstant Checker<TInstant>::Get(const ::Ydb::Value& value) {
- return TInstant::Days(value.uint32_value());
-}
-
-template<typename T>
-std::pair<TString, std::shared_ptr<IChecker>> _C(TString&& name, T&& expected) {
- return {
- std::move(name),
- std::make_shared<Checker<T>>(std::move(expected))
- };
-}
-
-struct TMessage {
- TString Message;
- std::optional<ui32> Partition = std::nullopt;
- std::optional<TString> ProducerId = std::nullopt;
- std::optional<TString> MessageGroupId = std::nullopt;
- std::optional<ui64> SeqNo = std::nullopt;
-};
-
-TMessage _withSeqNo(ui64 seqNo) {
- return {
- .Message = TStringBuilder() << "Message-" << seqNo,
- .Partition = 0,
- .ProducerId = std::nullopt,
- .MessageGroupId = std::nullopt,
- .SeqNo = seqNo
- };
-}
-
-TMessage _withProducerId(const TString& producerId) {
- return {
- .Message = TStringBuilder() << "Message-" << producerId,
- .Partition = 0,
- .ProducerId = producerId,
- .MessageGroupId = std::nullopt,
- .SeqNo = std::nullopt
- };
-}
-
-TMessage _withMessageGroupId(const TString& messageGroupId) {
- return {
- .Message = TStringBuilder() << "Message-" << messageGroupId,
- .Partition = 0,
- .ProducerId = messageGroupId,
- .MessageGroupId = messageGroupId,
- .SeqNo = std::nullopt
- };
-}
-
-using TExpectations = TVector<TVector<std::pair<TString, std::shared_ptr<IChecker>>>>;
-
-struct TConfig {
- const TString TableDDL;
- const TString Lambda;
- const TVector<TMessage> Messages;
- const TExpectations Expectations;
- const TVector<TString> AlterLambdas;
-};
-
-struct MainTestCase {
-
- MainTestCase()
- : Id(TestCaseCounter++)
- , ConnectionString(GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE"))
- , TopicName(TStringBuilder() << "Topic_" << Id)
- , TableName(TStringBuilder() << "Table_" << Id)
- , TransferName(TStringBuilder() << "Transfer_" << Id)
- , Driver(TDriverConfig(ConnectionString))
- , TableClient(Driver)
- , Session(TableClient.GetSession().GetValueSync().GetSession())
- , TopicClient(Driver)
- {
- }
-
- void ExecuteDDL(const TString& ddl) {
- auto res = Session.ExecuteQuery(ddl, TTxControl::NoTx()).GetValueSync();
- UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
- }
-
- void CreateTable(const TString& tableDDL) {
- ExecuteDDL(Sprintf(tableDDL.data(), TableName.data()));
- }
-
- void CreateTopic(size_t partitionCount = 10) {
- ExecuteDDL(Sprintf(R"(
- CREATE TOPIC `%s`
- WITH (
- min_active_partitions = %d
- );
- )", TopicName.data(), partitionCount));
- }
-
- void CreateConsumer(const TString& consumerName) {
- ExecuteDDL(Sprintf(R"(
- ALTER TOPIC `%s`
- ADD CONSUMER `%s`;
- )", TopicName.data(), consumerName.data()));
- }
-
- struct CreateTransferSettings {
- std::optional<TString> ConsumerName = std::nullopt;
- std::optional<TDuration> FlushInterval;
- std::optional<ui64> BatchSizeBytes;
-
- CreateTransferSettings()
- : ConsumerName(std::nullopt)
- , FlushInterval(TDuration::Seconds(1))
- , BatchSizeBytes(8_MB) {}
-
- static CreateTransferSettings WithConsumerName(const TString& consumerName) {
- CreateTransferSettings result;
- result.ConsumerName = consumerName;
- return result;
- }
-
- static CreateTransferSettings WithBatching(const TDuration& flushInterval, const ui64 batchSize) {
- CreateTransferSettings result;
- result.FlushInterval = flushInterval;
- result.BatchSizeBytes = batchSize;
- return result;
- }
- };
-
- void CreateTransfer(const TString& lambda, const CreateTransferSettings& settings = CreateTransferSettings()) {
- TStringBuilder sb;
- if (settings.ConsumerName) {
- sb << ", CONSUMER = '" << *settings.ConsumerName << "'" << Endl;
- }
- if (settings.FlushInterval) {
- sb << ", FLUSH_INTERVAL = Interval('PT" << settings.FlushInterval->Seconds() << "S')" << Endl;
- }
- if (settings.BatchSizeBytes) {
- sb << ", BATCH_SIZE_BYTES = " << *settings.BatchSizeBytes << Endl;
- }
-
- auto ddl = Sprintf(R"(
- %s;
-
- CREATE TRANSFER `%s`
- FROM `%s` TO `%s` USING $l
- WITH (
- CONNECTION_STRING = 'grpc://%s'
- %s
- );
- )", lambda.data(), TransferName.data(), TopicName.data(), TableName.data(), ConnectionString.data(), sb.data());
-
- ExecuteDDL(ddl);
- }
-
- struct AlterTransferSettings {
- std::optional<TString> TransformLambda;
- std::optional<TDuration> FlushInterval;
- std::optional<ui64> BatchSizeBytes;
-
- AlterTransferSettings()
- : FlushInterval(std::nullopt)
- , BatchSizeBytes(std::nullopt) {}
-
- static AlterTransferSettings WithBatching(const TDuration& flushInterval, const ui64 batchSize) {
- AlterTransferSettings result;
- result.FlushInterval = flushInterval;
- result.BatchSizeBytes = batchSize;
- return result;
- }
-
- static AlterTransferSettings WithTransformLambda(const TString& lambda) {
- AlterTransferSettings result;
- result.TransformLambda = lambda;
- return result;
- }
- };
-
- void AlterTransfer(const TString& lambda) {
- AlterTransfer(AlterTransferSettings::WithTransformLambda(lambda));
- }
-
- void AlterTransfer(const AlterTransferSettings& settings, bool success = true) {
- TString lambda = settings.TransformLambda ? *settings.TransformLambda : "";
- TString setLambda = settings.TransformLambda ? "SET USING $l" : "";
-
- TStringBuilder sb;
- if (settings.FlushInterval) {
- sb << "FLUSH_INTERVAL = Interval('PT" << settings.FlushInterval->Seconds() << "S')" << Endl;
- }
- if (settings.BatchSizeBytes) {
- sb << ", BATCH_SIZE_BYTES = " << *settings.BatchSizeBytes << Endl;
- }
-
- TString setOptions;
- if (!sb.empty()) {
- setOptions = TStringBuilder() << "SET (" << sb << " )";
- }
-
- auto res = Session.ExecuteQuery(Sprintf(R"(
- %s;
-
- ALTER TRANSFER `%s`
- %s
- %s;
- )", lambda.data(), TransferName.data(), setLambda.data(), setOptions.data()), TTxControl::NoTx()).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(success, res.IsSuccess(), res.GetIssues().ToString());
- }
-
- void DropTransfer() {
- auto res = Session.ExecuteQuery(Sprintf(R"(
- DROP TRANSFER `%s`;
- )", TransferName.data()), TTxControl::NoTx()).GetValueSync();
- UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
- }
-
- void PauseTransfer() {
- ExecuteDDL(Sprintf(R"(
- ALTER TRANSFER `%s`
- SET (
- STATE = "Paused"
- );
- )", TransferName.data()));
- }
-
- void ResumeTransfer() {
- ExecuteDDL(Sprintf(R"(
- ALTER TRANSFER `%s`
- SET (
- STATE = "StandBy"
- );
- )", TransferName.data()));
- }
-
- auto DescribeTransfer() {
- TReplicationClient client(Driver);
-
- TDescribeReplicationSettings settings;
- settings.IncludeStats(true);
-
- return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings).ExtractValueSync();
- }
-
- auto DescribeTopic() {
- TDescribeTopicSettings settings;
- settings.IncludeLocation(true);
- settings.IncludeStats(true);
-
- return TopicClient.DescribeTopic(TopicName, settings).ExtractValueSync();
- }
-
- void Write(const TMessage& message) {
- TWriteSessionSettings writeSettings;
- writeSettings.Path(TopicName);
- writeSettings.DeduplicationEnabled(message.SeqNo);
- if (message.Partition) {
- writeSettings.PartitionId(message.Partition);
- }
- if (message.ProducerId) {
- writeSettings.ProducerId(*message.ProducerId);
- }
- if (message.MessageGroupId) {
- writeSettings.MessageGroupId(*message.MessageGroupId);
- }
- auto writeSession = TopicClient.CreateSimpleBlockingWriteSession(writeSettings);
-
- UNIT_ASSERT(writeSession->Write(message.Message, message.SeqNo));
- writeSession->Close(TDuration::Seconds(1));
- }
-
- std::pair<ui64, Ydb::ResultSet> DoRead(const TExpectations& expectations) {
- auto& e = expectations.front();
-
- TStringBuilder columns;
- for (size_t i = 0; i < e.size(); ++i) {
- if (i) {
- columns << ", ";
- }
- columns << "`" << e[i].first << "`";
- }
-
-
- auto query = Sprintf("SELECT %s FROM `%s` ORDER BY %s", columns.data(), TableName.data(), columns.data());
- Cerr << ">>>>> Query: " << query << Endl << Flush;
- auto res = Session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
- UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
-
- const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0));
- return {proto.rowsSize(), proto};
- }
-
- void CheckResult(const TExpectations& expectations) {
- for (size_t attempt = 20; attempt--; ) {
- auto res = DoRead(expectations);
- Cerr << "Attempt=" << attempt << " count=" << res.first << Endl << Flush;
- if (res.first == expectations.size()) {
- const Ydb::ResultSet& proto = res.second;
- for (size_t i = 0; i < expectations.size(); ++i) {
- auto& row = proto.rows(i);
- auto& rowExpectations = expectations[i];
- for (size_t i = 0; i < rowExpectations.size(); ++i) {
- auto& c = rowExpectations[i];
- TString msg = TStringBuilder() << "Row " << i << " column '" << c.first << "': ";
- c.second->Assert(msg, row.items(i));
- }
- }
-
- break;
- }
-
- UNIT_ASSERT_C(attempt, "Unable to wait transfer result");
- Sleep(TDuration::Seconds(1));
- }
- }
-
- TReplicationDescription CheckTransferState(TReplicationDescription::EState expected) {
- for (size_t i = 20; i--;) {
- auto result = DescribeTransfer().GetReplicationDescription();
- if (expected == result.GetState()) {
- return result;
- }
-
- UNIT_ASSERT_C(i, "Unable to wait transfer state. Expected: " << expected << ", actual: " << result.GetState());
- Sleep(TDuration::Seconds(1));
- }
-
- Y_UNREACHABLE();
- }
-
- void CheckTransferStateError(const TString& expectedMessage) {
- auto result = CheckTransferState(TReplicationDescription::EState::Error);
- Cerr << ">>>>> ACTUAL: " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush;
- Cerr << ">>>>> EXPECTED: " << expectedMessage << Endl << Flush;
- UNIT_ASSERT(result.GetErrorState().GetIssues().ToOneLineString().contains(expectedMessage));
- }
-
- void Run(const TConfig& config) {
-
- CreateTable(config.TableDDL);
- CreateTopic();
-
- TVector<TString> lambdas;
- lambdas.insert(lambdas.end(), config.AlterLambdas.begin(), config.AlterLambdas.end());
- lambdas.push_back(config.Lambda);
-
- for (size_t i = 0; i < lambdas.size(); ++i) {
- auto lambda = lambdas[i];
- if (!i) {
- CreateTransfer(lambda);
- } else {
- Sleep(TDuration::Seconds(1));
-
- AlterTransfer(lambda);
-
- if (i == lambdas.size() - 1) {
- Sleep(TDuration::Seconds(1));
- }
- }
- }
-
- for (const auto& m : config.Messages) {
- Write(m);
- }
-
- CheckResult(config.Expectations);
- }
-
- const size_t Id;
- const TString ConnectionString;
-
- const TString TopicName;
- const TString TableName;
- const TString TransferName;
-
- TDriver Driver;
- TQueryClient TableClient;
- TSession Session;
- TTopicClient TopicClient;
-
- std::vector<std::string> ColumnNames;
-};
-
-
-} // namespace
+using namespace NReplicationTest;
Y_UNIT_TEST_SUITE(Transfer)
{
@@ -924,8 +481,8 @@ Y_UNIT_TEST_SUITE(Transfer)
Y_UNIT_TEST(DropTransfer)
{
MainTestCase testCase;
- testCase.Run({
- .TableDDL = R"(
+
+ testCase.CreateTable(R"(
CREATE TABLE `%s` (
Key Uint64 NOT NULL,
Message Utf8 NOT NULL,
@@ -933,9 +490,9 @@ Y_UNIT_TEST_SUITE(Transfer)
) WITH (
STORE = COLUMN
);
- )",
-
- .Lambda = R"(
+ )");
+ testCase.CreateTopic();
+ testCase.CreateTransfer(R"(
$l = ($x) -> {
return [
<|
@@ -944,15 +501,13 @@ Y_UNIT_TEST_SUITE(Transfer)
|>
];
};
- )",
-
- .Messages = {{"Message-1"}},
+ )");
- .Expectations = {{
- _C("Key", ui64(0)),
- _C("Message", TString("Message-1")),
- }}
- });
+ testCase.Write({"Message-1"});
+ testCase.CheckResult({{
+ _C("Key", ui64(0)),
+ _C("Message", TString("Message-1")),
+ }});
{
auto result = testCase.DescribeTransfer();
@@ -966,6 +521,9 @@ Y_UNIT_TEST_SUITE(Transfer)
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToOneLineString());
UNIT_ASSERT_VALUES_EQUAL(EStatus::SCHEME_ERROR, result.GetStatus());
}
+
+ testCase.DropTable();
+ testCase.DropTopic();
}
Y_UNIT_TEST(CreateAndDropConsumer)
@@ -1022,6 +580,9 @@ Y_UNIT_TEST_SUITE(Transfer)
UNIT_ASSERT_C(i, "Unable to wait consumer has been removed");
Sleep(TDuration::Seconds(1));
}
+
+ testCase.DropTable();
+ testCase.DropTopic();
}
Y_UNIT_TEST(DescribeError_OnLambdaCompilation)
@@ -1045,6 +606,10 @@ Y_UNIT_TEST_SUITE(Transfer)
)");
testCase.CheckTransferStateError("_unknown_field_for_lambda_compilation_error");
+
+ testCase.DropTransfer();
+ testCase.DropTable();
+ testCase.DropTopic();
}
/*
Y_UNIT_TEST(DescribeError_OnWriteToShard)
@@ -1125,6 +690,9 @@ Y_UNIT_TEST_SUITE(Transfer)
UNIT_ASSERT_VALUES_EQUAL(1, consumers.size());
UNIT_ASSERT_VALUES_EQUAL("PredefinedConsumer", consumers[0].GetConsumerName());
}
+
+ testCase.DropTable();
+ testCase.DropTopic();
}
Y_UNIT_TEST(CustomFlushInterval)
@@ -1172,6 +740,10 @@ Y_UNIT_TEST_SUITE(Transfer)
UNIT_ASSERT_C(attempt, "Unable to wait transfer result");
Sleep(TDuration::Seconds(1));
}
+
+ testCase.DropTransfer();
+ testCase.DropTable();
+ testCase.DropTopic();
}
Y_UNIT_TEST(AlterFlushInterval)
@@ -1223,6 +795,10 @@ Y_UNIT_TEST_SUITE(Transfer)
testCase.CheckResult({{
_C("Message", TString("Message-1"))
}});
+
+ testCase.DropTransfer();
+ testCase.DropTable();
+ testCase.DropTopic();
}
Y_UNIT_TEST(AlterBatchSize)
@@ -1266,6 +842,10 @@ Y_UNIT_TEST_SUITE(Transfer)
testCase.CheckResult({{
_C("Message", TString("Message-1"))
}});
+
+ testCase.DropTransfer();
+ testCase.DropTable();
+ testCase.DropTopic();
}
Y_UNIT_TEST(CreateTransferSourceNotExists)
@@ -1293,6 +873,9 @@ Y_UNIT_TEST_SUITE(Transfer)
)");
testCase.CheckTransferStateError("Discovery error: local/Topic_");
+
+ testCase.DropTransfer();
+ testCase.DropTable();
}
Y_UNIT_TEST(CreateTransferSourceIsNotTopic)
@@ -1308,7 +891,7 @@ Y_UNIT_TEST_SUITE(Transfer)
);
)");
- testCase.ExecuteDDL(Sprintf(R"(
+ testCase.ExecuteDDL(Sprintf(R"(
CREATE TABLE `%s` (
Key Uint64 NOT NULL,
PRIMARY KEY (Key)
@@ -1327,6 +910,9 @@ Y_UNIT_TEST_SUITE(Transfer)
)");
testCase.CheckTransferStateError("Discovery error: local/Topic_");
+
+ testCase.DropTransfer();
+ testCase.DropTable();
}
Y_UNIT_TEST(CreateTransferRowTable)
@@ -1353,6 +939,10 @@ Y_UNIT_TEST_SUITE(Transfer)
)");
testCase.CheckTransferStateError("Only column tables are supported as transfer targets");
+
+ testCase.DropTransfer();
+ testCase.DropTable();
+ testCase.DropTopic();
}
Y_UNIT_TEST(CreateTransferTargetIsNotTable)
@@ -1375,6 +965,9 @@ Y_UNIT_TEST_SUITE(Transfer)
)");
testCase.CheckTransferStateError("Only column tables are supported as transfer targets");
+
+ testCase.DropTransfer();
+ testCase.DropTopic();
}
Y_UNIT_TEST(CreateTransferTargetNotExists)
@@ -1394,6 +987,9 @@ Y_UNIT_TEST_SUITE(Transfer)
)");
testCase.CheckTransferStateError(TStringBuilder() << "The target table `/local/" << testCase.TableName << "` does not exist");
+
+ testCase.DropTransfer();
+ testCase.DropTopic();
}
Y_UNIT_TEST(PauseAndResumeTransfer)
@@ -1462,6 +1058,10 @@ Y_UNIT_TEST_SUITE(Transfer)
testCase.ResumeTransfer();
testCase.CheckTransferState(TReplicationDescription::EState::Running);
+
+ testCase.DropTransfer();
+ testCase.DropTable();
+ testCase.DropTopic();
}
}
diff --git a/ydb/tests/functional/transfer/utils.h b/ydb/tests/functional/transfer/utils.h
new file mode 100644
index 00000000000..05bbfe52cbd
--- /dev/null
+++ b/ydb/tests/functional/transfer/utils.h
@@ -0,0 +1,545 @@
+#pragma once
+
+#include <util/system/env.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h>
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/query/client.h>
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h>
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/draft/ydb_scripting.h>
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/draft/ydb_replication.h>
+
+#include <library/cpp/threading/local_executor/local_executor.h>
+
+using namespace NYdb;
+using namespace NYdb::NQuery;
+using namespace NYdb::NTopic;
+using namespace NYdb::NReplication;
+
+namespace NReplicationTest {
+
+struct IChecker {
+ virtual void Assert(const TString& msg, const ::Ydb::Value& value) = 0;
+ virtual ~IChecker() = default;
+};
+
+template<typename T>
+struct Checker : public IChecker {
+ Checker(T&& expected)
+ : Expected(std::move(expected))
+ {}
+
+ void Assert(const TString& msg, const ::Ydb::Value& value) override {
+ UNIT_ASSERT_VALUES_EQUAL_C(Get(value), Expected, msg);
+ }
+
+ T Get(const ::Ydb::Value& value);
+
+ T Expected;
+};
+
+template<>
+inline bool Checker<bool>::Get(const ::Ydb::Value& value) {
+ return value.bool_value();
+}
+
+template<>
+inline ui32 Checker<ui32>::Get(const ::Ydb::Value& value) {
+ return value.uint32_value();
+}
+
+template<>
+inline ui64 Checker<ui64>::Get(const ::Ydb::Value& value) {
+ return value.uint64_value();
+}
+
+template<>
+inline double Checker<double>::Get(const ::Ydb::Value& value) {
+ return value.double_value();
+}
+
+template<>
+inline TString Checker<TString>::Get(const ::Ydb::Value& value) {
+ return value.text_value();
+}
+
+template<>
+inline TInstant Checker<TInstant>::Get(const ::Ydb::Value& value) {
+ return TInstant::Days(value.uint32_value());
+}
+
+template<typename T>
+std::pair<TString, std::shared_ptr<IChecker>> _C(TString&& name, T&& expected) {
+ return {
+ std::move(name),
+ std::make_shared<Checker<T>>(std::move(expected))
+ };
+}
+
+struct TMessage {
+ TString Message;
+ std::optional<ui32> Partition = std::nullopt;
+ std::optional<TString> ProducerId = std::nullopt;
+ std::optional<TString> MessageGroupId = std::nullopt;
+ std::optional<ui64> SeqNo = std::nullopt;
+};
+
+inline TMessage _withSeqNo(ui64 seqNo) {
+ return {
+ .Message = TStringBuilder() << "Message-" << seqNo,
+ .Partition = 0,
+ .ProducerId = std::nullopt,
+ .MessageGroupId = std::nullopt,
+ .SeqNo = seqNo
+ };
+}
+
+inline TMessage _withProducerId(const TString& producerId) {
+ return {
+ .Message = TStringBuilder() << "Message-" << producerId,
+ .Partition = 0,
+ .ProducerId = producerId,
+ .MessageGroupId = std::nullopt,
+ .SeqNo = std::nullopt
+ };
+}
+
+inline TMessage _withMessageGroupId(const TString& messageGroupId) {
+ return {
+ .Message = TStringBuilder() << "Message-" << messageGroupId,
+ .Partition = 0,
+ .ProducerId = messageGroupId,
+ .MessageGroupId = messageGroupId,
+ .SeqNo = std::nullopt
+ };
+}
+
+using TExpectations = TVector<TVector<std::pair<TString, std::shared_ptr<IChecker>>>>;
+
+struct TConfig {
+ const TString TableDDL;
+ const TString Lambda;
+ const TVector<TMessage> Messages;
+ const TExpectations Expectations;
+ const TVector<TString> AlterLambdas;
+};
+
+struct MainTestCase {
+
+ MainTestCase()
+ : Id(RandomNumber<size_t>())
+ , ConnectionString(GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE"))
+ , TopicName(TStringBuilder() << "Topic_" << Id)
+ , SourceTableName(TStringBuilder() << "SourceTable_" << Id)
+ , TableName(TStringBuilder() << "Table_" << Id)
+ , ReplicationName(TStringBuilder() << "Replication_" << Id)
+ , TransferName(TStringBuilder() << "Transfer_" << Id)
+ , Driver(TDriverConfig(ConnectionString))
+ , TableClient(Driver)
+ , Session(TableClient.GetSession().GetValueSync().GetSession())
+ , TopicClient(Driver)
+ {
+ }
+
+ void ExecuteDDL(const TString& ddl, bool checkResult = true) {
+ Cerr << "DDL: " << ddl << Endl << Flush;
+ auto res = Session.ExecuteQuery(ddl, TTxControl::NoTx()).GetValueSync();
+ if (checkResult) {
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ }
+ }
+
+ auto ExecuteSourceTableQuery(const TString& query) {
+ for (size_t i = 10; i--;) {
+ auto q = Sprintf(query.data(), SourceTableName.data());
+ Cerr << ">>>>> Query: " << q << Endl << Flush;
+ auto res = Session.ExecuteQuery(q, TTxControl::NoTx()).GetValueSync();
+ if (res.IsSuccess()) {
+ return;
+ }
+
+ UNIT_ASSERT_C(i, res.GetIssues().ToString());
+ Sleep(TDuration::Seconds(1));
+ }
+ }
+
+ void CreateTable(const TString& tableDDL) {
+ ExecuteDDL(Sprintf(tableDDL.data(), TableName.data()));
+ }
+
+ void DropTable() {
+ ExecuteDDL(Sprintf("DROP TABLE `%s`", TableName.data()));
+ }
+
+ void CreateSourceTable(const TString& tableDDL) {
+ ExecuteDDL(Sprintf(tableDDL.data(), SourceTableName.data()));
+ }
+
+ void DropSourceTable() {
+ ExecuteDDL(Sprintf("DROP TABLE `%s`", SourceTableName.data()));
+ }
+
+ void CreateTopic(size_t partitionCount = 10) {
+ ExecuteDDL(Sprintf(R"(
+ CREATE TOPIC `%s`
+ WITH (
+ min_active_partitions = %d
+ );
+ )", TopicName.data(), partitionCount));
+ }
+
+ void DropTopic() {
+ ExecuteDDL(Sprintf("DROP TOPIC `%s`", TopicName.data()));
+ }
+
+ void CreateConsumer(const TString& consumerName) {
+ ExecuteDDL(Sprintf(R"(
+ ALTER TOPIC `%s`
+ ADD CONSUMER `%s`;
+ )", TopicName.data(), consumerName.data()));
+ }
+
+ struct CreateTransferSettings {
+ std::optional<TString> ConsumerName = std::nullopt;
+ std::optional<TDuration> FlushInterval;
+ std::optional<ui64> BatchSizeBytes;
+
+ CreateTransferSettings()
+ : ConsumerName(std::nullopt)
+ , FlushInterval(TDuration::Seconds(1))
+ , BatchSizeBytes(8_MB) {}
+
+ static CreateTransferSettings WithConsumerName(const TString& consumerName) {
+ CreateTransferSettings result;
+ result.ConsumerName = consumerName;
+ return result;
+ }
+
+ static CreateTransferSettings WithBatching(const TDuration& flushInterval, const ui64 batchSize) {
+ CreateTransferSettings result;
+ result.FlushInterval = flushInterval;
+ result.BatchSizeBytes = batchSize;
+ return result;
+ }
+ };
+
+ void CreateTransfer(const TString& lambda, const CreateTransferSettings& settings = CreateTransferSettings()) {
+ TStringBuilder sb;
+ if (settings.ConsumerName) {
+ sb << ", CONSUMER = '" << *settings.ConsumerName << "'" << Endl;
+ }
+ if (settings.FlushInterval) {
+ sb << ", FLUSH_INTERVAL = Interval('PT" << settings.FlushInterval->Seconds() << "S')" << Endl;
+ }
+ if (settings.BatchSizeBytes) {
+ sb << ", BATCH_SIZE_BYTES = " << *settings.BatchSizeBytes << Endl;
+ }
+
+ auto ddl = Sprintf(R"(
+ %s;
+
+ CREATE TRANSFER `%s`
+ FROM `%s` TO `%s` USING $l
+ WITH (
+ CONNECTION_STRING = 'grpc://%s'
+ %s
+ );
+ )", lambda.data(), TransferName.data(), TopicName.data(), TableName.data(), ConnectionString.data(), sb.data());
+
+ ExecuteDDL(ddl);
+ }
+
+ struct AlterTransferSettings {
+ std::optional<TString> TransformLambda;
+ std::optional<TDuration> FlushInterval;
+ std::optional<ui64> BatchSizeBytes;
+
+ AlterTransferSettings()
+ : FlushInterval(std::nullopt)
+ , BatchSizeBytes(std::nullopt) {}
+
+ static AlterTransferSettings WithBatching(const TDuration& flushInterval, const ui64 batchSize) {
+ AlterTransferSettings result;
+ result.FlushInterval = flushInterval;
+ result.BatchSizeBytes = batchSize;
+ return result;
+ }
+
+ static AlterTransferSettings WithTransformLambda(const TString& lambda) {
+ AlterTransferSettings result;
+ result.TransformLambda = lambda;
+ return result;
+ }
+ };
+
+ void AlterTransfer(const TString& lambda) {
+ AlterTransfer(AlterTransferSettings::WithTransformLambda(lambda));
+ }
+
+ void AlterTransfer(const AlterTransferSettings& settings, bool success = true) {
+ TString lambda = settings.TransformLambda ? *settings.TransformLambda : "";
+ TString setLambda = settings.TransformLambda ? "SET USING $l" : "";
+
+ TStringBuilder sb;
+ if (settings.FlushInterval) {
+ sb << "FLUSH_INTERVAL = Interval('PT" << settings.FlushInterval->Seconds() << "S')" << Endl;
+ }
+ if (settings.BatchSizeBytes) {
+ sb << ", BATCH_SIZE_BYTES = " << *settings.BatchSizeBytes << Endl;
+ }
+
+ TString setOptions;
+ if (!sb.empty()) {
+ setOptions = TStringBuilder() << "SET (" << sb << " )";
+ }
+
+ auto res = Session.ExecuteQuery(Sprintf(R"(
+ %s;
+
+ ALTER TRANSFER `%s`
+ %s
+ %s;
+ )", lambda.data(), TransferName.data(), setLambda.data(), setOptions.data()), TTxControl::NoTx()).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(success, res.IsSuccess(), res.GetIssues().ToString());
+ }
+
+ void DropTransfer() {
+ ExecuteDDL(Sprintf("DROP TRANSFER `%s`;", TransferName.data()));
+ }
+
+ void PauseTransfer() {
+ ExecuteDDL(Sprintf(R"(
+ ALTER TRANSFER `%s`
+ SET (
+ STATE = "Paused"
+ );
+ )", TransferName.data()));
+ }
+
+ void ResumeTransfer() {
+ ExecuteDDL(Sprintf(R"(
+ ALTER TRANSFER `%s`
+ SET (
+ STATE = "StandBy"
+ );
+ )", TransferName.data()));
+ }
+
+ auto DescribeTransfer() {
+ TReplicationClient client(Driver);
+
+ TDescribeReplicationSettings settings;
+ settings.IncludeStats(true);
+
+ return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings).ExtractValueSync();
+ }
+
+ void CreateReplication() {
+ auto ddl = Sprintf(R"(
+ CREATE ASYNC REPLICATION `%s`
+ FOR `%s` AS `%s`
+ WITH (
+ CONNECTION_STRING = 'grpc://%s'
+ );
+ )", ReplicationName.data(), SourceTableName.data(), TableName.data(), ConnectionString.data());
+
+ ExecuteDDL(ddl);
+ }
+
+ void DropReplicatopn() {
+ ExecuteDDL(Sprintf("DROP ASYNC REPLICATION `%s`;", ReplicationName.data()));
+ }
+
+ auto DescribeReplication() {
+ TReplicationClient client(Driver);
+
+ TDescribeReplicationSettings settings;
+ settings.IncludeStats(true);
+
+ return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + ReplicationName, settings).ExtractValueSync();
+ }
+
+ TReplicationDescription CheckReplicationState(TReplicationDescription::EState expected) {
+ for (size_t i = 20; i--;) {
+ auto result = DescribeReplication().GetReplicationDescription();
+ if (expected == result.GetState()) {
+ return result;
+ }
+
+ UNIT_ASSERT_C(i, "Unable to wait replication state. Expected: " << expected << ", actual: " << result.GetState());
+ Sleep(TDuration::Seconds(1));
+ }
+
+ Y_UNREACHABLE();
+ }
+
+ void PauseReplication() {
+ ExecuteDDL(Sprintf(R"(
+ ALTER ASYNC REPLICATION `%s`
+ SET (
+ STATE = "Paused"
+ );
+ )", ReplicationName.data()));
+ }
+
+ void ResumeReplication() {
+ ExecuteDDL(Sprintf(R"(
+ ALTER ASYNC REPLICATION `%s`
+ SET (
+ STATE = "StandBy"
+ );
+ )", ReplicationName.data()));
+ }
+
+ auto DescribeTopic() {
+ TDescribeTopicSettings settings;
+ settings.IncludeLocation(true);
+ settings.IncludeStats(true);
+
+ return TopicClient.DescribeTopic(TopicName, settings).ExtractValueSync();
+ }
+
+ void Write(const TMessage& message) {
+ TWriteSessionSettings writeSettings;
+ writeSettings.Path(TopicName);
+ writeSettings.DeduplicationEnabled(message.SeqNo);
+ if (message.Partition) {
+ writeSettings.PartitionId(message.Partition);
+ }
+ if (message.ProducerId) {
+ writeSettings.ProducerId(*message.ProducerId);
+ }
+ if (message.MessageGroupId) {
+ writeSettings.MessageGroupId(*message.MessageGroupId);
+ }
+ auto writeSession = TopicClient.CreateSimpleBlockingWriteSession(writeSettings);
+
+ UNIT_ASSERT(writeSession->Write(message.Message, message.SeqNo));
+ writeSession->Close(TDuration::Seconds(1));
+ }
+
+ std::pair<i64, Ydb::ResultSet> DoRead(const TExpectations& expectations) {
+ auto& e = expectations.front();
+
+ TStringBuilder columns;
+ for (size_t i = 0; i < e.size(); ++i) {
+ if (i) {
+ columns << ", ";
+ }
+ columns << "`" << e[i].first << "`";
+ }
+
+
+ auto query = Sprintf("SELECT %s FROM `%s` ORDER BY %s", columns.data(), TableName.data(), columns.data());
+ Cerr << ">>>>> Query: " << query << Endl << Flush;
+ auto res = Session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
+ if (!res.IsSuccess()) {
+ Cerr << ">>>>> Query error: " << res.GetIssues().ToString() << Endl << Flush;
+ TResultSet r{Ydb::ResultSet()};
+ return {-1, NYdb::TProtoAccessor::GetProto(r)};
+ }
+
+ const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0));
+ return {proto.rowsSize(), proto};
+ }
+
+ void CheckResult(const TExpectations& expectations) {
+ for (size_t attempt = 20; attempt--; ) {
+ auto res = DoRead(expectations);
+ Cerr << "Attempt=" << attempt << " count=" << res.first << Endl << Flush;
+ if (res.first == (ssize_t)expectations.size()) {
+ const Ydb::ResultSet& proto = res.second;
+ for (size_t i = 0; i < expectations.size(); ++i) {
+ auto& row = proto.rows(i);
+ auto& rowExpectations = expectations[i];
+ for (size_t i = 0; i < rowExpectations.size(); ++i) {
+ auto& c = rowExpectations[i];
+ TString msg = TStringBuilder() << "Row " << i << " column '" << c.first << "': ";
+ c.second->Assert(msg, row.items(i));
+ }
+ }
+
+ break;
+ }
+
+ UNIT_ASSERT_C(attempt, "Unable to wait transfer result");
+ Sleep(TDuration::Seconds(1));
+ }
+ }
+
+ TReplicationDescription CheckTransferState(TReplicationDescription::EState expected) {
+ for (size_t i = 20; i--;) {
+ auto result = DescribeTransfer().GetReplicationDescription();
+ if (expected == result.GetState()) {
+ return result;
+ }
+
+ UNIT_ASSERT_C(i, "Unable to wait transfer state. Expected: " << expected << ", actual: " << result.GetState());
+ Sleep(TDuration::Seconds(1));
+ }
+
+ Y_UNREACHABLE();
+ }
+
+ void CheckTransferStateError(const TString& expectedMessage) {
+ auto result = CheckTransferState(TReplicationDescription::EState::Error);
+ Cerr << ">>>>> ACTUAL: " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush;
+ Cerr << ">>>>> EXPECTED: " << expectedMessage << Endl << Flush;
+ UNIT_ASSERT(result.GetErrorState().GetIssues().ToOneLineString().contains(expectedMessage));
+ }
+
+ void Run(const TConfig& config) {
+
+ CreateTable(config.TableDDL);
+ CreateTopic();
+
+ TVector<TString> lambdas;
+ lambdas.insert(lambdas.end(), config.AlterLambdas.begin(), config.AlterLambdas.end());
+ lambdas.push_back(config.Lambda);
+
+ for (size_t i = 0; i < lambdas.size(); ++i) {
+ auto lambda = lambdas[i];
+ if (!i) {
+ CreateTransfer(lambda);
+ } else {
+ Sleep(TDuration::Seconds(1));
+
+ AlterTransfer(lambda);
+
+ if (i == lambdas.size() - 1) {
+ Sleep(TDuration::Seconds(1));
+ }
+ }
+ }
+
+ for (const auto& m : config.Messages) {
+ Write(m);
+ }
+
+ CheckResult(config.Expectations);
+
+ DropTransfer();
+ DropTable();
+ DropTopic();
+ }
+
+ const size_t Id;
+ const TString ConnectionString;
+
+ const TString TopicName;
+ const TString SourceTableName;
+ const TString TableName;
+ const TString ReplicationName;
+ const TString TransferName;
+
+ TDriver Driver;
+ TQueryClient TableClient;
+ TSession Session;
+ TTopicClient TopicClient;
+
+ std::vector<std::string> ColumnNames;
+};
+
+
+} // namespace
diff --git a/ydb/tests/functional/transfer/ya.make b/ydb/tests/functional/transfer/ya.make
index 152a3fde51d..c6b70829d5a 100644
--- a/ydb/tests/functional/transfer/ya.make
+++ b/ydb/tests/functional/transfer/ya.make
@@ -16,7 +16,8 @@ PEERDIR(
)
SRCS(
- main.cpp
+ replication.cpp
+ transfer.cpp
)
INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
@@ -24,7 +25,9 @@ INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
SIZE(MEDIUM)
IF (SANITIZER_TYPE)
- REQUIREMENTS(ram:16 cpu:4)
+ REQUIREMENTS(ram:24 cpu:4)
+ELSE()
+ REQUIREMENTS(ram:16 cpu:2)
ENDIF()
END()