aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2025-02-28 14:38:47 +0500
committerGitHub <noreply@github.com>2025-02-28 14:38:47 +0500
commitc35545b5a68f73d16a97f58e3788e343bc8c8e95 (patch)
tree1a51e84b2773616a3d197575115f7c64204cc7af
parent724231a4bae4aa40fe1441cedf8f368dd2071387 (diff)
downloadydb-c35545b5a68f73d16a97f58e3788e343bc8c8e95.tar.gz
Refactoring of tests (#15179)
-rw-r--r--ydb/tests/functional/transfer/main.cpp346
1 files changed, 185 insertions, 161 deletions
diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp
index b24dc77ed9..a52b4a3ee7 100644
--- a/ydb/tests/functional/transfer/main.cpp
+++ b/ydb/tests/functional/transfer/main.cpp
@@ -113,153 +113,178 @@ TMessage _withMessageGroupId(const TString& messageGroupId) {
};
}
+using TExpectations = TVector<TVector<std::pair<TString, std::shared_ptr<IChecker>>>>;
+
struct TConfig {
- TString TableDDL;
+ const TString TableDDL;
const TString Lambda;
const TVector<TMessage> Messages;
- TVector<std::pair<TString, std::shared_ptr<IChecker>>> Expectations;
+ const TExpectations Expectations;
const TVector<TString> AlterLambdas;
};
struct MainTestCase {
- MainTestCase(TConfig&& config)
- : Config(std::move(config)) {
- size_t id = TestCaseCounter++;
+ MainTestCase()
+ : Id(TestCaseCounter++)
+ , ConnectionString(GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE"))
+ , TopicName(TStringBuilder() << "Topic_" << Id)
+ , TableName(TStringBuilder() << "Table_" << Id)
+ , TransferName(TStringBuilder() << "Transfer_" << Id)
+ , Driver(TDriverConfig(ConnectionString))
+ , TableClient(Driver)
+ , Session(TableClient.GetSession().GetValueSync().GetSession())
+ , TopicClient(Driver)
+ {
+ }
- TopicName = TStringBuilder() << "Topic_" << id;
- TableName = TStringBuilder() << "Table_" << id;
- TransferName = TStringBuilder() << "Transfer_" << id;
+ void CreateTable(const TString& tableDDL) {
+ auto ddl = Sprintf(tableDDL.data(), TableName.data());
+ auto res = Session.ExecuteQuery(ddl, TTxControl::NoTx()).GetValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
- void Run() {
- TString connectionString = GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE");
+ void CreateTopic() {
+ auto res = Session.ExecuteQuery(Sprintf(R"(
+ CREATE TOPIC `%s`
+ WITH (
+ min_active_partitions = 10
+ );
+ )", TopicName.data()), TTxControl::NoTx()).GetValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ }
- auto config = TDriverConfig(connectionString);
- auto driver = TDriver(config);
- auto tableClient = TQueryClient(driver);
- auto session = tableClient.GetSession().GetValueSync().GetSession();
- auto topicClient = TTopicClient(driver);
+ void CreateTransfer(const TString& lambda) {
+ auto res = Session.ExecuteQuery(Sprintf(R"(
+ %s;
+
+ CREATE TRANSFER `%s`
+ FROM `%s` TO `%s` USING $l
+ WITH (
+ CONNECTION_STRING = 'grpc://%s'
+ -- , TOKEN = 'user@builtin'
+ );
+ )", lambda.data(), TransferName.data(), TopicName.data(), TableName.data(), ConnectionString.data()), TTxControl::NoTx()).GetValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ }
- {
- auto tableDDL = Sprintf(Config.TableDDL.data(), TableName.data());
- auto res = session.ExecuteQuery(tableDDL, TTxControl::NoTx()).GetValueSync();
- UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
- }
+ void AlterTransfer(const TString& lambda) {
+ auto res = Session.ExecuteQuery(Sprintf(R"(
+ %s;
- {
- auto res = session.ExecuteQuery(Sprintf(R"(
- CREATE TOPIC `%s`
- WITH (
- min_active_partitions = 10
- );
- )", TopicName.data()), TTxControl::NoTx()).GetValueSync();
- UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
- }
-
- TVector<TString> lambdas;
- lambdas.insert(lambdas.end(), Config.AlterLambdas.begin(), Config.AlterLambdas.end());
- lambdas.push_back(Config.Lambda);
+ ALTER TRANSFER `%s`
+ SET USING $l;
+ )", lambda.data(), TransferName.data()), TTxControl::NoTx()).GetValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ }
- for (size_t i = 0; i < lambdas.size(); ++i) {
- auto lambda = lambdas[i];
- if (!i) {
- auto res = session.ExecuteQuery(Sprintf(R"(
- %s;
-
- CREATE TRANSFER `%s`
- FROM `%s` TO `%s` USING $l
- WITH (
- CONNECTION_STRING = 'grpc://%s'
- -- , TOKEN = 'user@builtin'
- );
- )", lambda.data(), TransferName.data(), TopicName.data(), TableName.data(), connectionString.data()), TTxControl::NoTx()).GetValueSync();
- UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
- } else {
- Sleep(TDuration::Seconds(1));
+ void Write(const TMessage& message) {
+ TWriteSessionSettings writeSettings;
+ writeSettings.Path(TopicName);
+ writeSettings.DeduplicationEnabled(message.SeqNo);
+ if (message.Partition) {
+ writeSettings.PartitionId(message.Partition);
+ }
+ if (message.ProducerId) {
+ writeSettings.ProducerId(*message.ProducerId);
+ }
+ if (message.MessageGroupId) {
+ writeSettings.MessageGroupId(*message.MessageGroupId);
+ }
+ auto writeSession = TopicClient.CreateSimpleBlockingWriteSession(writeSettings);
- auto res = session.ExecuteQuery(Sprintf(R"(
- %s;
+ UNIT_ASSERT(writeSession->Write(message.Message, message.SeqNo));
+ writeSession->Close(TDuration::Seconds(1));
+ }
- ALTER TRANSFER `%s`
- SET USING $l;
- )", lambda.data(), TransferName.data()), TTxControl::NoTx()).GetValueSync();
- UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ std::pair<ui64, Ydb::ResultSet> DoRead(const TExpectations& expectations) {
+ auto& e = expectations.front();
- if (i == lambdas.size() - 1) {
- Sleep(TDuration::Seconds(1));
- }
+ TStringBuilder columns;
+ for (size_t i = 0; i < e.size(); ++i) {
+ if (i) {
+ columns << ", ";
}
+ columns << "`" << e[i].first << "`";
}
- {
- for (const auto& m : Config.Messages) {
- TWriteSessionSettings writeSettings;
- writeSettings.Path(TopicName);
- writeSettings.DeduplicationEnabled(m.SeqNo);
- if (m.Partition) {
- writeSettings.PartitionId(m.Partition);
- }
- if (m.ProducerId) {
- writeSettings.ProducerId(*m.ProducerId);
- }
- if (m.MessageGroupId) {
- writeSettings.MessageGroupId(*m.MessageGroupId);
- }
- auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings);
- UNIT_ASSERT(writeSession->Write(m.Message, m.SeqNo));
- writeSession->Close(TDuration::Seconds(1));
- }
- }
+ auto res = Session.ExecuteQuery(
+ Sprintf("SELECT %s FROM `%s`", columns.data(), TableName.data()),
+ TTxControl::NoTx()).GetValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+
+ const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0));
+ return {proto.rowsSize(), proto};
+ }
- {
- for (size_t attempt = 20; attempt--; ) {
- auto res = DoRead(session);
- Cerr << "Attempt=" << attempt << " count=" << res.first << Endl << Flush;
- if (res.first == Config.Messages.size()) {
- const Ydb::ResultSet& proto = res.second;
- for (size_t i = 0; i < Config.Expectations.size(); ++i) {
- auto& c = Config.Expectations[i];
- TString msg = TStringBuilder() << "Column '" << c.first << "': ";
- c.second->Assert(msg, proto.rows(0).items(i));
+ void CheckResult(const TExpectations& expectations) {
+ for (size_t attempt = 20; attempt--; ) {
+ auto res = DoRead(expectations);
+ Cerr << "Attempt=" << attempt << " count=" << res.first << Endl << Flush;
+ if (res.first == expectations.size()) {
+ const Ydb::ResultSet& proto = res.second;
+ for (size_t i = 0; i < expectations.size(); ++i) {
+ auto& row = proto.rows(0);
+ auto& rowExpectations = expectations[i];
+ for (size_t i = 0; i < rowExpectations.size(); ++i) {
+ auto& c = rowExpectations[i];
+ TString msg = TStringBuilder() << "Row " << i << " column '" << c.first << "': ";
+ c.second->Assert(msg, row.items(i));
}
-
- break;
}
- UNIT_ASSERT_C(attempt, "Unable to wait replication result");
- Sleep(TDuration::Seconds(1));
+ break;
}
+
+ UNIT_ASSERT_C(attempt, "Unable to wait replication result");
+ Sleep(TDuration::Seconds(1));
}
}
+ void Run(const TConfig& config) {
- std::pair<ui64, Ydb::ResultSet> DoRead(TSession& s) {
- TStringBuilder columns;
- for (size_t i = 0; i < Config.Expectations.size(); ++i) {
- if (i) {
- columns << ", ";
+ CreateTable(config.TableDDL);
+ CreateTopic();
+
+ TVector<TString> lambdas;
+ lambdas.insert(lambdas.end(), config.AlterLambdas.begin(), config.AlterLambdas.end());
+ lambdas.push_back(config.Lambda);
+
+ for (size_t i = 0; i < lambdas.size(); ++i) {
+ auto lambda = lambdas[i];
+ if (!i) {
+ CreateTransfer(lambda);
+ } else {
+ Sleep(TDuration::Seconds(1));
+
+ AlterTransfer(lambda);
+
+ if (i == lambdas.size() - 1) {
+ Sleep(TDuration::Seconds(1));
+ }
}
- columns << "`" << Config.Expectations[i].first << "`";
}
+ for (const auto& m : config.Messages) {
+ Write(m);
+ }
- auto res = s.ExecuteQuery(
- Sprintf("SELECT %s FROM `%s`", columns.data(), TableName.data()),
- TTxControl::NoTx()).GetValueSync();
- UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
-
- const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0));
- return {proto.rowsSize(), proto};
+ CheckResult(config.Expectations);
}
- TConfig Config;
+ const size_t Id;
+ const TString ConnectionString;
+
+ const TString TopicName;
+ const TString TableName;
+ const TString TransferName;
- TString TopicName;
- TString TableName;
- TString TransferName;
+ TDriver Driver;
+ TQueryClient TableClient;
+ TSession Session;
+ TTopicClient TopicClient;
std::vector<std::string> ColumnNames;
};
@@ -271,7 +296,7 @@ Y_UNIT_TEST_SUITE(Transfer)
{
Y_UNIT_TEST(Main_ColumnTable_KeyColumnFirst)
{
- MainTestCase({
+ MainTestCase().Run({
.TableDDL = R"(
CREATE TABLE `%s` (
Key Uint64 NOT NULL,
@@ -295,16 +320,16 @@ Y_UNIT_TEST_SUITE(Transfer)
.Messages = {{"Message-1"}},
- .Expectations = {
+ .Expectations = {{
_C("Key", ui64(0)),
_C("Message", TString("Message-1")),
- }
- }).Run();
+ }}
+ });
}
Y_UNIT_TEST(Main_ColumnTable_KeyColumnLast)
{
- MainTestCase({
+ MainTestCase().Run({
.TableDDL = R"(
CREATE TABLE `%s` (
Message Utf8 NOT NULL,
@@ -328,16 +353,16 @@ Y_UNIT_TEST_SUITE(Transfer)
.Messages = {{"Message-1"}},
- .Expectations = {
+ .Expectations = {{
_C("Key", ui64(0)),
_C("Message", TString("Message-1")),
- }
- }).Run();
+ }}
+ });
}
Y_UNIT_TEST(Main_ColumnTable_ComplexKey)
{
- MainTestCase({
+ MainTestCase().Run({
.TableDDL = R"(
CREATE TABLE `%s` (
Key1 Uint64 NOT NULL,
@@ -369,20 +394,20 @@ Y_UNIT_TEST_SUITE(Transfer)
.Messages = {{"Message-1"}},
- .Expectations = {
+ .Expectations = {{
_C("Key1", ui64(1)),
_C("Key2", ui64(2)),
_C("Key3", ui64(3)),
_C("Key4", ui64(4)),
_C("Value1", TString("value-1")),
_C("Value2", TString("value-2")),
- }
- }).Run();
+ }}
+ });
}
Y_UNIT_TEST(Main_ColumnTable_JsonMessage)
{
- MainTestCase({
+ MainTestCase().Run({
.TableDDL = R"(
CREATE TABLE `%s` (
Id Uint64 NOT NULL,
@@ -417,18 +442,18 @@ Y_UNIT_TEST_SUITE(Transfer)
"salary": "123"
})"}},
- .Expectations = {
+ .Expectations = {{
_C("Id", ui64(1)),
_C("FirstName", TString("Vasya")),
_C("LastName", TString("Pupkin")),
_C("Salary", ui64(123)),
- }
- }).Run();
+ }}
+ });
}
Y_UNIT_TEST(Main_ColumnTable_NullableColumn)
{
- MainTestCase({
+ MainTestCase().Run({
.TableDDL = R"(
CREATE TABLE `%s` (
Key Uint64 NOT NULL,
@@ -452,16 +477,16 @@ Y_UNIT_TEST_SUITE(Transfer)
.Messages = {{"Message-1"}},
- .Expectations = {
+ .Expectations = {{
_C("Key", ui64(0)),
_C("Message", TString("Message-1")),
- }
- }).Run();
+ }}
+ });
}
Y_UNIT_TEST(Main_ColumnTable_Date)
{
- MainTestCase({
+ MainTestCase().Run({
.TableDDL = R"(
CREATE TABLE `%s` (
Key Uint64 NOT NULL,
@@ -485,16 +510,16 @@ Y_UNIT_TEST_SUITE(Transfer)
.Messages = {{"2025-02-21"}},
- .Expectations = {
+ .Expectations = {{
_C("Key", ui64(0)),
_C("Message", TInstant::ParseIso8601("2025-02-21")),
- }
- }).Run();
+ }}
+ });
}
Y_UNIT_TEST(Main_ColumnTable_Double)
{
- MainTestCase({
+ MainTestCase().Run({
.TableDDL = R"(
CREATE TABLE `%s` (
Key Uint64 NOT NULL,
@@ -518,16 +543,16 @@ Y_UNIT_TEST_SUITE(Transfer)
.Messages = {{"1.23"}},
- .Expectations = {
+ .Expectations = {{
_C("Key", ui64(0)),
_C("Message", 1.23),
- }
- }).Run();
+ }}
+ });
}
Y_UNIT_TEST(Main_ColumnTable_Utf8_Long)
{
- MainTestCase({
+ MainTestCase().Run({
.TableDDL = R"(
CREATE TABLE `%s` (
Key Uint64 NOT NULL,
@@ -551,16 +576,16 @@ Y_UNIT_TEST_SUITE(Transfer)
.Messages = {{"Message-1 long value 0 1234567890 1 1234567890 2 1234567890 3 1234567890 4 1234567890 5 1234567890 6 1234567890"}},
- .Expectations = {
+ .Expectations = {{
_C("Key", ui64(0)),
_C("Message", TString("Message-1 long value 0 1234567890 1 1234567890 2 1234567890 3 1234567890 4 1234567890 5 1234567890 6 1234567890")),
- }
- }).Run();
+ }}
+ });
}
Y_UNIT_TEST(Main_MessageField_Partition)
{
- MainTestCase({
+ MainTestCase().Run({
.TableDDL = R"(
CREATE TABLE `%s` (
Partition Uint32 NOT NULL,
@@ -584,16 +609,16 @@ Y_UNIT_TEST_SUITE(Transfer)
.Messages = {{"Message-1", 7}},
- .Expectations = {
+ .Expectations = {{
_C("Partition", ui32(7)),
_C("Message", TString("Message-1")),
- }
- }).Run();
+ }}
+ });
}
Y_UNIT_TEST(Main_MessageField_SeqNo)
{
- MainTestCase({
+ MainTestCase().Run({
.TableDDL = R"(
CREATE TABLE `%s` (
SeqNo Uint64 NOT NULL,
@@ -617,15 +642,15 @@ Y_UNIT_TEST_SUITE(Transfer)
.Messages = {_withSeqNo(13)},
- .Expectations = {
+ .Expectations = {{
_C("SeqNo", ui64(13)),
- }
- }).Run();
+ }}
+ });
}
Y_UNIT_TEST(Main_MessageField_ProducerId)
{
- MainTestCase({
+ MainTestCase().Run({
.TableDDL = R"(
CREATE TABLE `%s` (
Offset Uint64 NOT NULL,
@@ -649,15 +674,15 @@ Y_UNIT_TEST_SUITE(Transfer)
.Messages = {_withProducerId("Producer-13")},
- .Expectations = {
+ .Expectations = {{
_C("ProducerId", TString("Producer-13")),
- }
- }).Run();
+ }}
+ });
}
Y_UNIT_TEST(Main_MessageField_MessageGroupId)
{
- MainTestCase({
+ MainTestCase().Run({
.TableDDL = R"(
CREATE TABLE `%s` (
Offset Uint64 NOT NULL,
@@ -681,15 +706,15 @@ Y_UNIT_TEST_SUITE(Transfer)
.Messages = {_withMessageGroupId("MessageGroupId-13")},
- .Expectations = {
+ .Expectations = {{
_C("MessageGroupId", TString("MessageGroupId-13")),
- }
- }).Run();
+ }}
+ });
}
Y_UNIT_TEST(AlterLambda)
{
- MainTestCase({
+ MainTestCase().Run({
.TableDDL = R"(
CREATE TABLE `%s` (
Key Uint64 NOT NULL,
@@ -713,9 +738,9 @@ Y_UNIT_TEST_SUITE(Transfer)
.Messages = {{"Message-1"}},
- .Expectations = {
+ .Expectations = {{
_C("Message", TString("Message-1 new lambda")),
- },
+ }},
.AlterLambdas = {
R"(
@@ -740,9 +765,8 @@ Y_UNIT_TEST_SUITE(Transfer)
)",
}
- }).Run();
+ });
}
-
}