diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2025-02-28 14:38:47 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-28 14:38:47 +0500 |
commit | c35545b5a68f73d16a97f58e3788e343bc8c8e95 (patch) | |
tree | 1a51e84b2773616a3d197575115f7c64204cc7af | |
parent | 724231a4bae4aa40fe1441cedf8f368dd2071387 (diff) | |
download | ydb-c35545b5a68f73d16a97f58e3788e343bc8c8e95.tar.gz |
Refactoring of tests (#15179)
-rw-r--r-- | ydb/tests/functional/transfer/main.cpp | 346 |
1 files changed, 185 insertions, 161 deletions
diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp index b24dc77ed9..a52b4a3ee7 100644 --- a/ydb/tests/functional/transfer/main.cpp +++ b/ydb/tests/functional/transfer/main.cpp @@ -113,153 +113,178 @@ TMessage _withMessageGroupId(const TString& messageGroupId) { }; } +using TExpectations = TVector<TVector<std::pair<TString, std::shared_ptr<IChecker>>>>; + struct TConfig { - TString TableDDL; + const TString TableDDL; const TString Lambda; const TVector<TMessage> Messages; - TVector<std::pair<TString, std::shared_ptr<IChecker>>> Expectations; + const TExpectations Expectations; const TVector<TString> AlterLambdas; }; struct MainTestCase { - MainTestCase(TConfig&& config) - : Config(std::move(config)) { - size_t id = TestCaseCounter++; + MainTestCase() + : Id(TestCaseCounter++) + , ConnectionString(GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE")) + , TopicName(TStringBuilder() << "Topic_" << Id) + , TableName(TStringBuilder() << "Table_" << Id) + , TransferName(TStringBuilder() << "Transfer_" << Id) + , Driver(TDriverConfig(ConnectionString)) + , TableClient(Driver) + , Session(TableClient.GetSession().GetValueSync().GetSession()) + , TopicClient(Driver) + { + } - TopicName = TStringBuilder() << "Topic_" << id; - TableName = TStringBuilder() << "Table_" << id; - TransferName = TStringBuilder() << "Transfer_" << id; + void CreateTable(const TString& tableDDL) { + auto ddl = Sprintf(tableDDL.data(), TableName.data()); + auto res = Session.ExecuteQuery(ddl, TTxControl::NoTx()).GetValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); } - void Run() { - TString connectionString = GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE"); + void CreateTopic() { + auto res = Session.ExecuteQuery(Sprintf(R"( + CREATE TOPIC `%s` + WITH ( + min_active_partitions = 10 + ); + )", TopicName.data()), TTxControl::NoTx()).GetValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } - auto config = TDriverConfig(connectionString); - auto driver = TDriver(config); - auto tableClient = TQueryClient(driver); - auto session = tableClient.GetSession().GetValueSync().GetSession(); - auto topicClient = TTopicClient(driver); + void CreateTransfer(const TString& lambda) { + auto res = Session.ExecuteQuery(Sprintf(R"( + %s; + + CREATE TRANSFER `%s` + FROM `%s` TO `%s` USING $l + WITH ( + CONNECTION_STRING = 'grpc://%s' + -- , TOKEN = 'user@builtin' + ); + )", lambda.data(), TransferName.data(), TopicName.data(), TableName.data(), ConnectionString.data()), TTxControl::NoTx()).GetValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } - { - auto tableDDL = Sprintf(Config.TableDDL.data(), TableName.data()); - auto res = session.ExecuteQuery(tableDDL, TTxControl::NoTx()).GetValueSync(); - UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); - } + void AlterTransfer(const TString& lambda) { + auto res = Session.ExecuteQuery(Sprintf(R"( + %s; - { - auto res = session.ExecuteQuery(Sprintf(R"( - CREATE TOPIC `%s` - WITH ( - min_active_partitions = 10 - ); - )", TopicName.data()), TTxControl::NoTx()).GetValueSync(); - UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); - } - - TVector<TString> lambdas; - lambdas.insert(lambdas.end(), Config.AlterLambdas.begin(), Config.AlterLambdas.end()); - lambdas.push_back(Config.Lambda); + ALTER TRANSFER `%s` + SET USING $l; + )", lambda.data(), TransferName.data()), TTxControl::NoTx()).GetValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } - for (size_t i = 0; i < lambdas.size(); ++i) { - auto lambda = lambdas[i]; - if (!i) { - auto res = session.ExecuteQuery(Sprintf(R"( - %s; - - CREATE TRANSFER `%s` - FROM `%s` TO `%s` USING $l - WITH ( - CONNECTION_STRING = 'grpc://%s' - -- , TOKEN = 'user@builtin' - ); - )", lambda.data(), TransferName.data(), TopicName.data(), TableName.data(), connectionString.data()), TTxControl::NoTx()).GetValueSync(); - UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); - } else { - Sleep(TDuration::Seconds(1)); + void Write(const TMessage& message) { + TWriteSessionSettings writeSettings; + writeSettings.Path(TopicName); + writeSettings.DeduplicationEnabled(message.SeqNo); + if (message.Partition) { + writeSettings.PartitionId(message.Partition); + } + if (message.ProducerId) { + writeSettings.ProducerId(*message.ProducerId); + } + if (message.MessageGroupId) { + writeSettings.MessageGroupId(*message.MessageGroupId); + } + auto writeSession = TopicClient.CreateSimpleBlockingWriteSession(writeSettings); - auto res = session.ExecuteQuery(Sprintf(R"( - %s; + UNIT_ASSERT(writeSession->Write(message.Message, message.SeqNo)); + writeSession->Close(TDuration::Seconds(1)); + } - ALTER TRANSFER `%s` - SET USING $l; - )", lambda.data(), TransferName.data()), TTxControl::NoTx()).GetValueSync(); - UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + std::pair<ui64, Ydb::ResultSet> DoRead(const TExpectations& expectations) { + auto& e = expectations.front(); - if (i == lambdas.size() - 1) { - Sleep(TDuration::Seconds(1)); - } + TStringBuilder columns; + for (size_t i = 0; i < e.size(); ++i) { + if (i) { + columns << ", "; } + columns << "`" << e[i].first << "`"; } - { - for (const auto& m : Config.Messages) { - TWriteSessionSettings writeSettings; - writeSettings.Path(TopicName); - writeSettings.DeduplicationEnabled(m.SeqNo); - if (m.Partition) { - writeSettings.PartitionId(m.Partition); - } - if (m.ProducerId) { - writeSettings.ProducerId(*m.ProducerId); - } - if (m.MessageGroupId) { - writeSettings.MessageGroupId(*m.MessageGroupId); - } - auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings); - UNIT_ASSERT(writeSession->Write(m.Message, m.SeqNo)); - writeSession->Close(TDuration::Seconds(1)); - } - } + auto res = Session.ExecuteQuery( + Sprintf("SELECT %s FROM `%s`", columns.data(), TableName.data()), + TTxControl::NoTx()).GetValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + + const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0)); + return {proto.rowsSize(), proto}; + } - { - for (size_t attempt = 20; attempt--; ) { - auto res = DoRead(session); - Cerr << "Attempt=" << attempt << " count=" << res.first << Endl << Flush; - if (res.first == Config.Messages.size()) { - const Ydb::ResultSet& proto = res.second; - for (size_t i = 0; i < Config.Expectations.size(); ++i) { - auto& c = Config.Expectations[i]; - TString msg = TStringBuilder() << "Column '" << c.first << "': "; - c.second->Assert(msg, proto.rows(0).items(i)); + void CheckResult(const TExpectations& expectations) { + for (size_t attempt = 20; attempt--; ) { + auto res = DoRead(expectations); + Cerr << "Attempt=" << attempt << " count=" << res.first << Endl << Flush; + if (res.first == expectations.size()) { + const Ydb::ResultSet& proto = res.second; + for (size_t i = 0; i < expectations.size(); ++i) { + auto& row = proto.rows(0); + auto& rowExpectations = expectations[i]; + for (size_t i = 0; i < rowExpectations.size(); ++i) { + auto& c = rowExpectations[i]; + TString msg = TStringBuilder() << "Row " << i << " column '" << c.first << "': "; + c.second->Assert(msg, row.items(i)); } - - break; } - UNIT_ASSERT_C(attempt, "Unable to wait replication result"); - Sleep(TDuration::Seconds(1)); + break; } + + UNIT_ASSERT_C(attempt, "Unable to wait replication result"); + Sleep(TDuration::Seconds(1)); } } + void Run(const TConfig& config) { - std::pair<ui64, Ydb::ResultSet> DoRead(TSession& s) { - TStringBuilder columns; - for (size_t i = 0; i < Config.Expectations.size(); ++i) { - if (i) { - columns << ", "; + CreateTable(config.TableDDL); + CreateTopic(); + + TVector<TString> lambdas; + lambdas.insert(lambdas.end(), config.AlterLambdas.begin(), config.AlterLambdas.end()); + lambdas.push_back(config.Lambda); + + for (size_t i = 0; i < lambdas.size(); ++i) { + auto lambda = lambdas[i]; + if (!i) { + CreateTransfer(lambda); + } else { + Sleep(TDuration::Seconds(1)); + + AlterTransfer(lambda); + + if (i == lambdas.size() - 1) { + Sleep(TDuration::Seconds(1)); + } } - columns << "`" << Config.Expectations[i].first << "`"; } + for (const auto& m : config.Messages) { + Write(m); + } - auto res = s.ExecuteQuery( - Sprintf("SELECT %s FROM `%s`", columns.data(), TableName.data()), - TTxControl::NoTx()).GetValueSync(); - UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); - - const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0)); - return {proto.rowsSize(), proto}; + CheckResult(config.Expectations); } - TConfig Config; + const size_t Id; + const TString ConnectionString; + + const TString TopicName; + const TString TableName; + const TString TransferName; - TString TopicName; - TString TableName; - TString TransferName; + TDriver Driver; + TQueryClient TableClient; + TSession Session; + TTopicClient TopicClient; std::vector<std::string> ColumnNames; }; @@ -271,7 +296,7 @@ Y_UNIT_TEST_SUITE(Transfer) { Y_UNIT_TEST(Main_ColumnTable_KeyColumnFirst) { - MainTestCase({ + MainTestCase().Run({ .TableDDL = R"( CREATE TABLE `%s` ( Key Uint64 NOT NULL, @@ -295,16 +320,16 @@ Y_UNIT_TEST_SUITE(Transfer) .Messages = {{"Message-1"}}, - .Expectations = { + .Expectations = {{ _C("Key", ui64(0)), _C("Message", TString("Message-1")), - } - }).Run(); + }} + }); } Y_UNIT_TEST(Main_ColumnTable_KeyColumnLast) { - MainTestCase({ + MainTestCase().Run({ .TableDDL = R"( CREATE TABLE `%s` ( Message Utf8 NOT NULL, @@ -328,16 +353,16 @@ Y_UNIT_TEST_SUITE(Transfer) .Messages = {{"Message-1"}}, - .Expectations = { + .Expectations = {{ _C("Key", ui64(0)), _C("Message", TString("Message-1")), - } - }).Run(); + }} + }); } Y_UNIT_TEST(Main_ColumnTable_ComplexKey) { - MainTestCase({ + MainTestCase().Run({ .TableDDL = R"( CREATE TABLE `%s` ( Key1 Uint64 NOT NULL, @@ -369,20 +394,20 @@ Y_UNIT_TEST_SUITE(Transfer) .Messages = {{"Message-1"}}, - .Expectations = { + .Expectations = {{ _C("Key1", ui64(1)), _C("Key2", ui64(2)), _C("Key3", ui64(3)), _C("Key4", ui64(4)), _C("Value1", TString("value-1")), _C("Value2", TString("value-2")), - } - }).Run(); + }} + }); } Y_UNIT_TEST(Main_ColumnTable_JsonMessage) { - MainTestCase({ + MainTestCase().Run({ .TableDDL = R"( CREATE TABLE `%s` ( Id Uint64 NOT NULL, @@ -417,18 +442,18 @@ Y_UNIT_TEST_SUITE(Transfer) "salary": "123" })"}}, - .Expectations = { + .Expectations = {{ _C("Id", ui64(1)), _C("FirstName", TString("Vasya")), _C("LastName", TString("Pupkin")), _C("Salary", ui64(123)), - } - }).Run(); + }} + }); } Y_UNIT_TEST(Main_ColumnTable_NullableColumn) { - MainTestCase({ + MainTestCase().Run({ .TableDDL = R"( CREATE TABLE `%s` ( Key Uint64 NOT NULL, @@ -452,16 +477,16 @@ Y_UNIT_TEST_SUITE(Transfer) .Messages = {{"Message-1"}}, - .Expectations = { + .Expectations = {{ _C("Key", ui64(0)), _C("Message", TString("Message-1")), - } - }).Run(); + }} + }); } Y_UNIT_TEST(Main_ColumnTable_Date) { - MainTestCase({ + MainTestCase().Run({ .TableDDL = R"( CREATE TABLE `%s` ( Key Uint64 NOT NULL, @@ -485,16 +510,16 @@ Y_UNIT_TEST_SUITE(Transfer) .Messages = {{"2025-02-21"}}, - .Expectations = { + .Expectations = {{ _C("Key", ui64(0)), _C("Message", TInstant::ParseIso8601("2025-02-21")), - } - }).Run(); + }} + }); } Y_UNIT_TEST(Main_ColumnTable_Double) { - MainTestCase({ + MainTestCase().Run({ .TableDDL = R"( CREATE TABLE `%s` ( Key Uint64 NOT NULL, @@ -518,16 +543,16 @@ Y_UNIT_TEST_SUITE(Transfer) .Messages = {{"1.23"}}, - .Expectations = { + .Expectations = {{ _C("Key", ui64(0)), _C("Message", 1.23), - } - }).Run(); + }} + }); } Y_UNIT_TEST(Main_ColumnTable_Utf8_Long) { - MainTestCase({ + MainTestCase().Run({ .TableDDL = R"( CREATE TABLE `%s` ( Key Uint64 NOT NULL, @@ -551,16 +576,16 @@ Y_UNIT_TEST_SUITE(Transfer) .Messages = {{"Message-1 long value 0 1234567890 1 1234567890 2 1234567890 3 1234567890 4 1234567890 5 1234567890 6 1234567890"}}, - .Expectations = { + .Expectations = {{ _C("Key", ui64(0)), _C("Message", TString("Message-1 long value 0 1234567890 1 1234567890 2 1234567890 3 1234567890 4 1234567890 5 1234567890 6 1234567890")), - } - }).Run(); + }} + }); } Y_UNIT_TEST(Main_MessageField_Partition) { - MainTestCase({ + MainTestCase().Run({ .TableDDL = R"( CREATE TABLE `%s` ( Partition Uint32 NOT NULL, @@ -584,16 +609,16 @@ Y_UNIT_TEST_SUITE(Transfer) .Messages = {{"Message-1", 7}}, - .Expectations = { + .Expectations = {{ _C("Partition", ui32(7)), _C("Message", TString("Message-1")), - } - }).Run(); + }} + }); } Y_UNIT_TEST(Main_MessageField_SeqNo) { - MainTestCase({ + MainTestCase().Run({ .TableDDL = R"( CREATE TABLE `%s` ( SeqNo Uint64 NOT NULL, @@ -617,15 +642,15 @@ Y_UNIT_TEST_SUITE(Transfer) .Messages = {_withSeqNo(13)}, - .Expectations = { + .Expectations = {{ _C("SeqNo", ui64(13)), - } - }).Run(); + }} + }); } Y_UNIT_TEST(Main_MessageField_ProducerId) { - MainTestCase({ + MainTestCase().Run({ .TableDDL = R"( CREATE TABLE `%s` ( Offset Uint64 NOT NULL, @@ -649,15 +674,15 @@ Y_UNIT_TEST_SUITE(Transfer) .Messages = {_withProducerId("Producer-13")}, - .Expectations = { + .Expectations = {{ _C("ProducerId", TString("Producer-13")), - } - }).Run(); + }} + }); } Y_UNIT_TEST(Main_MessageField_MessageGroupId) { - MainTestCase({ + MainTestCase().Run({ .TableDDL = R"( CREATE TABLE `%s` ( Offset Uint64 NOT NULL, @@ -681,15 +706,15 @@ Y_UNIT_TEST_SUITE(Transfer) .Messages = {_withMessageGroupId("MessageGroupId-13")}, - .Expectations = { + .Expectations = {{ _C("MessageGroupId", TString("MessageGroupId-13")), - } - }).Run(); + }} + }); } Y_UNIT_TEST(AlterLambda) { - MainTestCase({ + MainTestCase().Run({ .TableDDL = R"( CREATE TABLE `%s` ( Key Uint64 NOT NULL, @@ -713,9 +738,9 @@ Y_UNIT_TEST_SUITE(Transfer) .Messages = {{"Message-1"}}, - .Expectations = { + .Expectations = {{ _C("Message", TString("Message-1 new lambda")), - }, + }}, .AlterLambdas = { R"( @@ -740,9 +765,8 @@ Y_UNIT_TEST_SUITE(Transfer) )", } - }).Run(); + }); } - } |