aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-09-12 10:07:23 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-09-12 10:25:21 +0300
commit0237e07bda5b67a822b92224e9fbc09bfce0430b (patch)
tree77a163d66d012535eb7d47a47b06e50a620e0bfc
parent00b007a9fcb5f5db8ea11be6e1ea5cd9a83a1e5a (diff)
downloadydb-0237e07bda5b67a822b92224e9fbc09bfce0430b.tar.gz
Topic API SDK test framework
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp147
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp46
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp84
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp151
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp (renamed from ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp)2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.h (renamed from ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h)0
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp110
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h44
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make4
14 files changed, 322 insertions, 280 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
index 5c21c087915..88a82793116 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
@@ -1,6 +1,6 @@
#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h>
-#include <ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt
index cd74c3f5100..282211a4b93 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt
@@ -42,8 +42,9 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt
index 3361edb1ed7..be6ba2fbf35 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt
@@ -45,8 +45,9 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt
index 6b37cbbbd11..74b2b0756cc 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt
@@ -46,8 +46,9 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt
index ad9dd4e1b21..3c14b2644b1 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt
@@ -35,8 +35,9 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
index bdd0b0d16fd..4bb5f40c762 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
@@ -1,13 +1,14 @@
-#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include "ut_utils/managed_executor.h"
+#include "ut_utils/topic_sdk_test_setup.h"
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h>
-#include <ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h>
-
-#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h>
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/testing/unittest/tests_data.h>
@@ -18,8 +19,8 @@
namespace NYdb::NTopic::NTests {
-void WriteAndReadToEndWithRestarts(NYdb::NTopic::TReadSessionSettings readSettings, NPersQueue::TWriteSessionSettings writeSettings, std::string message, ui32 count, std::shared_ptr<NPersQueue::NTests::TPersQueueYdbSdkTestSetup> setup, TIntrusivePtr<TManagedExecutor> decompressor) {
- auto& client = setup->GetPersQueueClient();
+void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSessionSettings writeSettings, const std::string& message, ui32 count, TTopicSdkTestSetup& setup, TIntrusivePtr<TManagedExecutor> decompressor) {
+ auto client = setup.MakeClient();
auto session = client.CreateSimpleBlockingWriteSession(writeSettings);
for (ui32 i = 1; i <= count; ++i) {
@@ -29,10 +30,9 @@ void WriteAndReadToEndWithRestarts(NYdb::NTopic::TReadSessionSettings readSettin
bool res = session->Close(TDuration::Seconds(10));
UNIT_ASSERT(res);
- std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
+ std::shared_ptr<IReadSession> ReadSession;
- // Create topic client.
- NYdb::NTopic::TTopicClient topicClient(setup->GetDriver());
+ TTopicClient topicClient = setup.MakeClient();
auto WaitTasks = [&](auto f, size_t c) {
@@ -61,7 +61,7 @@ void WriteAndReadToEndWithRestarts(NYdb::NTopic::TReadSessionSettings readSettin
WaitPlannedTasks(e, n);
size_t completed = e->GetExecutedCount();
- setup->GetServer().KillTopicPqrbTablet(setup->GetTestTopicPath());
+ setup.GetServer().KillTopicPqrbTablet(setup.GetTopicPath());
Sleep(TDuration::MilliSeconds(100));
e->StartFuncs(tasks);
@@ -76,7 +76,7 @@ void WriteAndReadToEndWithRestarts(NYdb::NTopic::TReadSessionSettings readSettin
auto f = checkedPromise.GetFuture();
readSettings.EventHandlers_.SimpleDataHandlers(
[&]
- (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) mutable {
+ (TReadSessionEvent::TDataReceivedEvent& ev) mutable {
AtomicSet(lastOffset, ev.GetMessages().back().GetOffset());
Cerr << ">>> TEST: last offset = " << lastOffset << Endl;
});
@@ -93,10 +93,10 @@ void WriteAndReadToEndWithRestarts(NYdb::NTopic::TReadSessionSettings readSettin
Y_UNIT_TEST_SUITE(BasicUsage) {
Y_UNIT_TEST(ConnectToYDB) {
- auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
+ TTopicSdkTestSetup setup(TEST_CASE_NAME);
NYdb::TDriverConfig cfg;
- cfg.SetEndpoint(TStringBuilder() << "invalid:" << setup->GetGrpcPort());
+ cfg.SetEndpoint(TStringBuilder() << "invalid:" << setup.GetServer().GrpcPort);
cfg.SetDatabase("/Invalid");
cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG));
auto driver = NYdb::TDriver(cfg);
@@ -105,8 +105,8 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
TTopicClient client(driver);
auto writeSettings = TWriteSessionSettings()
- .Path(setup->GetTestTopic())
- .MessageGroupId("group_id")
+ .Path(TEST_TOPIC)
+ .MessageGroupId(TEST_MESSAGE_GROUP_ID)
// TODO why retries? see LOGBROKER-8490
.RetryPolicy(IRetryPolicy::GetNoRetryPolicy());
auto writeSession = client.CreateWriteSession(writeSettings);
@@ -118,13 +118,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
{
auto settings = TTopicClientSettings()
.Database({"/Root"})
- .DiscoveryEndpoint({TStringBuilder() << "localhost:" << setup->GetGrpcPort()});
+ .DiscoveryEndpoint({TStringBuilder() << "localhost:" << setup.GetServer().GrpcPort});
TTopicClient client(driver, settings);
auto writeSettings = TWriteSessionSettings()
- .Path(setup->GetTestTopic())
- .MessageGroupId("group_id")
+ .Path(TEST_TOPIC)
+ .MessageGroupId(TEST_MESSAGE_GROUP_ID)
.RetryPolicy(IRetryPolicy::GetNoRetryPolicy());
auto writeSession = client.CreateWriteSession(writeSettings);
@@ -135,22 +135,22 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
Y_UNIT_TEST(WriteRead) {
- auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
- TTopicClient client(setup->GetDriver());
-
+ TTopicSdkTestSetup setup(TEST_CASE_NAME);
+ TTopicClient client = setup.MakeClient();
+
{
auto writeSettings = TWriteSessionSettings()
- .Path(setup->GetTestTopic())
- .ProducerId(setup->GetTestMessageGroupId())
- .MessageGroupId(setup->GetTestMessageGroupId());
+ .Path(TEST_TOPIC)
+ .ProducerId(TEST_MESSAGE_GROUP_ID)
+ .MessageGroupId(TEST_MESSAGE_GROUP_ID);
auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings);
UNIT_ASSERT(writeSession->Write("message_using_MessageGroupId"));
writeSession->Close();
}
{
auto writeSettings = TWriteSessionSettings()
- .Path(setup->GetTestTopic())
- .ProducerId(setup->GetTestMessageGroupId())
+ .Path(TEST_TOPIC)
+ .ProducerId(TEST_MESSAGE_GROUP_ID)
.PartitionId(0);
auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings);
UNIT_ASSERT(writeSession->Write("message_using_PartitionId"));
@@ -159,8 +159,8 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
{
auto readSettings = TReadSessionSettings()
- .ConsumerName(setup->GetTestConsumer())
- .AppendTopics(setup->GetTestTopic());
+ .ConsumerName(TEST_CONSUMER)
+ .AppendTopics(TEST_TOPIC);
auto readSession = client.CreateReadSession(readSettings);
auto event = readSession->GetEvent(true);
@@ -183,23 +183,24 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
}
Y_UNIT_TEST(ReadWithoutConsumerWithRestarts) {
- auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
- auto compressor = new NPersQueue::TSyncExecutor();
+ TTopicSdkTestSetup setup(TEST_CASE_NAME);
+ auto compressor = new TSyncExecutor();
auto decompressor = CreateThreadPoolManagedExecutor(1);
- NYdb::NTopic::TReadSessionSettings readSettings;
- NYdb::NTopic::TTopicReadSettings topic = setup->GetTestTopic();
+ TReadSessionSettings readSettings;
+ TTopicReadSettings topic = TEST_TOPIC;
topic.AppendPartitionIds(0);
readSettings
.WithoutConsumer()
.MaxMemoryUsageBytes(1_MB)
.DecompressionExecutor(decompressor)
.AppendTopics(topic);
-
- NPersQueue::TWriteSessionSettings writeSettings;
+
+ TWriteSessionSettings writeSettings;
writeSettings
- .Path(setup->GetTestTopic()).MessageGroupId("src_id")
- .Codec(NPersQueue::ECodec::RAW)
+ .Path(TEST_TOPIC)
+ .MessageGroupId(TEST_MESSAGE_GROUP_ID)
+ .Codec(NTopic::ECodec::RAW)
.CompressionExecutor(compressor);
@@ -210,19 +211,19 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
}
Y_UNIT_TEST(MaxByteSizeEqualZero) {
- auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
- TTopicClient client(setup->GetDriver());
+ TTopicSdkTestSetup setup(TEST_CASE_NAME);
+ TTopicClient client = setup.MakeClient();
auto writeSettings = TWriteSessionSettings()
- .Path(setup->GetTestTopic())
- .MessageGroupId("group_id");
+ .Path(TEST_TOPIC)
+ .MessageGroupId(TEST_MESSAGE_GROUP_ID);
auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings);
UNIT_ASSERT(writeSession->Write("message"));
writeSession->Close();
auto readSettings = TReadSessionSettings()
- .ConsumerName(setup->GetTestConsumer())
- .AppendTopics(setup->GetTestTopic());
+ .ConsumerName(TEST_CONSUMER)
+ .AppendTopics(TEST_TOPIC);
auto readSession = client.CreateReadSession(readSettings);
auto event = readSession->GetEvent(true);
@@ -246,7 +247,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
NPersQueue::TWriteSessionSettings writeSettings;
- writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id");
+ writeSettings.Path(setup->GetTestTopic()).MessageGroupId(TEST_MESSAGE_GROUP_ID);
writeSettings.Codec(NPersQueue::ECodec::RAW);
NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor();
writeSettings.CompressionExecutor(executor);
@@ -277,13 +278,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
}
session->Close();
- std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
+ std::shared_ptr<IReadSession> ReadSession;
// Create topic client.
NYdb::NTopic::TTopicClient topicClient(setup->GetDriver());
// Create read session.
- NYdb::NTopic::TReadSessionSettings readSettings;
+ TReadSessionSettings readSettings;
readSettings
.ConsumerName(setup->GetTestConsumer())
.MaxMemoryUsageBytes(1_MB)
@@ -299,7 +300,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
readSettings.EventHandlers_.SimpleDataHandlers(
// [checkedPromise = std::move(checkedPromise), &check, &sentMessages, &totalReceived]
[&]
- (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) mutable {
+ (TReadSessionEvent::TDataReceivedEvent& ev) mutable {
Y_VERIFY_S(AtomicGet(check) != 0, "check is false");
auto& messages = ev.GetMessages();
for (size_t i = 0u; i < messages.size(); ++i) {
@@ -321,7 +322,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
UNIT_ASSERT(status.GetValueSync().IsSuccess());
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
- auto result = topicClient.DescribeConsumer("/Root/PQ/rt3.dc1--" + setup->GetTestTopic(), setup->GetTestConsumer(), describeConsumerSettings).GetValueSync();
+ auto result = topicClient.DescribeConsumer(setup->GetTestTopicPath(), setup->GetTestConsumer(), describeConsumerSettings).GetValueSync();
UNIT_ASSERT(result.IsSuccess());
auto description = result.GetConsumerDescription();
@@ -333,21 +334,21 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
Y_UNIT_TEST(ReadWithRestarts) {
- auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
- auto compressor = new NPersQueue::TSyncExecutor();
+ TTopicSdkTestSetup setup(TEST_CASE_NAME);
+ auto compressor = new TSyncExecutor();
auto decompressor = CreateThreadPoolManagedExecutor(1);
- NYdb::NTopic::TReadSessionSettings readSettings;
+ TReadSessionSettings readSettings;
readSettings
- .ConsumerName(setup->GetTestConsumer())
+ .ConsumerName(TEST_CONSUMER)
.MaxMemoryUsageBytes(1_MB)
.DecompressionExecutor(decompressor)
- .AppendTopics(setup->GetTestTopic());
+ .AppendTopics(TEST_TOPIC);
- NPersQueue::TWriteSessionSettings writeSettings;
+ TWriteSessionSettings writeSettings;
writeSettings
- .Path(setup->GetTestTopic()).MessageGroupId("src_id")
- .Codec(NPersQueue::ECodec::RAW)
+ .Path(TEST_TOPIC).MessageGroupId(TEST_MESSAGE_GROUP_ID)
+ .Codec(NTopic::ECodec::RAW)
.CompressionExecutor(compressor);
@@ -358,13 +359,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
}
Y_UNIT_TEST(SessionNotDestroyedWhileCompressionInFlight) {
- auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
+ TTopicSdkTestSetup setup(TEST_CASE_NAME);
// controlled executor
auto stepByStepExecutor = CreateThreadPoolManagedExecutor(1);
// Create topic client.
- NYdb::NTopic::TTopicClient topicClient(setup->GetDriver());
+ TTopicClient topicClient = setup.MakeClient();
NThreading::TPromise<void> promiseToWrite = NThreading::NewPromise<void>();
auto futureWrite = promiseToWrite.GetFuture();
@@ -372,18 +373,18 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
NThreading::TPromise<void> promiseToRead = NThreading::NewPromise<void>();
auto futureRead = promiseToRead.GetFuture();
- NYdb::NTopic::TWriteSessionSettings writeSettings;
- writeSettings.Path(setup->GetTestTopic())
- .MessageGroupId("src_id")
- .ProducerId("src_id")
+ TWriteSessionSettings writeSettings;
+ writeSettings.Path(TEST_TOPIC)
+ .MessageGroupId(TEST_MESSAGE_GROUP_ID)
+ .ProducerId(TEST_MESSAGE_GROUP_ID)
.CompressionExecutor(stepByStepExecutor);
// Create read session.
- NYdb::NTopic::TReadSessionSettings readSettings;
+ TReadSessionSettings readSettings;
readSettings
- .ConsumerName(setup->GetTestConsumer())
+ .ConsumerName(TEST_CONSUMER)
.MaxMemoryUsageBytes(1_MB)
- .AppendTopics(setup->GetTestTopic())
+ .AppendTopics(TEST_TOPIC)
.DecompressionExecutor(stepByStepExecutor);
auto f = std::async(std::launch::async,
@@ -405,7 +406,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
auto future = promise.GetFuture();
readSettings.EventHandlers_.SimpleDataHandlers(
- [promise = std::move(promise)](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) mutable {
+ [promise = std::move(promise)](TReadSessionEvent::TDataReceivedEvent& ev) mutable {
ev.Commit();
promise.SetValue();
Cerr << ">>>TEST: get read event " << Endl;
@@ -465,13 +466,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
}
Y_UNIT_TEST(SessionNotDestroyedWhileUserEventHandlingInFlight) {
- auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
+ TTopicSdkTestSetup setup(TEST_CASE_NAME);
// controlled executor
auto stepByStepExecutor = CreateThreadPoolManagedExecutor(1);
// Create topic client.
- NYdb::NTopic::TTopicClient topicClient(setup->GetDriver());
+ TTopicClient topicClient = setup.MakeClient();
// NThreading::TPromise<void> promiseToWrite = NThreading::NewPromise<void>();
// auto futureWrite = promiseToWrite.GetFuture();
@@ -480,9 +481,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
auto futureRead = promiseToRead.GetFuture();
auto writeSettings = TWriteSessionSettings()
- .Path(setup->GetTestTopic())
- .MessageGroupId("src_id")
- .ProducerId("src_id");
+ .Path(TEST_TOPIC)
+ .MessageGroupId(TEST_MESSAGE_GROUP_ID)
+ .ProducerId(TEST_MESSAGE_GROUP_ID);
auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings);
std::string message(2'000, 'x');
@@ -495,9 +496,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
// Create read session.
auto readSettings = TReadSessionSettings()
- .ConsumerName(setup->GetTestConsumer())
+ .ConsumerName(TEST_CONSUMER)
.MaxMemoryUsageBytes(1_MB)
- .AppendTopics(setup->GetTestTopic());
+ .AppendTopics(TEST_TOPIC);
readSettings.EventHandlers_
.HandlersExecutor(stepByStepExecutor);
@@ -531,7 +532,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
auto future = promise.GetFuture();
readSettings.EventHandlers_.SimpleDataHandlers(
- [promise = std::move(promise)](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) mutable {
+ [promise = std::move(promise)](TReadSessionEvent::TDataReceivedEvent& ev) mutable {
Cerr << ">>>TEST: in SimpleDataHandlers " << Endl;
ev.Commit();
promise.SetValue();
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
index 559ea706ffb..e3aaee33e4e 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
@@ -1,9 +1,9 @@
+#include "ut_utils/topic_sdk_test_setup.h"
+
#include <ydb/library/persqueue/topic_parser_public/topic_parser.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
-#include <ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h>
-#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h>
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/testing/unittest/tests_data.h>
@@ -16,14 +16,14 @@ namespace NYdb::NTopic::NTests {
Y_UNIT_TEST_SUITE(Describe) {
- void DescribeTopic(NPersQueue::NTests::TPersQueueYdbSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets)
+ void DescribeTopic(TTopicSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets)
{
TDescribeTopicSettings settings;
settings.IncludeStats(requireStats);
settings.IncludeLocation(requireLocation);
{
- auto result = client.DescribeTopic(setup.GetTestTopicPath(), settings).GetValueSync();
+ auto result = client.DescribeTopic(TEST_TOPIC, settings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
const auto& description = result.GetTopicDescription();
@@ -63,9 +63,9 @@ namespace NYdb::NTopic::NTests {
if (killTablets)
{
- setup.GetServer().KillTopicPqTablets(setup.GetTestTopicPath());
+ setup.GetServer().KillTopicPqTablets(setup.GetTopicPath());
- auto result = client.DescribeTopic(setup.GetTestTopicPath(), settings).GetValueSync();
+ auto result = client.DescribeTopic(TEST_TOPIC, settings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
const auto& description = result.GetTopicDescription();
@@ -104,14 +104,14 @@ namespace NYdb::NTopic::NTests {
}
}
- void DescribeConsumer(NPersQueue::NTests::TPersQueueYdbSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets)
+ void DescribeConsumer(TTopicSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets)
{
TDescribeConsumerSettings settings;
settings.IncludeStats(requireStats);
settings.IncludeLocation(requireLocation);
{
- auto result = client.DescribeConsumer(setup.GetTestTopicPath(), ::NPersQueue::SDKTestSetup::GetTestConsumer(), settings).GetValueSync();
+ auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, settings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
const auto& description = result.GetConsumerDescription();
@@ -162,9 +162,9 @@ namespace NYdb::NTopic::NTests {
if (killTablets)
{
- setup.GetServer().KillTopicPqTablets(setup.GetTestTopicPath());
+ setup.GetServer().KillTopicPqTablets(setup.GetTopicPath());
- auto result = client.DescribeConsumer(setup.GetTestTopicPath(), ::NPersQueue::SDKTestSetup::GetTestConsumer(), settings).GetValueSync();
+ auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, settings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
const auto& description = result.GetConsumerDescription();
@@ -186,7 +186,7 @@ namespace NYdb::NTopic::NTests {
}
}
- void DescribePartition(NPersQueue::NTests::TPersQueueYdbSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets)
+ void DescribePartition(TTopicSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets)
{
TDescribePartitionSettings settings;
settings.IncludeStats(requireStats);
@@ -195,7 +195,7 @@ namespace NYdb::NTopic::NTests {
i64 testPartitionId = 0;
{
- auto result = client.DescribePartition(setup.GetTestTopicPath(), testPartitionId, settings).GetValueSync();
+ auto result = client.DescribePartition(TEST_TOPIC, testPartitionId, settings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
const auto& description = result.GetPartitionDescription();
@@ -235,9 +235,9 @@ namespace NYdb::NTopic::NTests {
if (killTablets)
{
- setup.GetServer().KillTopicPqTablets(setup.GetTestTopicPath());
+ setup.GetServer().KillTopicPqTablets(setup.GetTopicPath());
- auto result = client.DescribePartition(setup.GetTestTopicPath(), testPartitionId, settings).GetValueSync();
+ auto result = client.DescribePartition(TEST_TOPIC, testPartitionId, settings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
const auto& description = result.GetPartitionDescription();
@@ -257,8 +257,8 @@ namespace NYdb::NTopic::NTests {
}
Y_UNIT_TEST(Basic) {
- NPersQueue::NTests::TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME);
- TTopicClient client(setup.GetDriver());
+ TTopicSdkTestSetup setup(TEST_CASE_NAME);
+ TTopicClient client = setup.MakeClient();
DescribeTopic(setup, client, false, false, false, false);
DescribeConsumer(setup, client, false, false, false, false);
@@ -266,8 +266,8 @@ namespace NYdb::NTopic::NTests {
}
Y_UNIT_TEST(Statistics) {
- NPersQueue::NTests::TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME);
- TTopicClient client(setup.GetDriver());
+ TTopicSdkTestSetup setup(TEST_CASE_NAME);
+ TTopicClient client = setup.MakeClient();
// Get empty description
DescribeTopic(setup, client, true, false, false, false);
@@ -276,7 +276,7 @@ namespace NYdb::NTopic::NTests {
// Write a message
{
- auto writeSettings = TWriteSessionSettings().Path(setup.GetTestTopicPath()).MessageGroupId(::NPersQueue::SDKTestSetup::GetTestMessageGroupId());
+ auto writeSettings = TWriteSessionSettings().Path(TEST_TOPIC).MessageGroupId(TEST_MESSAGE_GROUP_ID);
auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings);
std::string message(10_KB, 'x');
UNIT_ASSERT(writeSession->Write(message));
@@ -285,9 +285,7 @@ namespace NYdb::NTopic::NTests {
// Read a message
{
- auto readSettings = TReadSessionSettings()
- .ConsumerName(setup.GetTestConsumer())
- .AppendTopics(setup.GetTestTopicPath());
+ auto readSettings = TReadSessionSettings().ConsumerName(TEST_CONSUMER).AppendTopics(TEST_TOPIC);
auto readSession = client.CreateReadSession(readSettings);
// Event 1: start partition session
@@ -328,8 +326,8 @@ namespace NYdb::NTopic::NTests {
}
Y_UNIT_TEST(Location) {
- NPersQueue::NTests::TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME);
- TTopicClient client(setup.GetDriver());
+ TTopicSdkTestSetup setup(TEST_CASE_NAME);
+ TTopicClient client = setup.MakeClient();
DescribeTopic(setup, client, false, false, true, false);
DescribeConsumer(setup, client, false, false, true, false);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp
index 96b10a84645..d99bb01a08a 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp
@@ -1,3 +1,7 @@
+#include "ut_utils/topic_sdk_test_setup.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h>
+
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
@@ -5,8 +9,6 @@
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h>
-#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h>
-
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/testing/unittest/tests_data.h>
@@ -18,49 +20,49 @@ using namespace NYdb::NPersQueue::NTests;
namespace NYdb::NTopic::NTests {
Y_UNIT_TEST_SUITE(LocalPartition) {
- std::shared_ptr<TPersQueueYdbSdkTestSetup> CreateSetup(TString testCaseName, ui32 nodeCount = 1) {
- return std::make_shared<TPersQueueYdbSdkTestSetup>(testCaseName, true, ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::PRI_DEBUG, nodeCount, 1);
+ std::shared_ptr<TTopicSdkTestSetup> CreateSetup(const TString& testCaseName, ui32 nodeCount = 1) {
+ NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings();
+ settings.SetNodeCount(nodeCount);
+ return std::make_shared<TTopicSdkTestSetup>(testCaseName, settings);
}
- NYdb::TDriverConfig CreateConfig(TString discoveryAddr)
+ NYdb::TDriverConfig CreateConfig(const TTopicSdkTestSetup& setup, TString discoveryAddr)
{
- return NYdb::TDriverConfig()
- .SetEndpoint(discoveryAddr)
- .SetDatabase("/Root")
- .SetAuthToken("root@builtin")
- .SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG));
+ NYdb::TDriverConfig config = setup.MakeDriverConfig();
+ config.SetEndpoint(discoveryAddr);
+ return config;
}
- TWriteSessionSettings CreateWriteSessionSettings(const TPersQueueYdbSdkTestSetup& setup)
+ TWriteSessionSettings CreateWriteSessionSettings()
{
return TWriteSessionSettings()
- .Path(setup.GetTestTopicPath())
- .ProducerId(setup.GetTestMessageGroupId())
+ .Path(TEST_TOPIC)
+ .ProducerId(TEST_MESSAGE_GROUP_ID)
.PartitionId(0)
.DirectWriteToPartition(true);
}
- TReadSessionSettings CreateReadSessionSettings(const TPersQueueYdbSdkTestSetup& setup)
+ TReadSessionSettings CreateReadSessionSettings()
{
return TReadSessionSettings()
- .ConsumerName(setup.GetTestConsumer())
- .AppendTopics(setup.GetTestTopic());
+ .ConsumerName(TEST_CONSUMER)
+ .AppendTopics(TEST_TOPIC);
}
- void WriteMessage(TPersQueueYdbSdkTestSetup& setup, TTopicClient& client)
+ void WriteMessage(TTopicClient& client)
{
Cerr << "=== Write message" << Endl;
- auto writeSession = client.CreateSimpleBlockingWriteSession(CreateWriteSessionSettings(setup));
+ auto writeSession = client.CreateSimpleBlockingWriteSession(CreateWriteSessionSettings());
UNIT_ASSERT(writeSession->Write("message"));
writeSession->Close();
}
- void ReadMessage(TPersQueueYdbSdkTestSetup& setup, TTopicClient& client, ui64 expectedCommitedOffset = 1)
+ void ReadMessage(TTopicClient& client, ui64 expectedCommitedOffset = 1)
{
Cerr << "=== Read message" << Endl;
- auto readSession = client.CreateReadSession(CreateReadSessionSettings(setup));
+ auto readSession = client.CreateReadSession(CreateReadSessionSettings());
TMaybe<TReadSessionEvent::TEvent> event = readSession->GetEvent(true);
UNIT_ASSERT(event);
@@ -105,10 +107,10 @@ namespace NYdb::NTopic::NTests {
Server = ::NYdb::NTopic::NTests::NTestSuiteLocalPartition::StartGrpcServer(DiscoveryAddr, *this);
}
- void SetGoodEndpoints(TPersQueueYdbSdkTestSetup& setup)
+ void SetGoodEndpoints(TTopicSdkTestSetup& setup)
{
Cerr << "=== TMockDiscovery set good endpoint nodes " << Endl;
- SetEndpoints(setup.GetRuntime().GetNodeId(0), setup.GetRuntime().GetNodeCount(), setup.GetGrpcPort());
+ SetEndpoints(setup.GetRuntime().GetNodeId(0), setup.GetRuntime().GetNodeCount(), setup.GetServer().GrpcPort);
}
void SetEndpoints(ui32 firstNodeId, ui32 nodeCount, ui16 port)
@@ -185,7 +187,7 @@ namespace NYdb::NTopic::NTests {
auto Start(TString testCaseName, std::shared_ptr<TMockDiscoveryService> mockDiscoveryService = {})
{
struct Result {
- std::shared_ptr<TPersQueueYdbSdkTestSetup> Setup;
+ std::shared_ptr<TTopicSdkTestSetup> Setup;
std::shared_ptr<TTopicClient> Client;
std::shared_ptr<TMockDiscoveryService> MockDiscoveryService;
};
@@ -198,7 +200,7 @@ namespace NYdb::NTopic::NTests {
mockDiscoveryService->SetGoodEndpoints(*setup);
}
- TDriver driver(CreateConfig(mockDiscoveryService->GetDiscoveryAddr()));
+ TDriver driver(CreateConfig(*setup, mockDiscoveryService->GetDiscoveryAddr()));
auto client = std::make_shared<TTopicClient>(driver);
@@ -208,8 +210,8 @@ namespace NYdb::NTopic::NTests {
Y_UNIT_TEST(Basic) {
auto [setup, client, discovery] = Start(TEST_CASE_NAME);
- WriteMessage(*setup, *client);
- ReadMessage(*setup, *client);
+ WriteMessage(*client);
+ ReadMessage(*client);
}
Y_UNIT_TEST(Restarts) {
@@ -217,9 +219,9 @@ namespace NYdb::NTopic::NTests {
for (size_t i = 1; i <= 10; ++i) {
Cerr << "=== Restart attempt " << i << Endl;
- setup->GetServer().KillTopicPqTablets(setup->GetTestTopicPath());
- WriteMessage(*setup, *client);
- ReadMessage(*setup, *client, i);
+ setup->GetServer().KillTopicPqTablets(setup->GetTopicPath());
+ WriteMessage(*client);
+ ReadMessage(*client, i);
}
}
@@ -233,7 +235,7 @@ namespace NYdb::NTopic::NTests {
auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
// Set non-existing partition
- auto writeSettings = CreateWriteSessionSettings(*setup);
+ auto writeSettings = CreateWriteSessionSettings();
writeSettings.RetryPolicy(retryPolicy);
writeSettings.PartitionId(1);
@@ -241,7 +243,7 @@ namespace NYdb::NTopic::NTests {
retryPolicy->ExpectBreakDown();
Cerr << "=== Create write session\n";
- TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr())));
+ TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr())));
auto writeSession = client.CreateWriteSession(writeSettings);
Cerr << "=== Wait for retries\n";
@@ -250,7 +252,7 @@ namespace NYdb::NTopic::NTests {
Cerr << "=== Alter partition count\n";
TAlterTopicSettings alterSettings;
alterSettings.AlterPartitioningSettings(2, 2);
- auto alterResult = client.AlterTopic(setup->GetTestTopicPath(), alterSettings).GetValueSync();
+ auto alterResult = client.AlterTopic(setup->GetTopicPath(), alterSettings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
Cerr << "=== Wait for repair\n";
@@ -268,14 +270,14 @@ namespace NYdb::NTopic::NTests {
auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
- auto writeSettings = CreateWriteSessionSettings(*setup);
+ auto writeSettings = CreateWriteSessionSettings();
writeSettings.RetryPolicy(retryPolicy);
retryPolicy->Initialize();
retryPolicy->ExpectBreakDown();
Cerr << "=== Create write session\n";
- TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr())));
+ TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr())));
auto writeSession = client.CreateWriteSession(writeSettings);
Cerr << "=== Wait for retries\n";
@@ -294,18 +296,18 @@ namespace NYdb::NTopic::NTests {
auto setup = CreateSetup(TEST_CASE_NAME);
TMockDiscoveryService discovery;
- discovery.SetEndpoints(9999, setup->GetRuntime().GetNodeCount(), setup->GetGrpcPort());
+ discovery.SetEndpoints(9999, setup->GetRuntime().GetNodeCount(), setup->GetServer().GrpcPort);
auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
- auto writeSettings = CreateWriteSessionSettings(*setup);
+ auto writeSettings = CreateWriteSessionSettings();
writeSettings.RetryPolicy(retryPolicy);
retryPolicy->Initialize();
retryPolicy->ExpectBreakDown();
Cerr << "=== Create write session\n";
- TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr())));
+ TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr())));
auto writeSession = client.CreateWriteSession(writeSettings);
Cerr << "=== Wait for retries\n";
@@ -328,14 +330,14 @@ namespace NYdb::NTopic::NTests {
auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>(TDuration::Days(1));
- auto writeSettings = CreateWriteSessionSettings(*setup);
+ auto writeSettings = CreateWriteSessionSettings();
writeSettings.RetryPolicy(retryPolicy);
retryPolicy->Initialize();
retryPolicy->ExpectBreakDown();
Cerr << "=== Create write session\n";
- TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr())));
+ TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr())));
auto writeSession = client.CreateWriteSession(writeSettings);
Cerr << "=== Close write session\n";
@@ -350,8 +352,8 @@ namespace NYdb::NTopic::NTests {
discovery.SetDelay(TDuration::Days(1));
Cerr << "=== Create write session\n";
- TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr())));
- auto writeSession = client.CreateWriteSession(CreateWriteSessionSettings(*setup));
+ TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr())));
+ auto writeSession = client.CreateWriteSession(CreateWriteSessionSettings());
Cerr << "=== Close write session\n";
writeSession->Close();
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
index dc2f938d60d..7e708ac7279 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
@@ -1,3 +1,5 @@
+#include "ut_utils/topic_sdk_test_setup.h"
+
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h>
@@ -10,101 +12,6 @@ namespace NYdb::NTopic::NTests {
Y_UNIT_TEST_SUITE(TxUsage) {
-NKikimr::Tests::TServerSettings MakeServerSettings()
-{
- auto loggerInitializer = [](TTestActorRuntime& runtime) {
- runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NActors::NLog::PRI_DEBUG);
- runtime.SetLogPriority(NKikimrServices::PQ_WRITE_PROXY, NActors::NLog::PRI_DEBUG);
- runtime.SetLogPriority(NKikimrServices::PQ_MIRRORER, NActors::NLog::PRI_DEBUG);
- runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NActors::NLog::PRI_DEBUG);
- runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG);
- runtime.SetLogPriority(NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, NActors::NLog::PRI_DEBUG);
- };
-
- auto settings = PQSettings(0);
- settings.SetDomainName("Root");
- settings.SetEnableTopicServiceTx(true);
- settings.PQConfig.SetTopicsAreFirstClassCitizen(true);
- settings.PQConfig.SetRoot("/Root");
- settings.PQConfig.SetDatabase("/Root");
- settings.SetLoggerInitializer(loggerInitializer);
-
- return settings;
-}
-
-class TEnvironment {
-public:
- TEnvironment();
-
- void CreateTopic(const TString& path, const TString& consumer);
-
- TString GetEndpoint() const;
- TString GetTopicPath(const TString& name) const;
- TString GetTopicParent() const;
- TString GetDatabase() const;
-
- const TDriver& GetDriver();
-
-private:
- TString Database;
- ::NPersQueue::TTestServer Server;
- TMaybe<TDriver> Driver;
-};
-
-TEnvironment::TEnvironment() :
- Database("/Root"),
- Server(MakeServerSettings(), false)
-{
- Server.StartServer(true, GetDatabase());
-}
-
-void TEnvironment::CreateTopic(const TString& path, const TString& consumer)
-{
- NTopic::TTopicClient client(GetDriver());
-
- NTopic::TCreateTopicSettings topics;
- NTopic::TConsumerSettings<NTopic::TCreateTopicSettings> consumers(topics, consumer);
- topics.AppendConsumers(consumers);
-
- auto status = client.CreateTopic(path, topics).GetValueSync();
- UNIT_ASSERT(status.IsSuccess());
-}
-
-TString TEnvironment::GetEndpoint() const
-{
- return "localhost:" + ToString(Server.GrpcPort);
-}
-
-TString TEnvironment::GetTopicPath(const TString& name) const
-{
- return GetTopicParent() + "/" + name;
-}
-
-TString TEnvironment::GetTopicParent() const
-{
- return GetDatabase();
-}
-
-TString TEnvironment::GetDatabase() const
-{
- return Database;
-}
-
-const TDriver& TEnvironment::GetDriver()
-{
- if (!Driver) {
- TDriverConfig config;
- config.SetEndpoint(GetEndpoint());
- config.SetDatabase(GetDatabase());
- config.SetAuthToken("root@builtin");
- config.SetLog(MakeHolder<TStreamLogBackend>(&Cerr));
-
- Driver.ConstructInPlace(config);
- }
-
- return *Driver;
-}
-
class TFixture : public NUnitTest::TBaseFixture {
protected:
void SetUp(NUnitTest::TTestContext&) override;
@@ -126,29 +33,25 @@ protected:
void WriteMessage(const TString& data);
protected:
- const TDriver& GetDriver();
-
- TString GetTopicPath() const;
- TString GetMessageGroupId() const;
+ const TDriver& GetDriver() const;
private:
- TString GetTopicName() const;
- TString GetConsumerName() const;
-
template<class E>
E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx);
template<class E>
E ReadEvent(TTopicReadSessionPtr reader);
- std::shared_ptr<TEnvironment> Env;
- TMaybe<TDriver> Driver;
+ std::unique_ptr<TTopicSdkTestSetup> Setup;
+ std::unique_ptr<TDriver> Driver;
};
void TFixture::SetUp(NUnitTest::TTestContext&)
{
- Env = std::make_shared<TEnvironment>();
+ NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings();
+ settings.SetEnableTopicServiceTx(true);
+ Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);
- Env->CreateTopic(GetTopicPath(), GetConsumerName());
+ Driver = std::make_unique<TDriver>(Setup->MakeDriver());
}
NTable::TSession TFixture::CreateSession()
@@ -174,8 +77,8 @@ auto TFixture::CreateReader() -> TTopicReadSessionPtr
{
NTopic::TTopicClient client(GetDriver());
TReadSessionSettings options;
- options.ConsumerName(GetConsumerName());
- options.AppendTopics(GetTopicPath());
+ options.ConsumerName(TEST_CONSUMER);
+ options.AppendTopics(TEST_TOPIC);
return client.CreateReadSession(options);
}
@@ -231,8 +134,8 @@ E TFixture::ReadEvent(TTopicReadSessionPtr reader)
void TFixture::WriteMessage(const TString& data)
{
NTopic::TWriteSessionSettings options;
- options.Path(GetTopicPath());
- options.MessageGroupId(GetMessageGroupId());
+ options.Path(TEST_TOPIC);
+ options.MessageGroupId(TEST_MESSAGE_GROUP_ID);
NTopic::TTopicClient client(GetDriver());
auto session = client.CreateSimpleBlockingWriteSession(options);
@@ -240,29 +143,9 @@ void TFixture::WriteMessage(const TString& data)
session->Close();
}
-TString TFixture::GetTopicPath() const
-{
- return Env->GetTopicPath(GetTopicName());
-}
-
-TString TFixture::GetTopicName() const
-{
- return "my-topic";
-}
-
-TString TFixture::GetConsumerName() const
+const TDriver& TFixture::GetDriver() const
{
- return "my-consumer";
-}
-
-TString TFixture::GetMessageGroupId() const
-{
- return "my-message-group";
-}
-
-const TDriver& TFixture::GetDriver()
-{
- return Env->GetDriver();
+ return *Driver;
}
Y_UNIT_TEST_F(SessionAbort, TFixture)
@@ -333,8 +216,8 @@ Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture)
Y_UNIT_TEST_F(WriteToTopic, TFixture)
{
NTopic::TWriteSessionSettings options;
- options.Path(GetTopicPath());
- options.MessageGroupId(GetMessageGroupId());
+ options.Path(TEST_TOPIC);
+ options.MessageGroupId(TEST_MESSAGE_GROUP_ID);
auto session = CreateSession();
auto tx = BeginTx(session);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp
index d9826e7a08e..d6b074c3582 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp
@@ -1,4 +1,4 @@
-#include <ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h>
+#include "managed_executor.h"
namespace NYdb::NTopic::NTests {
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.h
index d32c0ff8817..d32c0ff8817 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.h
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
new file mode 100644
index 00000000000..c82589773f9
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
@@ -0,0 +1,110 @@
+#include "topic_sdk_test_setup.h"
+
+using namespace NYdb;
+using namespace NYdb::NTopic;
+using namespace NYdb::NTopic::NTests;
+
+TTopicSdkTestSetup::TTopicSdkTestSetup(const TString& testCaseName, const NKikimr::Tests::TServerSettings& settings, bool createTopic)
+ : Database("/Root")
+ , Server(settings, false)
+{
+ Log.SetFormatter([testCaseName](ELogPriority priority, TStringBuf message) {
+ return TStringBuilder() << TInstant::Now() << " :" << testCaseName << " " << priority << ": " << message << Endl;
+ });
+
+ Server.StartServer(true, GetDatabase());
+
+ Log << "TTopicSdkTestSetup started";
+
+ if (createTopic) {
+ CreateTopic();
+ Log << "Topic created";
+ }
+}
+
+void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consumer)
+{
+ TTopicClient client(MakeDriver());
+
+ TCreateTopicSettings topics;
+ TConsumerSettings<TCreateTopicSettings> consumers(topics, consumer);
+ topics.AppendConsumers(consumers);
+
+ auto status = client.CreateTopic(path, topics).GetValueSync();
+ UNIT_ASSERT(status.IsSuccess());
+
+ Server.WaitInit(path);
+}
+
+TString TTopicSdkTestSetup::GetEndpoint() const {
+ return "localhost:" + ToString(Server.GrpcPort);
+}
+
+TString TTopicSdkTestSetup::GetTopicPath(const TString& name) const {
+ return GetTopicParent() + "/" + name;
+}
+
+TString TTopicSdkTestSetup::GetTopicParent() const {
+ return GetDatabase();
+}
+
+TString TTopicSdkTestSetup::GetDatabase() const {
+ return Database;
+}
+
+::NPersQueue::TTestServer& TTopicSdkTestSetup::GetServer() {
+ return Server;
+}
+
+NActors::TTestActorRuntime& TTopicSdkTestSetup::GetRuntime() {
+ return *Server.CleverServer->GetRuntime();
+}
+
+TLog& TTopicSdkTestSetup::GetLog() {
+ return Log;
+}
+
+TDriverConfig TTopicSdkTestSetup::MakeDriverConfig() const
+{
+ TDriverConfig config;
+ config.SetEndpoint(GetEndpoint());
+ config.SetDatabase(GetDatabase());
+ config.SetAuthToken("root@builtin");
+ config.SetLog(MakeHolder<TStreamLogBackend>(&Cerr));
+ return config;
+}
+
+NKikimr::Tests::TServerSettings TTopicSdkTestSetup::MakeServerSettings()
+{
+ auto settings = NKikimr::NPersQueueTests::PQSettings(0);
+ settings.SetDomainName("Root");
+ settings.SetNodeCount(1);
+ settings.PQConfig.SetTopicsAreFirstClassCitizen(true);
+ settings.PQConfig.SetRoot("/Root");
+ settings.PQConfig.SetDatabase("/Root");
+
+ settings.SetLoggerInitializer([](NActors::TTestActorRuntime& runtime) {
+ runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NActors::NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::PQ_WRITE_PROXY, NActors::NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::PQ_MIRRORER, NActors::NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NActors::NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, NActors::NLog::PRI_DEBUG);
+ });
+
+ return settings;
+}
+
+TDriver TTopicSdkTestSetup::MakeDriver() const {
+ return MakeDriver(MakeDriverConfig());
+}
+
+TDriver TTopicSdkTestSetup::MakeDriver(const TDriverConfig& config) const
+{
+ return TDriver(config);
+}
+
+TTopicClient TTopicSdkTestSetup::MakeClient() const
+{
+ return TTopicClient(MakeDriver());
+} \ No newline at end of file
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h
new file mode 100644
index 00000000000..a9014a34377
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h>
+
+namespace NYdb::NTopic::NTests {
+
+#define TEST_CASE_NAME (this->Name_)
+
+inline static const TString TEST_TOPIC = "test-topic";
+inline static const TString TEST_CONSUMER = "test-consumer";
+inline static const TString TEST_MESSAGE_GROUP_ID = "test-message_group_id";
+
+class TTopicSdkTestSetup {
+public:
+ TTopicSdkTestSetup(const TString& testCaseName, const NKikimr::Tests::TServerSettings& settings = MakeServerSettings(), bool createTopic = true);
+
+ void CreateTopic(const TString& path = TEST_TOPIC, const TString& consumer = TEST_CONSUMER);
+
+ TString GetEndpoint() const;
+ TString GetTopicPath(const TString& name = TEST_TOPIC) const;
+ TString GetTopicParent() const;
+ TString GetDatabase() const;
+
+ ::NPersQueue::TTestServer& GetServer();
+ NActors::TTestActorRuntime& GetRuntime();
+ TLog& GetLog();
+
+ TTopicClient MakeClient() const;
+
+ TDriver MakeDriver() const;
+ TDriver MakeDriver(const TDriverConfig& config) const;
+
+ TDriverConfig MakeDriverConfig() const;
+ static NKikimr::Tests::TServerSettings MakeServerSettings();
+private:
+ TString Database;
+ ::NPersQueue::TTestServer Server;
+
+ TLog Log = CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG);
+};
+
+} \ No newline at end of file
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make b/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make
index 35f2ffbf019..69c5f51707f 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make
@@ -32,9 +32,9 @@ SRCS(
basic_usage_ut.cpp
describe_topic_ut.cpp
local_partition_ut.cpp
- managed_executor.h
- managed_executor.cpp
topic_to_table_ut.cpp
+ ut_utils/managed_executor.cpp
+ ut_utils/topic_sdk_test_setup.cpp
)
END()