diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-09-12 10:07:23 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-09-12 10:25:21 +0300 |
commit | 0237e07bda5b67a822b92224e9fbc09bfce0430b (patch) | |
tree | 77a163d66d012535eb7d47a47b06e50a620e0bfc | |
parent | 00b007a9fcb5f5db8ea11be6e1ea5cd9a83a1e5a (diff) | |
download | ydb-0237e07bda5b67a822b92224e9fbc09bfce0430b.tar.gz |
Topic API SDK test framework
14 files changed, 322 insertions, 280 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp index 5c21c087915..88a82793116 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp @@ -1,6 +1,6 @@ #include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h> -#include <ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt index cd74c3f5100..282211a4b93 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt @@ -42,8 +42,9 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt index 3361edb1ed7..be6ba2fbf35 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt @@ -45,8 +45,9 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt index 6b37cbbbd11..74b2b0756cc 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt @@ -46,8 +46,9 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt index ad9dd4e1b21..3c14b2644b1 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt @@ -35,8 +35,9 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp index bdd0b0d16fd..4bb5f40c762 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp @@ -1,13 +1,14 @@ -#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include "ut_utils/managed_executor.h" +#include "ut_utils/topic_sdk_test_setup.h" +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h> -#include <ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h> - -#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h> #include <library/cpp/testing/unittest/registar.h> #include <library/cpp/testing/unittest/tests_data.h> @@ -18,8 +19,8 @@ namespace NYdb::NTopic::NTests { -void WriteAndReadToEndWithRestarts(NYdb::NTopic::TReadSessionSettings readSettings, NPersQueue::TWriteSessionSettings writeSettings, std::string message, ui32 count, std::shared_ptr<NPersQueue::NTests::TPersQueueYdbSdkTestSetup> setup, TIntrusivePtr<TManagedExecutor> decompressor) { - auto& client = setup->GetPersQueueClient(); +void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSessionSettings writeSettings, const std::string& message, ui32 count, TTopicSdkTestSetup& setup, TIntrusivePtr<TManagedExecutor> decompressor) { + auto client = setup.MakeClient(); auto session = client.CreateSimpleBlockingWriteSession(writeSettings); for (ui32 i = 1; i <= count; ++i) { @@ -29,10 +30,9 @@ void WriteAndReadToEndWithRestarts(NYdb::NTopic::TReadSessionSettings readSettin bool res = session->Close(TDuration::Seconds(10)); UNIT_ASSERT(res); - std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession; + std::shared_ptr<IReadSession> ReadSession; - // Create topic client. - NYdb::NTopic::TTopicClient topicClient(setup->GetDriver()); + TTopicClient topicClient = setup.MakeClient(); auto WaitTasks = [&](auto f, size_t c) { @@ -61,7 +61,7 @@ void WriteAndReadToEndWithRestarts(NYdb::NTopic::TReadSessionSettings readSettin WaitPlannedTasks(e, n); size_t completed = e->GetExecutedCount(); - setup->GetServer().KillTopicPqrbTablet(setup->GetTestTopicPath()); + setup.GetServer().KillTopicPqrbTablet(setup.GetTopicPath()); Sleep(TDuration::MilliSeconds(100)); e->StartFuncs(tasks); @@ -76,7 +76,7 @@ void WriteAndReadToEndWithRestarts(NYdb::NTopic::TReadSessionSettings readSettin auto f = checkedPromise.GetFuture(); readSettings.EventHandlers_.SimpleDataHandlers( [&] - (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) mutable { + (TReadSessionEvent::TDataReceivedEvent& ev) mutable { AtomicSet(lastOffset, ev.GetMessages().back().GetOffset()); Cerr << ">>> TEST: last offset = " << lastOffset << Endl; }); @@ -93,10 +93,10 @@ void WriteAndReadToEndWithRestarts(NYdb::NTopic::TReadSessionSettings readSettin Y_UNIT_TEST_SUITE(BasicUsage) { Y_UNIT_TEST(ConnectToYDB) { - auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); + TTopicSdkTestSetup setup(TEST_CASE_NAME); NYdb::TDriverConfig cfg; - cfg.SetEndpoint(TStringBuilder() << "invalid:" << setup->GetGrpcPort()); + cfg.SetEndpoint(TStringBuilder() << "invalid:" << setup.GetServer().GrpcPort); cfg.SetDatabase("/Invalid"); cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); auto driver = NYdb::TDriver(cfg); @@ -105,8 +105,8 @@ Y_UNIT_TEST_SUITE(BasicUsage) { TTopicClient client(driver); auto writeSettings = TWriteSessionSettings() - .Path(setup->GetTestTopic()) - .MessageGroupId("group_id") + .Path(TEST_TOPIC) + .MessageGroupId(TEST_MESSAGE_GROUP_ID) // TODO why retries? see LOGBROKER-8490 .RetryPolicy(IRetryPolicy::GetNoRetryPolicy()); auto writeSession = client.CreateWriteSession(writeSettings); @@ -118,13 +118,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) { { auto settings = TTopicClientSettings() .Database({"/Root"}) - .DiscoveryEndpoint({TStringBuilder() << "localhost:" << setup->GetGrpcPort()}); + .DiscoveryEndpoint({TStringBuilder() << "localhost:" << setup.GetServer().GrpcPort}); TTopicClient client(driver, settings); auto writeSettings = TWriteSessionSettings() - .Path(setup->GetTestTopic()) - .MessageGroupId("group_id") + .Path(TEST_TOPIC) + .MessageGroupId(TEST_MESSAGE_GROUP_ID) .RetryPolicy(IRetryPolicy::GetNoRetryPolicy()); auto writeSession = client.CreateWriteSession(writeSettings); @@ -135,22 +135,22 @@ Y_UNIT_TEST_SUITE(BasicUsage) { Y_UNIT_TEST(WriteRead) { - auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); - TTopicClient client(setup->GetDriver()); - + TTopicSdkTestSetup setup(TEST_CASE_NAME); + TTopicClient client = setup.MakeClient(); + { auto writeSettings = TWriteSessionSettings() - .Path(setup->GetTestTopic()) - .ProducerId(setup->GetTestMessageGroupId()) - .MessageGroupId(setup->GetTestMessageGroupId()); + .Path(TEST_TOPIC) + .ProducerId(TEST_MESSAGE_GROUP_ID) + .MessageGroupId(TEST_MESSAGE_GROUP_ID); auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings); UNIT_ASSERT(writeSession->Write("message_using_MessageGroupId")); writeSession->Close(); } { auto writeSettings = TWriteSessionSettings() - .Path(setup->GetTestTopic()) - .ProducerId(setup->GetTestMessageGroupId()) + .Path(TEST_TOPIC) + .ProducerId(TEST_MESSAGE_GROUP_ID) .PartitionId(0); auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings); UNIT_ASSERT(writeSession->Write("message_using_PartitionId")); @@ -159,8 +159,8 @@ Y_UNIT_TEST_SUITE(BasicUsage) { { auto readSettings = TReadSessionSettings() - .ConsumerName(setup->GetTestConsumer()) - .AppendTopics(setup->GetTestTopic()); + .ConsumerName(TEST_CONSUMER) + .AppendTopics(TEST_TOPIC); auto readSession = client.CreateReadSession(readSettings); auto event = readSession->GetEvent(true); @@ -183,23 +183,24 @@ Y_UNIT_TEST_SUITE(BasicUsage) { } Y_UNIT_TEST(ReadWithoutConsumerWithRestarts) { - auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); - auto compressor = new NPersQueue::TSyncExecutor(); + TTopicSdkTestSetup setup(TEST_CASE_NAME); + auto compressor = new TSyncExecutor(); auto decompressor = CreateThreadPoolManagedExecutor(1); - NYdb::NTopic::TReadSessionSettings readSettings; - NYdb::NTopic::TTopicReadSettings topic = setup->GetTestTopic(); + TReadSessionSettings readSettings; + TTopicReadSettings topic = TEST_TOPIC; topic.AppendPartitionIds(0); readSettings .WithoutConsumer() .MaxMemoryUsageBytes(1_MB) .DecompressionExecutor(decompressor) .AppendTopics(topic); - - NPersQueue::TWriteSessionSettings writeSettings; + + TWriteSessionSettings writeSettings; writeSettings - .Path(setup->GetTestTopic()).MessageGroupId("src_id") - .Codec(NPersQueue::ECodec::RAW) + .Path(TEST_TOPIC) + .MessageGroupId(TEST_MESSAGE_GROUP_ID) + .Codec(NTopic::ECodec::RAW) .CompressionExecutor(compressor); @@ -210,19 +211,19 @@ Y_UNIT_TEST_SUITE(BasicUsage) { } Y_UNIT_TEST(MaxByteSizeEqualZero) { - auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); - TTopicClient client(setup->GetDriver()); + TTopicSdkTestSetup setup(TEST_CASE_NAME); + TTopicClient client = setup.MakeClient(); auto writeSettings = TWriteSessionSettings() - .Path(setup->GetTestTopic()) - .MessageGroupId("group_id"); + .Path(TEST_TOPIC) + .MessageGroupId(TEST_MESSAGE_GROUP_ID); auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings); UNIT_ASSERT(writeSession->Write("message")); writeSession->Close(); auto readSettings = TReadSessionSettings() - .ConsumerName(setup->GetTestConsumer()) - .AppendTopics(setup->GetTestTopic()); + .ConsumerName(TEST_CONSUMER) + .AppendTopics(TEST_TOPIC); auto readSession = client.CreateReadSession(readSettings); auto event = readSession->GetEvent(true); @@ -246,7 +247,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); NPersQueue::TWriteSessionSettings writeSettings; - writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id"); + writeSettings.Path(setup->GetTestTopic()).MessageGroupId(TEST_MESSAGE_GROUP_ID); writeSettings.Codec(NPersQueue::ECodec::RAW); NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor(); writeSettings.CompressionExecutor(executor); @@ -277,13 +278,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) { } session->Close(); - std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession; + std::shared_ptr<IReadSession> ReadSession; // Create topic client. NYdb::NTopic::TTopicClient topicClient(setup->GetDriver()); // Create read session. - NYdb::NTopic::TReadSessionSettings readSettings; + TReadSessionSettings readSettings; readSettings .ConsumerName(setup->GetTestConsumer()) .MaxMemoryUsageBytes(1_MB) @@ -299,7 +300,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { readSettings.EventHandlers_.SimpleDataHandlers( // [checkedPromise = std::move(checkedPromise), &check, &sentMessages, &totalReceived] [&] - (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) mutable { + (TReadSessionEvent::TDataReceivedEvent& ev) mutable { Y_VERIFY_S(AtomicGet(check) != 0, "check is false"); auto& messages = ev.GetMessages(); for (size_t i = 0u; i < messages.size(); ++i) { @@ -321,7 +322,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { UNIT_ASSERT(status.GetValueSync().IsSuccess()); auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true); - auto result = topicClient.DescribeConsumer("/Root/PQ/rt3.dc1--" + setup->GetTestTopic(), setup->GetTestConsumer(), describeConsumerSettings).GetValueSync(); + auto result = topicClient.DescribeConsumer(setup->GetTestTopicPath(), setup->GetTestConsumer(), describeConsumerSettings).GetValueSync(); UNIT_ASSERT(result.IsSuccess()); auto description = result.GetConsumerDescription(); @@ -333,21 +334,21 @@ Y_UNIT_TEST_SUITE(BasicUsage) { Y_UNIT_TEST(ReadWithRestarts) { - auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); - auto compressor = new NPersQueue::TSyncExecutor(); + TTopicSdkTestSetup setup(TEST_CASE_NAME); + auto compressor = new TSyncExecutor(); auto decompressor = CreateThreadPoolManagedExecutor(1); - NYdb::NTopic::TReadSessionSettings readSettings; + TReadSessionSettings readSettings; readSettings - .ConsumerName(setup->GetTestConsumer()) + .ConsumerName(TEST_CONSUMER) .MaxMemoryUsageBytes(1_MB) .DecompressionExecutor(decompressor) - .AppendTopics(setup->GetTestTopic()); + .AppendTopics(TEST_TOPIC); - NPersQueue::TWriteSessionSettings writeSettings; + TWriteSessionSettings writeSettings; writeSettings - .Path(setup->GetTestTopic()).MessageGroupId("src_id") - .Codec(NPersQueue::ECodec::RAW) + .Path(TEST_TOPIC).MessageGroupId(TEST_MESSAGE_GROUP_ID) + .Codec(NTopic::ECodec::RAW) .CompressionExecutor(compressor); @@ -358,13 +359,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) { } Y_UNIT_TEST(SessionNotDestroyedWhileCompressionInFlight) { - auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); + TTopicSdkTestSetup setup(TEST_CASE_NAME); // controlled executor auto stepByStepExecutor = CreateThreadPoolManagedExecutor(1); // Create topic client. - NYdb::NTopic::TTopicClient topicClient(setup->GetDriver()); + TTopicClient topicClient = setup.MakeClient(); NThreading::TPromise<void> promiseToWrite = NThreading::NewPromise<void>(); auto futureWrite = promiseToWrite.GetFuture(); @@ -372,18 +373,18 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NThreading::TPromise<void> promiseToRead = NThreading::NewPromise<void>(); auto futureRead = promiseToRead.GetFuture(); - NYdb::NTopic::TWriteSessionSettings writeSettings; - writeSettings.Path(setup->GetTestTopic()) - .MessageGroupId("src_id") - .ProducerId("src_id") + TWriteSessionSettings writeSettings; + writeSettings.Path(TEST_TOPIC) + .MessageGroupId(TEST_MESSAGE_GROUP_ID) + .ProducerId(TEST_MESSAGE_GROUP_ID) .CompressionExecutor(stepByStepExecutor); // Create read session. - NYdb::NTopic::TReadSessionSettings readSettings; + TReadSessionSettings readSettings; readSettings - .ConsumerName(setup->GetTestConsumer()) + .ConsumerName(TEST_CONSUMER) .MaxMemoryUsageBytes(1_MB) - .AppendTopics(setup->GetTestTopic()) + .AppendTopics(TEST_TOPIC) .DecompressionExecutor(stepByStepExecutor); auto f = std::async(std::launch::async, @@ -405,7 +406,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto future = promise.GetFuture(); readSettings.EventHandlers_.SimpleDataHandlers( - [promise = std::move(promise)](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) mutable { + [promise = std::move(promise)](TReadSessionEvent::TDataReceivedEvent& ev) mutable { ev.Commit(); promise.SetValue(); Cerr << ">>>TEST: get read event " << Endl; @@ -465,13 +466,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) { } Y_UNIT_TEST(SessionNotDestroyedWhileUserEventHandlingInFlight) { - auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); + TTopicSdkTestSetup setup(TEST_CASE_NAME); // controlled executor auto stepByStepExecutor = CreateThreadPoolManagedExecutor(1); // Create topic client. - NYdb::NTopic::TTopicClient topicClient(setup->GetDriver()); + TTopicClient topicClient = setup.MakeClient(); // NThreading::TPromise<void> promiseToWrite = NThreading::NewPromise<void>(); // auto futureWrite = promiseToWrite.GetFuture(); @@ -480,9 +481,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto futureRead = promiseToRead.GetFuture(); auto writeSettings = TWriteSessionSettings() - .Path(setup->GetTestTopic()) - .MessageGroupId("src_id") - .ProducerId("src_id"); + .Path(TEST_TOPIC) + .MessageGroupId(TEST_MESSAGE_GROUP_ID) + .ProducerId(TEST_MESSAGE_GROUP_ID); auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings); std::string message(2'000, 'x'); @@ -495,9 +496,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) { // Create read session. auto readSettings = TReadSessionSettings() - .ConsumerName(setup->GetTestConsumer()) + .ConsumerName(TEST_CONSUMER) .MaxMemoryUsageBytes(1_MB) - .AppendTopics(setup->GetTestTopic()); + .AppendTopics(TEST_TOPIC); readSettings.EventHandlers_ .HandlersExecutor(stepByStepExecutor); @@ -531,7 +532,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto future = promise.GetFuture(); readSettings.EventHandlers_.SimpleDataHandlers( - [promise = std::move(promise)](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) mutable { + [promise = std::move(promise)](TReadSessionEvent::TDataReceivedEvent& ev) mutable { Cerr << ">>>TEST: in SimpleDataHandlers " << Endl; ev.Commit(); promise.SetValue(); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp index 559ea706ffb..e3aaee33e4e 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp @@ -1,9 +1,9 @@ +#include "ut_utils/topic_sdk_test_setup.h" + #include <ydb/library/persqueue/topic_parser_public/topic_parser.h> #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> -#include <ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> -#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h> #include <library/cpp/testing/unittest/registar.h> #include <library/cpp/testing/unittest/tests_data.h> @@ -16,14 +16,14 @@ namespace NYdb::NTopic::NTests { Y_UNIT_TEST_SUITE(Describe) { - void DescribeTopic(NPersQueue::NTests::TPersQueueYdbSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets) + void DescribeTopic(TTopicSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets) { TDescribeTopicSettings settings; settings.IncludeStats(requireStats); settings.IncludeLocation(requireLocation); { - auto result = client.DescribeTopic(setup.GetTestTopicPath(), settings).GetValueSync(); + auto result = client.DescribeTopic(TEST_TOPIC, settings).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); const auto& description = result.GetTopicDescription(); @@ -63,9 +63,9 @@ namespace NYdb::NTopic::NTests { if (killTablets) { - setup.GetServer().KillTopicPqTablets(setup.GetTestTopicPath()); + setup.GetServer().KillTopicPqTablets(setup.GetTopicPath()); - auto result = client.DescribeTopic(setup.GetTestTopicPath(), settings).GetValueSync(); + auto result = client.DescribeTopic(TEST_TOPIC, settings).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); const auto& description = result.GetTopicDescription(); @@ -104,14 +104,14 @@ namespace NYdb::NTopic::NTests { } } - void DescribeConsumer(NPersQueue::NTests::TPersQueueYdbSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets) + void DescribeConsumer(TTopicSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets) { TDescribeConsumerSettings settings; settings.IncludeStats(requireStats); settings.IncludeLocation(requireLocation); { - auto result = client.DescribeConsumer(setup.GetTestTopicPath(), ::NPersQueue::SDKTestSetup::GetTestConsumer(), settings).GetValueSync(); + auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, settings).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); const auto& description = result.GetConsumerDescription(); @@ -162,9 +162,9 @@ namespace NYdb::NTopic::NTests { if (killTablets) { - setup.GetServer().KillTopicPqTablets(setup.GetTestTopicPath()); + setup.GetServer().KillTopicPqTablets(setup.GetTopicPath()); - auto result = client.DescribeConsumer(setup.GetTestTopicPath(), ::NPersQueue::SDKTestSetup::GetTestConsumer(), settings).GetValueSync(); + auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, settings).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); const auto& description = result.GetConsumerDescription(); @@ -186,7 +186,7 @@ namespace NYdb::NTopic::NTests { } } - void DescribePartition(NPersQueue::NTests::TPersQueueYdbSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets) + void DescribePartition(TTopicSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets) { TDescribePartitionSettings settings; settings.IncludeStats(requireStats); @@ -195,7 +195,7 @@ namespace NYdb::NTopic::NTests { i64 testPartitionId = 0; { - auto result = client.DescribePartition(setup.GetTestTopicPath(), testPartitionId, settings).GetValueSync(); + auto result = client.DescribePartition(TEST_TOPIC, testPartitionId, settings).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); const auto& description = result.GetPartitionDescription(); @@ -235,9 +235,9 @@ namespace NYdb::NTopic::NTests { if (killTablets) { - setup.GetServer().KillTopicPqTablets(setup.GetTestTopicPath()); + setup.GetServer().KillTopicPqTablets(setup.GetTopicPath()); - auto result = client.DescribePartition(setup.GetTestTopicPath(), testPartitionId, settings).GetValueSync(); + auto result = client.DescribePartition(TEST_TOPIC, testPartitionId, settings).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); const auto& description = result.GetPartitionDescription(); @@ -257,8 +257,8 @@ namespace NYdb::NTopic::NTests { } Y_UNIT_TEST(Basic) { - NPersQueue::NTests::TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME); - TTopicClient client(setup.GetDriver()); + TTopicSdkTestSetup setup(TEST_CASE_NAME); + TTopicClient client = setup.MakeClient(); DescribeTopic(setup, client, false, false, false, false); DescribeConsumer(setup, client, false, false, false, false); @@ -266,8 +266,8 @@ namespace NYdb::NTopic::NTests { } Y_UNIT_TEST(Statistics) { - NPersQueue::NTests::TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME); - TTopicClient client(setup.GetDriver()); + TTopicSdkTestSetup setup(TEST_CASE_NAME); + TTopicClient client = setup.MakeClient(); // Get empty description DescribeTopic(setup, client, true, false, false, false); @@ -276,7 +276,7 @@ namespace NYdb::NTopic::NTests { // Write a message { - auto writeSettings = TWriteSessionSettings().Path(setup.GetTestTopicPath()).MessageGroupId(::NPersQueue::SDKTestSetup::GetTestMessageGroupId()); + auto writeSettings = TWriteSessionSettings().Path(TEST_TOPIC).MessageGroupId(TEST_MESSAGE_GROUP_ID); auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings); std::string message(10_KB, 'x'); UNIT_ASSERT(writeSession->Write(message)); @@ -285,9 +285,7 @@ namespace NYdb::NTopic::NTests { // Read a message { - auto readSettings = TReadSessionSettings() - .ConsumerName(setup.GetTestConsumer()) - .AppendTopics(setup.GetTestTopicPath()); + auto readSettings = TReadSessionSettings().ConsumerName(TEST_CONSUMER).AppendTopics(TEST_TOPIC); auto readSession = client.CreateReadSession(readSettings); // Event 1: start partition session @@ -328,8 +326,8 @@ namespace NYdb::NTopic::NTests { } Y_UNIT_TEST(Location) { - NPersQueue::NTests::TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME); - TTopicClient client(setup.GetDriver()); + TTopicSdkTestSetup setup(TEST_CASE_NAME); + TTopicClient client = setup.MakeClient(); DescribeTopic(setup, client, false, false, true, false); DescribeConsumer(setup, client, false, false, true, false); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp index 96b10a84645..d99bb01a08a 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp @@ -1,3 +1,7 @@ +#include "ut_utils/topic_sdk_test_setup.h" + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h> + #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> @@ -5,8 +9,6 @@ #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h> -#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h> - #include <library/cpp/testing/unittest/registar.h> #include <library/cpp/testing/unittest/tests_data.h> @@ -18,49 +20,49 @@ using namespace NYdb::NPersQueue::NTests; namespace NYdb::NTopic::NTests { Y_UNIT_TEST_SUITE(LocalPartition) { - std::shared_ptr<TPersQueueYdbSdkTestSetup> CreateSetup(TString testCaseName, ui32 nodeCount = 1) { - return std::make_shared<TPersQueueYdbSdkTestSetup>(testCaseName, true, ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::PRI_DEBUG, nodeCount, 1); + std::shared_ptr<TTopicSdkTestSetup> CreateSetup(const TString& testCaseName, ui32 nodeCount = 1) { + NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings(); + settings.SetNodeCount(nodeCount); + return std::make_shared<TTopicSdkTestSetup>(testCaseName, settings); } - NYdb::TDriverConfig CreateConfig(TString discoveryAddr) + NYdb::TDriverConfig CreateConfig(const TTopicSdkTestSetup& setup, TString discoveryAddr) { - return NYdb::TDriverConfig() - .SetEndpoint(discoveryAddr) - .SetDatabase("/Root") - .SetAuthToken("root@builtin") - .SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + NYdb::TDriverConfig config = setup.MakeDriverConfig(); + config.SetEndpoint(discoveryAddr); + return config; } - TWriteSessionSettings CreateWriteSessionSettings(const TPersQueueYdbSdkTestSetup& setup) + TWriteSessionSettings CreateWriteSessionSettings() { return TWriteSessionSettings() - .Path(setup.GetTestTopicPath()) - .ProducerId(setup.GetTestMessageGroupId()) + .Path(TEST_TOPIC) + .ProducerId(TEST_MESSAGE_GROUP_ID) .PartitionId(0) .DirectWriteToPartition(true); } - TReadSessionSettings CreateReadSessionSettings(const TPersQueueYdbSdkTestSetup& setup) + TReadSessionSettings CreateReadSessionSettings() { return TReadSessionSettings() - .ConsumerName(setup.GetTestConsumer()) - .AppendTopics(setup.GetTestTopic()); + .ConsumerName(TEST_CONSUMER) + .AppendTopics(TEST_TOPIC); } - void WriteMessage(TPersQueueYdbSdkTestSetup& setup, TTopicClient& client) + void WriteMessage(TTopicClient& client) { Cerr << "=== Write message" << Endl; - auto writeSession = client.CreateSimpleBlockingWriteSession(CreateWriteSessionSettings(setup)); + auto writeSession = client.CreateSimpleBlockingWriteSession(CreateWriteSessionSettings()); UNIT_ASSERT(writeSession->Write("message")); writeSession->Close(); } - void ReadMessage(TPersQueueYdbSdkTestSetup& setup, TTopicClient& client, ui64 expectedCommitedOffset = 1) + void ReadMessage(TTopicClient& client, ui64 expectedCommitedOffset = 1) { Cerr << "=== Read message" << Endl; - auto readSession = client.CreateReadSession(CreateReadSessionSettings(setup)); + auto readSession = client.CreateReadSession(CreateReadSessionSettings()); TMaybe<TReadSessionEvent::TEvent> event = readSession->GetEvent(true); UNIT_ASSERT(event); @@ -105,10 +107,10 @@ namespace NYdb::NTopic::NTests { Server = ::NYdb::NTopic::NTests::NTestSuiteLocalPartition::StartGrpcServer(DiscoveryAddr, *this); } - void SetGoodEndpoints(TPersQueueYdbSdkTestSetup& setup) + void SetGoodEndpoints(TTopicSdkTestSetup& setup) { Cerr << "=== TMockDiscovery set good endpoint nodes " << Endl; - SetEndpoints(setup.GetRuntime().GetNodeId(0), setup.GetRuntime().GetNodeCount(), setup.GetGrpcPort()); + SetEndpoints(setup.GetRuntime().GetNodeId(0), setup.GetRuntime().GetNodeCount(), setup.GetServer().GrpcPort); } void SetEndpoints(ui32 firstNodeId, ui32 nodeCount, ui16 port) @@ -185,7 +187,7 @@ namespace NYdb::NTopic::NTests { auto Start(TString testCaseName, std::shared_ptr<TMockDiscoveryService> mockDiscoveryService = {}) { struct Result { - std::shared_ptr<TPersQueueYdbSdkTestSetup> Setup; + std::shared_ptr<TTopicSdkTestSetup> Setup; std::shared_ptr<TTopicClient> Client; std::shared_ptr<TMockDiscoveryService> MockDiscoveryService; }; @@ -198,7 +200,7 @@ namespace NYdb::NTopic::NTests { mockDiscoveryService->SetGoodEndpoints(*setup); } - TDriver driver(CreateConfig(mockDiscoveryService->GetDiscoveryAddr())); + TDriver driver(CreateConfig(*setup, mockDiscoveryService->GetDiscoveryAddr())); auto client = std::make_shared<TTopicClient>(driver); @@ -208,8 +210,8 @@ namespace NYdb::NTopic::NTests { Y_UNIT_TEST(Basic) { auto [setup, client, discovery] = Start(TEST_CASE_NAME); - WriteMessage(*setup, *client); - ReadMessage(*setup, *client); + WriteMessage(*client); + ReadMessage(*client); } Y_UNIT_TEST(Restarts) { @@ -217,9 +219,9 @@ namespace NYdb::NTopic::NTests { for (size_t i = 1; i <= 10; ++i) { Cerr << "=== Restart attempt " << i << Endl; - setup->GetServer().KillTopicPqTablets(setup->GetTestTopicPath()); - WriteMessage(*setup, *client); - ReadMessage(*setup, *client, i); + setup->GetServer().KillTopicPqTablets(setup->GetTopicPath()); + WriteMessage(*client); + ReadMessage(*client, i); } } @@ -233,7 +235,7 @@ namespace NYdb::NTopic::NTests { auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>(); // Set non-existing partition - auto writeSettings = CreateWriteSessionSettings(*setup); + auto writeSettings = CreateWriteSessionSettings(); writeSettings.RetryPolicy(retryPolicy); writeSettings.PartitionId(1); @@ -241,7 +243,7 @@ namespace NYdb::NTopic::NTests { retryPolicy->ExpectBreakDown(); Cerr << "=== Create write session\n"; - TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr()))); + TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr()))); auto writeSession = client.CreateWriteSession(writeSettings); Cerr << "=== Wait for retries\n"; @@ -250,7 +252,7 @@ namespace NYdb::NTopic::NTests { Cerr << "=== Alter partition count\n"; TAlterTopicSettings alterSettings; alterSettings.AlterPartitioningSettings(2, 2); - auto alterResult = client.AlterTopic(setup->GetTestTopicPath(), alterSettings).GetValueSync(); + auto alterResult = client.AlterTopic(setup->GetTopicPath(), alterSettings).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); Cerr << "=== Wait for repair\n"; @@ -268,14 +270,14 @@ namespace NYdb::NTopic::NTests { auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>(); - auto writeSettings = CreateWriteSessionSettings(*setup); + auto writeSettings = CreateWriteSessionSettings(); writeSettings.RetryPolicy(retryPolicy); retryPolicy->Initialize(); retryPolicy->ExpectBreakDown(); Cerr << "=== Create write session\n"; - TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr()))); + TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr()))); auto writeSession = client.CreateWriteSession(writeSettings); Cerr << "=== Wait for retries\n"; @@ -294,18 +296,18 @@ namespace NYdb::NTopic::NTests { auto setup = CreateSetup(TEST_CASE_NAME); TMockDiscoveryService discovery; - discovery.SetEndpoints(9999, setup->GetRuntime().GetNodeCount(), setup->GetGrpcPort()); + discovery.SetEndpoints(9999, setup->GetRuntime().GetNodeCount(), setup->GetServer().GrpcPort); auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>(); - auto writeSettings = CreateWriteSessionSettings(*setup); + auto writeSettings = CreateWriteSessionSettings(); writeSettings.RetryPolicy(retryPolicy); retryPolicy->Initialize(); retryPolicy->ExpectBreakDown(); Cerr << "=== Create write session\n"; - TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr()))); + TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr()))); auto writeSession = client.CreateWriteSession(writeSettings); Cerr << "=== Wait for retries\n"; @@ -328,14 +330,14 @@ namespace NYdb::NTopic::NTests { auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>(TDuration::Days(1)); - auto writeSettings = CreateWriteSessionSettings(*setup); + auto writeSettings = CreateWriteSessionSettings(); writeSettings.RetryPolicy(retryPolicy); retryPolicy->Initialize(); retryPolicy->ExpectBreakDown(); Cerr << "=== Create write session\n"; - TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr()))); + TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr()))); auto writeSession = client.CreateWriteSession(writeSettings); Cerr << "=== Close write session\n"; @@ -350,8 +352,8 @@ namespace NYdb::NTopic::NTests { discovery.SetDelay(TDuration::Days(1)); Cerr << "=== Create write session\n"; - TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr()))); - auto writeSession = client.CreateWriteSession(CreateWriteSessionSettings(*setup)); + TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr()))); + auto writeSession = client.CreateWriteSession(CreateWriteSessionSettings()); Cerr << "=== Close write session\n"; writeSession->Close(); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index dc2f938d60d..7e708ac7279 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -1,3 +1,5 @@ +#include "ut_utils/topic_sdk_test_setup.h" + #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h> @@ -10,101 +12,6 @@ namespace NYdb::NTopic::NTests { Y_UNIT_TEST_SUITE(TxUsage) { -NKikimr::Tests::TServerSettings MakeServerSettings() -{ - auto loggerInitializer = [](TTestActorRuntime& runtime) { - runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NActors::NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::PQ_WRITE_PROXY, NActors::NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::PQ_MIRRORER, NActors::NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NActors::NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, NActors::NLog::PRI_DEBUG); - }; - - auto settings = PQSettings(0); - settings.SetDomainName("Root"); - settings.SetEnableTopicServiceTx(true); - settings.PQConfig.SetTopicsAreFirstClassCitizen(true); - settings.PQConfig.SetRoot("/Root"); - settings.PQConfig.SetDatabase("/Root"); - settings.SetLoggerInitializer(loggerInitializer); - - return settings; -} - -class TEnvironment { -public: - TEnvironment(); - - void CreateTopic(const TString& path, const TString& consumer); - - TString GetEndpoint() const; - TString GetTopicPath(const TString& name) const; - TString GetTopicParent() const; - TString GetDatabase() const; - - const TDriver& GetDriver(); - -private: - TString Database; - ::NPersQueue::TTestServer Server; - TMaybe<TDriver> Driver; -}; - -TEnvironment::TEnvironment() : - Database("/Root"), - Server(MakeServerSettings(), false) -{ - Server.StartServer(true, GetDatabase()); -} - -void TEnvironment::CreateTopic(const TString& path, const TString& consumer) -{ - NTopic::TTopicClient client(GetDriver()); - - NTopic::TCreateTopicSettings topics; - NTopic::TConsumerSettings<NTopic::TCreateTopicSettings> consumers(topics, consumer); - topics.AppendConsumers(consumers); - - auto status = client.CreateTopic(path, topics).GetValueSync(); - UNIT_ASSERT(status.IsSuccess()); -} - -TString TEnvironment::GetEndpoint() const -{ - return "localhost:" + ToString(Server.GrpcPort); -} - -TString TEnvironment::GetTopicPath(const TString& name) const -{ - return GetTopicParent() + "/" + name; -} - -TString TEnvironment::GetTopicParent() const -{ - return GetDatabase(); -} - -TString TEnvironment::GetDatabase() const -{ - return Database; -} - -const TDriver& TEnvironment::GetDriver() -{ - if (!Driver) { - TDriverConfig config; - config.SetEndpoint(GetEndpoint()); - config.SetDatabase(GetDatabase()); - config.SetAuthToken("root@builtin"); - config.SetLog(MakeHolder<TStreamLogBackend>(&Cerr)); - - Driver.ConstructInPlace(config); - } - - return *Driver; -} - class TFixture : public NUnitTest::TBaseFixture { protected: void SetUp(NUnitTest::TTestContext&) override; @@ -126,29 +33,25 @@ protected: void WriteMessage(const TString& data); protected: - const TDriver& GetDriver(); - - TString GetTopicPath() const; - TString GetMessageGroupId() const; + const TDriver& GetDriver() const; private: - TString GetTopicName() const; - TString GetConsumerName() const; - template<class E> E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx); template<class E> E ReadEvent(TTopicReadSessionPtr reader); - std::shared_ptr<TEnvironment> Env; - TMaybe<TDriver> Driver; + std::unique_ptr<TTopicSdkTestSetup> Setup; + std::unique_ptr<TDriver> Driver; }; void TFixture::SetUp(NUnitTest::TTestContext&) { - Env = std::make_shared<TEnvironment>(); + NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings(); + settings.SetEnableTopicServiceTx(true); + Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings); - Env->CreateTopic(GetTopicPath(), GetConsumerName()); + Driver = std::make_unique<TDriver>(Setup->MakeDriver()); } NTable::TSession TFixture::CreateSession() @@ -174,8 +77,8 @@ auto TFixture::CreateReader() -> TTopicReadSessionPtr { NTopic::TTopicClient client(GetDriver()); TReadSessionSettings options; - options.ConsumerName(GetConsumerName()); - options.AppendTopics(GetTopicPath()); + options.ConsumerName(TEST_CONSUMER); + options.AppendTopics(TEST_TOPIC); return client.CreateReadSession(options); } @@ -231,8 +134,8 @@ E TFixture::ReadEvent(TTopicReadSessionPtr reader) void TFixture::WriteMessage(const TString& data) { NTopic::TWriteSessionSettings options; - options.Path(GetTopicPath()); - options.MessageGroupId(GetMessageGroupId()); + options.Path(TEST_TOPIC); + options.MessageGroupId(TEST_MESSAGE_GROUP_ID); NTopic::TTopicClient client(GetDriver()); auto session = client.CreateSimpleBlockingWriteSession(options); @@ -240,29 +143,9 @@ void TFixture::WriteMessage(const TString& data) session->Close(); } -TString TFixture::GetTopicPath() const -{ - return Env->GetTopicPath(GetTopicName()); -} - -TString TFixture::GetTopicName() const -{ - return "my-topic"; -} - -TString TFixture::GetConsumerName() const +const TDriver& TFixture::GetDriver() const { - return "my-consumer"; -} - -TString TFixture::GetMessageGroupId() const -{ - return "my-message-group"; -} - -const TDriver& TFixture::GetDriver() -{ - return Env->GetDriver(); + return *Driver; } Y_UNIT_TEST_F(SessionAbort, TFixture) @@ -333,8 +216,8 @@ Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture) Y_UNIT_TEST_F(WriteToTopic, TFixture) { NTopic::TWriteSessionSettings options; - options.Path(GetTopicPath()); - options.MessageGroupId(GetMessageGroupId()); + options.Path(TEST_TOPIC); + options.MessageGroupId(TEST_MESSAGE_GROUP_ID); auto session = CreateSession(); auto tx = BeginTx(session); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp index d9826e7a08e..d6b074c3582 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp @@ -1,4 +1,4 @@ -#include <ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h> +#include "managed_executor.h" namespace NYdb::NTopic::NTests { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.h index d32c0ff8817..d32c0ff8817 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.h diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp new file mode 100644 index 00000000000..c82589773f9 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -0,0 +1,110 @@ +#include "topic_sdk_test_setup.h" + +using namespace NYdb; +using namespace NYdb::NTopic; +using namespace NYdb::NTopic::NTests; + +TTopicSdkTestSetup::TTopicSdkTestSetup(const TString& testCaseName, const NKikimr::Tests::TServerSettings& settings, bool createTopic) + : Database("/Root") + , Server(settings, false) +{ + Log.SetFormatter([testCaseName](ELogPriority priority, TStringBuf message) { + return TStringBuilder() << TInstant::Now() << " :" << testCaseName << " " << priority << ": " << message << Endl; + }); + + Server.StartServer(true, GetDatabase()); + + Log << "TTopicSdkTestSetup started"; + + if (createTopic) { + CreateTopic(); + Log << "Topic created"; + } +} + +void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consumer) +{ + TTopicClient client(MakeDriver()); + + TCreateTopicSettings topics; + TConsumerSettings<TCreateTopicSettings> consumers(topics, consumer); + topics.AppendConsumers(consumers); + + auto status = client.CreateTopic(path, topics).GetValueSync(); + UNIT_ASSERT(status.IsSuccess()); + + Server.WaitInit(path); +} + +TString TTopicSdkTestSetup::GetEndpoint() const { + return "localhost:" + ToString(Server.GrpcPort); +} + +TString TTopicSdkTestSetup::GetTopicPath(const TString& name) const { + return GetTopicParent() + "/" + name; +} + +TString TTopicSdkTestSetup::GetTopicParent() const { + return GetDatabase(); +} + +TString TTopicSdkTestSetup::GetDatabase() const { + return Database; +} + +::NPersQueue::TTestServer& TTopicSdkTestSetup::GetServer() { + return Server; +} + +NActors::TTestActorRuntime& TTopicSdkTestSetup::GetRuntime() { + return *Server.CleverServer->GetRuntime(); +} + +TLog& TTopicSdkTestSetup::GetLog() { + return Log; +} + +TDriverConfig TTopicSdkTestSetup::MakeDriverConfig() const +{ + TDriverConfig config; + config.SetEndpoint(GetEndpoint()); + config.SetDatabase(GetDatabase()); + config.SetAuthToken("root@builtin"); + config.SetLog(MakeHolder<TStreamLogBackend>(&Cerr)); + return config; +} + +NKikimr::Tests::TServerSettings TTopicSdkTestSetup::MakeServerSettings() +{ + auto settings = NKikimr::NPersQueueTests::PQSettings(0); + settings.SetDomainName("Root"); + settings.SetNodeCount(1); + settings.PQConfig.SetTopicsAreFirstClassCitizen(true); + settings.PQConfig.SetRoot("/Root"); + settings.PQConfig.SetDatabase("/Root"); + + settings.SetLoggerInitializer([](NActors::TTestActorRuntime& runtime) { + runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PQ_WRITE_PROXY, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PQ_MIRRORER, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, NActors::NLog::PRI_DEBUG); + }); + + return settings; +} + +TDriver TTopicSdkTestSetup::MakeDriver() const { + return MakeDriver(MakeDriverConfig()); +} + +TDriver TTopicSdkTestSetup::MakeDriver(const TDriverConfig& config) const +{ + return TDriver(config); +} + +TTopicClient TTopicSdkTestSetup::MakeClient() const +{ + return TTopicClient(MakeDriver()); +}
\ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h new file mode 100644 index 00000000000..a9014a34377 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h @@ -0,0 +1,44 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h> + +#include <ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h> + +namespace NYdb::NTopic::NTests { + +#define TEST_CASE_NAME (this->Name_) + +inline static const TString TEST_TOPIC = "test-topic"; +inline static const TString TEST_CONSUMER = "test-consumer"; +inline static const TString TEST_MESSAGE_GROUP_ID = "test-message_group_id"; + +class TTopicSdkTestSetup { +public: + TTopicSdkTestSetup(const TString& testCaseName, const NKikimr::Tests::TServerSettings& settings = MakeServerSettings(), bool createTopic = true); + + void CreateTopic(const TString& path = TEST_TOPIC, const TString& consumer = TEST_CONSUMER); + + TString GetEndpoint() const; + TString GetTopicPath(const TString& name = TEST_TOPIC) const; + TString GetTopicParent() const; + TString GetDatabase() const; + + ::NPersQueue::TTestServer& GetServer(); + NActors::TTestActorRuntime& GetRuntime(); + TLog& GetLog(); + + TTopicClient MakeClient() const; + + TDriver MakeDriver() const; + TDriver MakeDriver(const TDriverConfig& config) const; + + TDriverConfig MakeDriverConfig() const; + static NKikimr::Tests::TServerSettings MakeServerSettings(); +private: + TString Database; + ::NPersQueue::TTestServer Server; + + TLog Log = CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG); +}; + +}
\ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make b/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make index 35f2ffbf019..69c5f51707f 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make @@ -32,9 +32,9 @@ SRCS( basic_usage_ut.cpp describe_topic_ut.cpp local_partition_ut.cpp - managed_executor.h - managed_executor.cpp topic_to_table_ut.cpp + ut_utils/managed_executor.cpp + ut_utils/topic_sdk_test_setup.cpp ) END() |