summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBulat <[email protected]>2025-06-16 17:45:11 +0300
committerGitHub <[email protected]>2025-06-16 17:45:11 +0300
commitc198a67ea3efb1f488df95685ff18e85c8292c51 (patch)
tree781ceba4cefb2df585a497622924c49c627fae98
parentc7a5bf18b022f040f6f4bc5f3976c97d0e71af67 (diff)
[C++ SDK] Replaced testlib to recipe/container in topic tests (part 2) (#19573)
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ut/describe_topic_ut.cpp107
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ut/direct_read_ut.cpp2046
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ut/local_partition_ut.cpp214
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ut/trace_ut.cpp168
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ut/ya.make1
-rw-r--r--ydb/public/sdk/cpp/tests/integration/basic_example/basic_example.cpp2
-rw-r--r--ydb/public/sdk/cpp/tests/integration/basic_example/ya.make2
-rw-r--r--ydb/public/sdk/cpp/tests/integration/bulk_upsert/bulk_upsert.cpp15
-rw-r--r--ydb/public/sdk/cpp/tests/integration/bulk_upsert/ya.make2
-rw-r--r--ydb/public/sdk/cpp/tests/integration/server_restart/main.cpp2
-rw-r--r--ydb/public/sdk/cpp/tests/integration/server_restart/ya.make2
-rw-r--r--ydb/public/sdk/cpp/tests/integration/sessions/main.cpp31
-rw-r--r--ydb/public/sdk/cpp/tests/integration/sessions/ya.make2
-rw-r--r--ydb/public/sdk/cpp/tests/integration/sessions_pool/main.cpp62
-rw-r--r--ydb/public/sdk/cpp/tests/integration/sessions_pool/ya.make2
-rw-r--r--ydb/public/sdk/cpp/tests/integration/tests_common.inc1
-rw-r--r--ydb/public/sdk/cpp/tests/integration/topic/basic_usage.cpp67
-rw-r--r--ydb/public/sdk/cpp/tests/integration/topic/describe_topic.cpp250
-rw-r--r--ydb/public/sdk/cpp/tests/integration/topic/direct_read.cpp2062
-rw-r--r--ydb/public/sdk/cpp/tests/integration/topic/local_partition.cpp530
-rw-r--r--ydb/public/sdk/cpp/tests/integration/topic/setup/fixture.cpp92
-rw-r--r--ydb/public/sdk/cpp/tests/integration/topic/setup/fixture.h31
-rw-r--r--ydb/public/sdk/cpp/tests/integration/topic/setup/ya.make13
-rw-r--r--ydb/public/sdk/cpp/tests/integration/topic/trace.cpp166
-rw-r--r--ydb/public/sdk/cpp/tests/integration/topic/utils/trace.cpp160
-rw-r--r--ydb/public/sdk/cpp/tests/integration/topic/utils/trace.h120
-rw-r--r--ydb/public/sdk/cpp/tests/integration/topic/utils/ya.make14
-rw-r--r--ydb/public/sdk/cpp/tests/integration/topic/with_direct_read/ya.make35
-rw-r--r--ydb/public/sdk/cpp/tests/integration/topic/ya.make15
29 files changed, 3554 insertions, 2660 deletions
diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/describe_topic_ut.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/describe_topic_ut.cpp
index da1e885fa7c..d43d0aef5a5 100644
--- a/ydb/public/sdk/cpp/src/client/topic/ut/describe_topic_ut.cpp
+++ b/ydb/public/sdk/cpp/src/client/topic/ut/describe_topic_ut.cpp
@@ -1,17 +1,8 @@
#include "ut_utils/topic_sdk_test_setup.h"
-#include <format>
-#include <ydb/public/sdk/cpp/src/library/persqueue/topic_parser_public/topic_parser.h>
-#include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
-#include <ydb/public/sdk/cpp/src/client/persqueue_public/persqueue.h>
-#include <ydb/public/sdk/cpp/src/client/topic/common/log_lazy.h>
-#include <ydb/public/sdk/cpp/src/client/topic/impl/common.h>
#include <library/cpp/testing/unittest/registar.h>
-#include <library/cpp/testing/unittest/tests_data.h>
-#include <library/cpp/threading/future/future.h>
-#include <library/cpp/threading/future/async.h>
namespace NYdb::NTopic::NTests {
@@ -258,106 +249,10 @@ namespace NYdb::NTopic::NTests {
}
}
- Y_UNIT_TEST(Basic) {
+ Y_UNIT_TEST(LocationWithKillTablets) {
TTopicSdkTestSetup setup(TEST_CASE_NAME);
TTopicClient client = setup.MakeClient();
- DescribeTopic(setup, client, false, false, false, false);
- DescribeConsumer(setup, client, false, false, false, false);
- DescribePartition(setup, client, false, false, false, false);
- }
-
- Y_UNIT_TEST(Statistics) {
- // TODO(abcdef): temporarily deleted
- return;
-
- TTopicSdkTestSetup setup(TEST_CASE_NAME);
- TTopicClient client = setup.MakeClient();
-
- // Get empty description
- DescribeTopic(setup, client, true, false, false, false);
- DescribeConsumer(setup, client, true, false, false, false);
- DescribePartition(setup, client, true, false, false, false);
-
- const size_t messagesCount = 1;
-
- // Write a message
- {
- auto writeSettings = TWriteSessionSettings().Path(TEST_TOPIC).MessageGroupId(TEST_MESSAGE_GROUP_ID).Codec(ECodec::RAW);
- auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings);
- std::string message(32_MB, 'x');
-
- for(size_t i = 0; i < messagesCount; ++i) {
- UNIT_ASSERT(writeSession->Write(message, {}, TInstant::Now() - TDuration::Seconds(100)));
- }
- writeSession->Close();
- }
-
- // Read a message
- {
- auto readSettings = TReadSessionSettings().ConsumerName(TEST_CONSUMER).AppendTopics(TEST_TOPIC);
- auto readSession = client.CreateReadSession(readSettings);
-
- // Event 1: start partition session
- {
- std::optional<TReadSessionEvent::TEvent> event = readSession->GetEvent(true);
- UNIT_ASSERT(event);
- auto startPartitionSession = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&event.value());
- UNIT_ASSERT_C(startPartitionSession, DebugString(*event));
-
- startPartitionSession->Confirm();
- }
-
- // Event 2: data received
- {
- std::optional<TReadSessionEvent::TEvent> event = readSession->GetEvent(true);
- UNIT_ASSERT(event);
- auto dataReceived = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event.value());
- UNIT_ASSERT_C(dataReceived, DebugString(*event));
-
- dataReceived->Commit();
- }
-
- // Event 3: commit acknowledgement
- {
- std::optional<TReadSessionEvent::TEvent> event = readSession->GetEvent(true);
- UNIT_ASSERT(event);
- auto commitOffsetAck = std::get_if<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event.value());
-
- UNIT_ASSERT_C(commitOffsetAck, DebugString(*event));
-
- UNIT_ASSERT_VALUES_EQUAL(commitOffsetAck->GetCommittedOffset(), messagesCount);
- }
- }
-
- // Additional write
- {
- auto writeSettings = TWriteSessionSettings().Path(TEST_TOPIC).MessageGroupId(TEST_MESSAGE_GROUP_ID).Codec(ECodec::RAW);
- auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings);
- std::string message(32, 'x');
-
- for(size_t i = 0; i < messagesCount; ++i) {
- UNIT_ASSERT(writeSession->Write(message));
- }
- writeSession->Close();
- }
- Sleep(TDuration::Seconds(3));
-
- // Get non-empty description
-
- DescribeTopic(setup, client, true, true, false, false);
- DescribeConsumer(setup, client, true, true, false, false);
- DescribePartition(setup, client, true, true, false, false);
- }
-
- Y_UNIT_TEST(Location) {
- TTopicSdkTestSetup setup(TEST_CASE_NAME);
- TTopicClient client = setup.MakeClient();
-
- DescribeTopic(setup, client, false, false, true, false);
- DescribeConsumer(setup, client, false, false, true, false);
- DescribePartition(setup, client, false, false, true, false);
-
// Describe with KillTablets
DescribeTopic(setup, client, false, false, true, true);
DescribeConsumer(setup, client, false, false, true, true);
diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/direct_read_ut.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/direct_read_ut.cpp
index f5154b24ab4..c5356720999 100644
--- a/ydb/public/sdk/cpp/src/client/topic/ut/direct_read_ut.cpp
+++ b/ydb/public/sdk/cpp/src/client/topic/ut/direct_read_ut.cpp
@@ -3,2014 +3,10 @@
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
-#include <ydb/public/sdk/cpp/src/client/persqueue_public/persqueue.h>
-
-#include <ydb/public/sdk/cpp/src/client/topic/impl/common.h>
-#include <ydb/public/sdk/cpp/src/client/topic/common/executor_impl.h>
-#include <ydb/public/sdk/cpp/src/client/persqueue_public/impl/write_session.h>
-#include <ydb/public/sdk/cpp/src/client/topic/impl/write_session.h>
-
-#include <library/cpp/retry/retry_policy.h>
#include <library/cpp/testing/unittest/registar.h>
-#include <library/cpp/testing/unittest/tests_data.h>
-#include <library/cpp/threading/future/future.h>
-#include <library/cpp/threading/future/async.h>
-
-#include <library/cpp/testing/gmock_in_unittest/gmock.h>
-#include <algorithm>
-#include <future>
-
-using namespace ::testing; // Google mock.
-
-
-#define UNIT_ASSERT_EVENT_TYPE(event, type) \
- UNIT_ASSERT_C( \
- std::holds_alternative<type>(event), \
- "Real event got: " << DebugString(event)) \
- /**/
-
-#define UNIT_ASSERT_NOT_EVENT_TYPE(event, type) \
- UNIT_ASSERT_C( \
- !std::holds_alternative<type>(event), \
- "Real event got: " << DebugString(event)) \
- /**/
-
namespace NYdb::NTopic::NTests {
-namespace {
- const char* SERVER_SESSION_ID = "server-session-id-1";
-}
-
-
-template <class TRequest, class TResponse>
-struct TMockProcessorFactory : public ISessionConnectionProcessorFactory<TRequest, TResponse> {
- using IFactory = ISessionConnectionProcessorFactory<TRequest, TResponse>;
-
- virtual ~TMockProcessorFactory() {
- Wait();
- }
-
- void CreateProcessor( // ISessionConnectionProcessorFactory method.
- typename IFactory::TConnectedCallback callback,
- const TRpcRequestSettings& requestSettings,
- NYdbGrpc::IQueueClientContextPtr connectContext,
- TDuration connectTimeout,
- NYdbGrpc::IQueueClientContextPtr connectTimeoutContext,
- typename IFactory::TConnectTimeoutCallback connectTimeoutCallback,
- TDuration connectDelay,
- NYdbGrpc::IQueueClientContextPtr connectDelayOperationContext) override
- {
- UNIT_ASSERT_C(!ConnectedCallback, "Only one connect at a time is expected");
- UNIT_ASSERT_C(!ConnectTimeoutCallback, "Only one connect at a time is expected");
- ConnectedCallback = callback;
- ConnectTimeoutCallback = connectTimeoutCallback;
-
- Y_UNUSED(requestSettings);
- // TODO Check requestSettings.PreferredEndpoint.GetNodeId()?
- UNIT_ASSERT(connectContext);
- UNIT_ASSERT(connectTimeout);
- UNIT_ASSERT(connectTimeoutContext);
- UNIT_ASSERT(connectTimeoutCallback);
- UNIT_ASSERT(!connectDelay || connectDelayOperationContext);
-
- OnCreateProcessor(++CreateCallsCount);
- }
-
- // Handler is called in CreateProcessor() method after parameter validation.
- MOCK_METHOD(void, OnCreateProcessor, (size_t callNumber)); // 1-based
-
- // Actions to use in OnCreateProcessor handler:
- void CreateProcessor(typename IFactory::IProcessor::TPtr processor) { // Success.
- UNIT_ASSERT(ConnectedCallback);
- auto cb = std::move(ConnectedCallback);
- ConnectedCallback = nullptr;
- ConnectTimeoutCallback = nullptr;
- with_lock (Lock) {
- CallbackFutures.push(std::async(std::launch::async, std::move(cb), TPlainStatus(), processor));
- }
- }
-
- void FailCreation(EStatus status = EStatus::INTERNAL_ERROR, const TString& message = {}) { // Fail.
- UNIT_ASSERT(ConnectedCallback);
- auto cb = std::move(ConnectedCallback);
- ConnectedCallback = nullptr;
- ConnectTimeoutCallback = nullptr;
- with_lock (Lock) {
- CallbackFutures.push(std::async(std::launch::async, std::move(cb), TPlainStatus(status, message), nullptr));
- }
- }
-
- void Timeout() { // Timeout.
- UNIT_ASSERT(ConnectTimeoutCallback);
- auto cb = std::move(ConnectTimeoutCallback);
- ConnectedCallback = nullptr;
- ConnectTimeoutCallback = nullptr;
- with_lock (Lock) {
- CallbackFutures.push(std::async(std::launch::async, std::move(cb), true));
- }
- }
-
- void CreateAndThenTimeout(typename IFactory::IProcessor::TPtr processor) {
- UNIT_ASSERT(ConnectedCallback);
- UNIT_ASSERT(ConnectTimeoutCallback);
- auto cb2 = [cbt = std::move(ConnectTimeoutCallback), cb = std::move(ConnectedCallback), processor]() mutable {
- cb(TPlainStatus(), std::move(processor));
- cbt(true);
- };
- ConnectedCallback = nullptr;
- ConnectTimeoutCallback = nullptr;
- with_lock (Lock) {
- CallbackFutures.push(std::async(std::launch::async, std::move(cb2)));
- }
- }
-
- void FailAndThenTimeout(EStatus status = EStatus::INTERNAL_ERROR, const TString& message = {}) {
- UNIT_ASSERT(ConnectedCallback);
- UNIT_ASSERT(ConnectTimeoutCallback);
- auto cb2 = [cbt = std::move(ConnectTimeoutCallback), cb = std::move(ConnectedCallback), status, message]() mutable {
- cb(TPlainStatus(status, message), nullptr);
- cbt(true);
- };
- ConnectedCallback = nullptr;
- ConnectTimeoutCallback = nullptr;
- with_lock (Lock) {
- CallbackFutures.push(std::async(std::launch::async, std::move(cb2)));
- }
- }
-
- void TimeoutAndThenCreate(typename IFactory::IProcessor::TPtr processor) {
- UNIT_ASSERT(ConnectedCallback);
- UNIT_ASSERT(ConnectTimeoutCallback);
- auto cb2 = [cbt = std::move(ConnectTimeoutCallback), cb = std::move(ConnectedCallback), processor]() mutable {
- cbt(true);
- cb(TPlainStatus(), std::move(processor));
- };
- ConnectedCallback = nullptr;
- ConnectTimeoutCallback = nullptr;
- with_lock (Lock) {
- CallbackFutures.push(std::async(std::launch::async, std::move(cb2)));
- }
- }
-
- void Wait() {
- std::queue<std::future<void>> futuresQueue;
- with_lock (Lock) {
- CallbackFutures.swap(futuresQueue);
- }
- while (!futuresQueue.empty()) {
- futuresQueue.front().wait();
- futuresQueue.pop();
- }
- }
-
- void Validate() {
- UNIT_ASSERT(CallbackFutures.empty());
- ConnectedCallback = nullptr;
- ConnectTimeoutCallback = nullptr;
- }
-
- std::atomic<size_t> CreateCallsCount = 0;
-
-private:
- TAdaptiveLock Lock;
- typename IFactory::TConnectedCallback ConnectedCallback;
- typename IFactory::TConnectTimeoutCallback ConnectTimeoutCallback;
- std::queue<std::future<void>> CallbackFutures;
-};
-
-
-struct TStartPartitionSessionRequest {
- TPartitionId PartitionId;
- TPartitionSessionId PartitionSessionId;
- TNodeId NodeId;
- TGeneration Generation;
-};
-
-struct TStopPartitionSessionRequest {
- TPartitionSessionId PartitionSessionId;
- bool Graceful;
- i64 CommittedOffset;
- TDirectReadId LastDirectReadId;
-};
-
-
-struct TMockReadSessionProcessor : public TMockProcessorFactory<Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer>::IProcessor {
- // Request to read.
- struct TClientReadInfo {
- TReadCallback Callback;
- Ydb::Topic::StreamReadMessage::FromServer* Dst;
-
- operator bool() const {
- return Dst != nullptr;
- }
- };
-
- // Response from server.
- struct TServerReadInfo {
- NYdbGrpc::TGrpcStatus Status;
- Ydb::Topic::StreamReadMessage::FromServer Response;
-
- TServerReadInfo& Failure(grpc::StatusCode status = grpc::StatusCode::UNAVAILABLE, const TString& message = {}, bool internal = false) {
- Status.GRpcStatusCode = status;
- Status.InternalError = internal;
- Status.Msg = message;
- return *this;
- }
-
- TServerReadInfo& InitResponse(const TString& sessionId) {
- Response.mutable_init_response()->set_session_id(sessionId);
- return *this;
- }
-
- TServerReadInfo& StartPartitionSessionRequest(TStartPartitionSessionRequest request) {
- auto* req = Response.mutable_start_partition_session_request();
-
- auto* session = req->mutable_partition_session();
- session->set_partition_session_id(request.PartitionSessionId);
- session->set_partition_id(request.PartitionId);
-
- auto* location = req->mutable_partition_location();
- location->set_node_id(request.NodeId);
- location->set_generation(request.Generation);
-
- return *this;
- }
-
- TServerReadInfo& StopPartitionSession(TStopPartitionSessionRequest request) {
- auto* req = Response.mutable_stop_partition_session_request();
- req->set_partition_session_id(request.PartitionSessionId);
- req->set_graceful(request.Graceful);
- req->set_committed_offset(request.CommittedOffset);
- req->set_last_direct_read_id(request.LastDirectReadId);
- return *this;
- }
-
- };
-
- ~TMockReadSessionProcessor() {
- Wait();
- }
-
- void Cancel() override {
- }
-
- void ReadInitialMetadata(std::unordered_multimap<std::string, std::string>* metadata, TReadCallback callback) override {
- Y_UNUSED(metadata);
- Y_UNUSED(callback);
- UNIT_ASSERT_C(false, "This method is not expected to be called");
- }
-
- void Finish(TReadCallback callback) override {
- Y_UNUSED(callback);
- UNIT_ASSERT_C(false, "This method is not expected to be called");
- }
-
- void AddFinishedCallback(TReadCallback callback) override {
- Y_UNUSED(callback);
- UNIT_ASSERT_C(false, "This method is not expected to be called");
- }
-
- void Read(Ydb::Topic::StreamReadMessage::FromServer* response, TReadCallback callback) override {
- with_lock (Lock) {
- UNIT_ASSERT(!ActiveRead);
- ActiveRead.Callback = std::move(callback);
- ActiveRead.Dst = response;
- if (!ReadResponses.empty()) {
- StartProcessReadImpl();
- }
- }
- }
-
- void StartProcessReadImpl() {
- CallbackFutures.push(std::async(std::launch::async, &TMockReadSessionProcessor::ProcessRead, this));
- }
-
- void Write(Ydb::Topic::StreamReadMessage::FromClient&& request, TWriteCallback callback) override {
- UNIT_ASSERT(!callback); // Read session doesn't set callbacks.
- using FromClient = Ydb::Topic::StreamReadMessage_FromClient;
-
- switch (request.client_message_case()) {
- case FromClient::kInitRequest:
- OnInitRequest(request.init_request());
- break;
- case FromClient::kReadRequest:
- OnReadRequest(request.read_request());
- break;
- case FromClient::kCommitOffsetRequest:
- OnCommitOffsetRequest(request.commit_offset_request());
- break;
- case FromClient::kDirectReadAck:
- OnDirectReadAck(request.direct_read_ack());
- break;
- case FromClient::kStartPartitionSessionResponse:
- OnStartPartitionSessionResponse(request.start_partition_session_response());
- break;
- case FromClient::kStopPartitionSessionResponse:
- OnStopPartitionSessionResponse(request.stop_partition_session_response());
- break;
- case FromClient::CLIENT_MESSAGE_NOT_SET:
- UNIT_ASSERT_C(false, "Invalid request");
- break;
- default:
- Y_UNREACHABLE();
- }
- }
- MOCK_METHOD(void, OnInitRequest, (const Ydb::Topic::StreamReadMessage::InitRequest&), ());
- MOCK_METHOD(void, OnReadRequest, (const Ydb::Topic::StreamReadMessage::ReadRequest&), ());
- MOCK_METHOD(void, OnDirectReadAck, (const Ydb::Topic::StreamReadMessage::DirectReadAck&), ());
- MOCK_METHOD(void, OnCommitOffsetRequest, (const Ydb::Topic::StreamReadMessage::CommitOffsetRequest&), ());
- MOCK_METHOD(void, OnStartPartitionSessionResponse, (const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse&), ());
- MOCK_METHOD(void, OnStopPartitionSessionResponse, (const Ydb::Topic::StreamReadMessage::StopPartitionSessionResponse&), ());
-
- void Wait() {
- std::queue<std::future<void>> callbackFutures;
- with_lock (Lock) {
- CallbackFutures.swap(callbackFutures);
- }
-
- while (!callbackFutures.empty()) {
- callbackFutures.front().wait();
- callbackFutures.pop();
- }
- }
-
- void Validate() {
- with_lock (Lock) {
- UNIT_ASSERT(ReadResponses.empty());
- UNIT_ASSERT(CallbackFutures.empty());
-
- ActiveRead = TClientReadInfo{};
- }
- }
-
- void ProcessRead() {
- NYdbGrpc::TGrpcStatus status;
- TReadCallback callback;
- with_lock (Lock) {
- if (ActiveRead) {
- *ActiveRead.Dst = ReadResponses.front().Response;
- ActiveRead.Dst = nullptr;
- status = std::move(ReadResponses.front().Status);
- ReadResponses.pop();
- callback = std::move(ActiveRead.Callback);
- }
- }
- if (callback) {
- callback(std::move(status));
- }
- }
-
- void AddServerResponse(TServerReadInfo result) {
- NYdbGrpc::TGrpcStatus status;
- TReadCallback callback;
- with_lock (Lock) {
- ReadResponses.emplace(std::move(result));
- if (ActiveRead) {
- *ActiveRead.Dst = ReadResponses.front().Response;
- ActiveRead.Dst = nullptr;
- status = std::move(ReadResponses.front().Status);
- ReadResponses.pop();
- callback = std::move(ActiveRead.Callback);
- }
- }
- if (callback) {
- callback(std::move(status));
- }
- }
-
- TAdaptiveLock Lock;
- TClientReadInfo ActiveRead;
- std::queue<TServerReadInfo> ReadResponses;
- std::queue<std::future<void>> CallbackFutures;
-};
-
-struct TMockDirectReadSessionProcessor : public TMockProcessorFactory<TDirectReadClientMessage, TDirectReadServerMessage>::IProcessor {
- // Request to read.
- struct TClientReadInfo {
- TReadCallback Callback;
- TDirectReadServerMessage* Dst;
-
- operator bool() const {
- return Dst != nullptr;
- }
- };
-
- // Response from server.
- struct TServerReadInfo {
- NYdbGrpc::TGrpcStatus Status;
- TDirectReadServerMessage Response;
-
- TServerReadInfo& Failure(grpc::StatusCode status = grpc::StatusCode::UNAVAILABLE, const TString& message = {}, bool internal = false) {
- Status.GRpcStatusCode = status;
- Status.InternalError = internal;
- Status.Msg = message;
- return *this;
- }
-
- TServerReadInfo& InitResponse() {
- Response.mutable_init_response();
- return *this;
- }
-
- TServerReadInfo& StartDirectReadPartitionSessionResponse(TPartitionSessionId partitionSessionId) {
- auto* resp = Response.mutable_start_direct_read_partition_session_response();
- resp->set_partition_session_id(partitionSessionId);
- return *this;
- }
-
- TServerReadInfo& StopDirectReadPartitionSession(Ydb::StatusIds::StatusCode status, TPartitionSessionId partitionSessionId) {
- auto* req = Response.mutable_stop_direct_read_partition_session();
- req->set_status(status);
- req->set_partition_session_id(partitionSessionId);
- return *this;
- }
-
- // Data helpers.
- TServerReadInfo& PartitionData(TPartitionSessionId partitionSessionId, TDirectReadId directReadId, ui64 bytesSize = 0) {
- auto* response = Response.mutable_direct_read_response();
- response->set_partition_session_id(partitionSessionId);
- response->set_direct_read_id(directReadId);
- response->set_bytes_size(bytesSize);
- response->mutable_partition_data()->set_partition_session_id(partitionSessionId);
- return *this;
- }
-
- TServerReadInfo& Batch(
- const TString& producerId,
- Ydb::Topic::Codec codec,
- TInstant writeTimestamp = TInstant::MilliSeconds(42),
- const std::vector<std::pair<TString, TString>>& writeSessionMeta = {}
- ) {
- auto* batch = Response.mutable_direct_read_response()->mutable_partition_data()->add_batches();
- batch->set_producer_id(producerId);
- batch->set_codec(codec);
- *batch->mutable_written_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(writeTimestamp.MilliSeconds());
- auto* meta = batch->mutable_write_session_meta();
- for (auto&& [k, v] : writeSessionMeta) {
- (*meta)[k] = v;
- }
- return *this;
- }
-
- TServerReadInfo& Message(
- ui64 offset,
- const TString& data,
- ui64 seqNo = 1,
- TInstant createdAt = TInstant::MilliSeconds(42),
- i64 uncompressedSize = 135,
- const TString& messageGroupId = "",
- const std::vector<std::pair<TString, TString>>& meta = {}
- ) {
- const int lastBatch = Response.direct_read_response().partition_data().batches_size();
- UNIT_ASSERT(lastBatch > 0);
- auto* batch = Response.mutable_direct_read_response()->mutable_partition_data()->mutable_batches(lastBatch - 1);
- auto* req = batch->add_message_data();
- req->set_offset(offset);
- req->set_seq_no(seqNo);
- *req->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(createdAt.MilliSeconds());
- req->set_data(data);
- req->set_message_group_id(messageGroupId);
- req->set_uncompressed_size(uncompressedSize);
- for (auto&& [k, v] : meta) {
- auto* pair = req->add_metadata_items();
- pair->set_key(k);
- pair->set_value(v);
- }
- return *this;
- }
- };
-
- virtual ~TMockDirectReadSessionProcessor() {
- Wait();
- }
-
- void Cancel() override {
- }
-
- void ReadInitialMetadata(std::unordered_multimap<std::string, std::string>* metadata, TReadCallback callback) override {
- Y_UNUSED(metadata);
- Y_UNUSED(callback);
- UNIT_ASSERT_C(false, "This method is not expected to be called");
- }
-
- void Finish(TReadCallback callback) override {
- Y_UNUSED(callback);
- UNIT_ASSERT_C(false, "This method is not expected to be called");
- }
-
- void AddFinishedCallback(TReadCallback callback) override {
- Y_UNUSED(callback);
- UNIT_ASSERT_C(false, "This method is not expected to be called");
- }
-
- void Read(TDirectReadServerMessage* response, TReadCallback callback) override {
- NYdbGrpc::TGrpcStatus status;
- TReadCallback cb;
- with_lock (Lock) {
- Cerr << (TStringBuilder() << "XXXXX Read 1 " << response->DebugString() << "\n");
- UNIT_ASSERT(!ActiveRead);
- ActiveRead.Callback = std::move(callback);
- ActiveRead.Dst = response;
- if (!ReadResponses.empty()) {
- Cerr << (TStringBuilder() << "XXXXX Read 2 " << response->DebugString() << "\n");
- *ActiveRead.Dst = ReadResponses.front().Response;
- ActiveRead.Dst = nullptr;
- status = std::move(ReadResponses.front().Status);
- ReadResponses.pop();
- cb = std::move(ActiveRead.Callback);
- }
- }
- if (cb) {
- Cerr << (TStringBuilder() << "XXXXX Read 3 " << response->DebugString() << "\n");
- cb(std::move(status));
- }
- }
-
- void StartProcessReadImpl() {
- CallbackFutures.push(std::async(std::launch::async, &TMockDirectReadSessionProcessor::ProcessRead, this));
- }
-
- void Write(TDirectReadClientMessage&& request, TWriteCallback callback) override {
- UNIT_ASSERT(!callback); // Read session doesn't set callbacks.
- switch (request.client_message_case()) {
- case TDirectReadClientMessage::kInitRequest:
- OnInitRequest(request.init_request());
- break;
- case TDirectReadClientMessage::kStartDirectReadPartitionSessionRequest:
- OnStartDirectReadPartitionSessionRequest(request.start_direct_read_partition_session_request());
- break;
- case TDirectReadClientMessage::kUpdateTokenRequest:
- OnUpdateTokenRequest(request.update_token_request());
- break;
- case TDirectReadClientMessage::CLIENT_MESSAGE_NOT_SET:
- UNIT_ASSERT_C(false, "Invalid request");
- break;
- }
- }
-
- MOCK_METHOD(void, OnInitRequest, (const Ydb::Topic::StreamDirectReadMessage::InitRequest&), ());
- MOCK_METHOD(void, OnStartDirectReadPartitionSessionRequest, (const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest&), ());
- MOCK_METHOD(void, OnUpdateTokenRequest, (const Ydb::Topic::UpdateTokenRequest&), ());
-
- void Wait() {
- std::queue<std::future<void>> callbackFutures;
- with_lock (Lock) {
- CallbackFutures.swap(callbackFutures);
- }
-
- while (!callbackFutures.empty()) {
- callbackFutures.front().wait();
- callbackFutures.pop();
- }
- }
-
- void Validate() {
- Cerr << "XXXXX Validate\n";
- with_lock (Lock) {
- UNIT_ASSERT(ReadResponses.empty());
- UNIT_ASSERT(CallbackFutures.empty());
-
- ActiveRead = TClientReadInfo{};
- }
- }
-
- void ProcessRead() {
- Cerr << "XXXXX ProcessRead\n";
- NYdbGrpc::TGrpcStatus status;
- TReadCallback callback;
- // GotActiveRead.GetFuture().Wait();
- with_lock (Lock) {
- *ActiveRead.Dst = ReadResponses.front().Response;
- ActiveRead.Dst = nullptr;
- status = std::move(ReadResponses.front().Status);
- ReadResponses.pop();
- callback = std::move(ActiveRead.Callback);
- }
- callback(std::move(status));
- }
-
- void AddServerResponse(TServerReadInfo result) {
- NYdbGrpc::TGrpcStatus status;
- TReadCallback callback;
- with_lock (Lock) {
- Cerr << (TStringBuilder() << "XXXXX AddServerResponse 1 " << result.Response.DebugString() << "\n");
- ReadResponses.emplace(std::move(result));
- if (ActiveRead) {
- Cerr << (TStringBuilder() << "XXXXX AddServerResponse 2\n");
- *ActiveRead.Dst = ReadResponses.front().Response;
- ActiveRead.Dst = nullptr;
- status = std::move(ReadResponses.front().Status);
- ReadResponses.pop();
- callback = std::move(ActiveRead.Callback);
- }
- }
- if (callback) {
- Cerr << (TStringBuilder() << "XXXXX AddServerResponse 3\n");
- callback(std::move(status));
- }
- }
-
- TAdaptiveLock Lock;
- // NThreading::TPromise<void> GotActiveRead = NThreading::NewPromise();
- TClientReadInfo ActiveRead;
- std::queue<TServerReadInfo> ReadResponses;
- std::queue<std::future<void>> CallbackFutures;
-};
-
-class TMockRetryPolicy : public IRetryPolicy {
-public:
- MOCK_METHOD(IRetryPolicy::IRetryState::TPtr, CreateRetryState, (), (const, override));
- TMaybe<TDuration> Delay;
-};
-
-class TMockRetryState : public IRetryPolicy::IRetryState {
-public:
- TMockRetryState(std::shared_ptr<TMockRetryPolicy> policy)
- : Policy(policy) {}
-
- TMaybe<TDuration> GetNextRetryDelay(EStatus) {
- return Policy->Delay;
- }
-private:
- std::shared_ptr<TMockRetryPolicy> Policy;
-};
-
-// Class for testing read session impl with mocks.
-class TDirectReadSessionImplTestSetup {
-public:
- // Types
- using IDirectReadSessionConnectionProcessorFactory = ISessionConnectionProcessorFactory<TDirectReadClientMessage, TDirectReadServerMessage>;
- using TMockDirectReadProcessorFactory = TMockProcessorFactory<TDirectReadClientMessage, TDirectReadServerMessage>;
- using TMockReadProcessorFactory = TMockProcessorFactory<Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer>;
-
- struct TFakeContext : public NYdbGrpc::IQueueClientContext {
- IQueueClientContextPtr CreateContext() override {
- return std::make_shared<TFakeContext>();
- }
-
- grpc::CompletionQueue* CompletionQueue() override {
- UNIT_ASSERT_C(false, "This method is not expected to be called");
- return nullptr;
- }
-
- bool IsCancelled() const override {
- UNIT_ASSERT_C(false, "This method is not expected to be called");
- return false;
- }
-
- bool Cancel() override {
- return false;
- }
-
- void SubscribeCancel(std::function<void()>) override {
- UNIT_ASSERT_C(false, "This method is not expected to be called");
- }
- };
-
- // Methods
- TDirectReadSessionImplTestSetup();
- ~TDirectReadSessionImplTestSetup() noexcept(false); // Performs extra validation and UNIT_ASSERTs
-
- TSingleClusterReadSessionImpl<false>* GetControlSession();
- TDirectReadSession* GetDirectReadSession(IDirectReadSessionControlCallbacks::TPtr);
- void WaitForWorkingDirectReadSession();
-
- std::shared_ptr<TReadSessionEventsQueue<false>> GetEventsQueue();
- IExecutor::TPtr GetDefaultExecutor();
-
- void SuccessfulInit(bool flag = true);
-
- void AddControlResponse(TMockReadSessionProcessor::TServerReadInfo&);
- void AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo&);
-
- // Assertions.
- void AssertNoEvents();
-
-public:
- // Members
- TReadSessionSettings ReadSessionSettings;
- TLog Log = CreateLogBackend("cerr");
- std::shared_ptr<TReadSessionEventsQueue<false>> EventsQueue;
- std::shared_ptr<TFakeContext> FakeContext = std::make_shared<TFakeContext>();
- std::shared_ptr<TMockRetryPolicy> MockRetryPolicy = std::make_shared<TMockRetryPolicy>();
- std::shared_ptr<TMockReadProcessorFactory> MockReadProcessorFactory = std::make_shared<TMockReadProcessorFactory>();
- std::shared_ptr<TMockDirectReadProcessorFactory> MockDirectReadProcessorFactory = std::make_shared<TMockDirectReadProcessorFactory>();
- TIntrusivePtr<TMockReadSessionProcessor> MockReadProcessor = MakeIntrusive<TMockReadSessionProcessor>();
- TIntrusivePtr<TMockDirectReadSessionProcessor> MockDirectReadProcessor = MakeIntrusive<TMockDirectReadSessionProcessor>();
-
- TSingleClusterReadSessionImpl<false>::TPtr SingleClusterReadSession;
- TSingleClusterReadSessionContextPtr SingleClusterReadSessionContextPtr;
-
- TDirectReadSessionManager::TPtr DirectReadSessionManagerPtr;
- TDirectReadSession::TPtr DirectReadSessionPtr;
- TDirectReadSessionContextPtr DirectReadSessionContextPtr;
-
- std::shared_ptr<TThreadPool> ThreadPool;
- IExecutor::TPtr DefaultExecutor;
-};
-
-TDirectReadSessionImplTestSetup::TDirectReadSessionImplTestSetup() {
- ReadSessionSettings
- // .DirectRead(true)
- .AppendTopics({"TestTopic"})
- .ConsumerName("TestConsumer")
- .RetryPolicy(NYdb::NTopic::IRetryPolicy::GetFixedIntervalPolicy(TDuration::MilliSeconds(10)))
- .Counters(MakeIntrusive<NYdb::NTopic::TReaderCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>()));
-
- Log.SetFormatter(GetPrefixLogFormatter(""));
-}
-
-TDirectReadSessionImplTestSetup::~TDirectReadSessionImplTestSetup() noexcept(false) {
- if (!std::uncaught_exceptions()) { // Exiting from test successfully. Check additional expectations.
- MockReadProcessorFactory->Wait();
- MockReadProcessor->Wait();
-
- MockReadProcessorFactory->Validate();
- MockReadProcessor->Validate();
-
- MockDirectReadProcessorFactory->Wait();
- MockDirectReadProcessor->Wait();
-
- MockDirectReadProcessorFactory->Validate();
- MockDirectReadProcessor->Validate();
- }
-
- if (SingleClusterReadSessionContextPtr) {
- if (auto session = SingleClusterReadSessionContextPtr->LockShared()) {
- session->Close({});
- }
- SingleClusterReadSessionContextPtr->Cancel();
- }
-
- if (DirectReadSessionContextPtr) {
- if (auto session = DirectReadSessionContextPtr->LockShared()) {
- session->Close();
- }
- DirectReadSessionContextPtr->Cancel();
- }
-
- SingleClusterReadSession = nullptr;
-
- if (ThreadPool) {
- ThreadPool->Stop();
- }
-}
-
-void TDirectReadSessionImplTestSetup::AddControlResponse(TMockReadSessionProcessor::TServerReadInfo& response) {
- MockReadProcessor->AddServerResponse(response);
-}
-
-void TDirectReadSessionImplTestSetup::AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo& response) {
- MockDirectReadProcessor->AddServerResponse(response);
-}
-
-void TDirectReadSessionImplTestSetup::SuccessfulInit(bool hasInitRequest) {
- EXPECT_CALL(*MockReadProcessorFactory, OnCreateProcessor(1))
- .WillOnce([&](){ MockReadProcessorFactory->CreateProcessor(MockReadProcessor); });
- if (hasInitRequest) {
- EXPECT_CALL(*MockReadProcessor, OnInitRequest(_));
- }
- AddControlResponse(TMockReadSessionProcessor::TServerReadInfo().InitResponse("session-1"));
- GetControlSession()->Start();
- MockReadProcessorFactory->Wait();
- MockReadProcessor->Wait();
-}
-
-std::shared_ptr<TReadSessionEventsQueue<false>> TDirectReadSessionImplTestSetup::GetEventsQueue() {
- if (!EventsQueue) {
- EventsQueue = std::make_shared<TReadSessionEventsQueue<false>>(ReadSessionSettings);
- }
- return EventsQueue;
-}
-
-void TDirectReadSessionImplTestSetup::AssertNoEvents() {
- std::optional<TReadSessionEvent::TEvent> event = GetEventsQueue()->GetEvent(false);
- UNIT_ASSERT(!event);
-}
-
-IExecutor::TPtr TDirectReadSessionImplTestSetup::GetDefaultExecutor() {
- if (!DefaultExecutor) {
- ThreadPool = std::make_shared<TThreadPool>();
- ThreadPool->Start(1);
- DefaultExecutor = CreateThreadPoolExecutorAdapter(ThreadPool);
- }
- return DefaultExecutor;
-}
-
-TSingleClusterReadSessionImpl<false>* TDirectReadSessionImplTestSetup::GetControlSession() {
- if (!SingleClusterReadSession) {
- if (!ReadSessionSettings.DecompressionExecutor_) {
- ReadSessionSettings.DecompressionExecutor(GetDefaultExecutor());
- }
- if (!ReadSessionSettings.EventHandlers_.HandlersExecutor_) {
- ReadSessionSettings.EventHandlers_.HandlersExecutor(GetDefaultExecutor());
- }
- SingleClusterReadSessionContextPtr = MakeWithCallbackContext<TSingleClusterReadSessionImpl<false>>(
- ReadSessionSettings,
- "db",
- "client-session-id-1",
- "",
- Log,
- MockReadProcessorFactory,
- GetEventsQueue(),
- FakeContext,
- 1,
- 1,
- TSingleClusterReadSessionImpl<false>::TScheduleCallbackFunc {},
- MockDirectReadProcessorFactory);
- SingleClusterReadSession = SingleClusterReadSessionContextPtr->TryGet();
- }
- return SingleClusterReadSession.get();
-}
-
-TDirectReadSession* TDirectReadSessionImplTestSetup::GetDirectReadSession(IDirectReadSessionControlCallbacks::TPtr controlCallbacks) {
- if (!DirectReadSessionPtr) {
- DirectReadSessionContextPtr = MakeWithCallbackContext<TDirectReadSession>(
- TNodeId(1),
- SERVER_SESSION_ID,
- ReadSessionSettings,
- controlCallbacks,
- FakeContext,
- MockDirectReadProcessorFactory,
- Log);
- DirectReadSessionPtr = DirectReadSessionContextPtr->TryGet();
- }
- return DirectReadSessionPtr.get();
-}
-
-void TDirectReadSessionImplTestSetup::WaitForWorkingDirectReadSession() {
- while (DirectReadSessionPtr->State != TDirectReadSession::EState::WORKING) {
- Sleep(TDuration::MilliSeconds(10));
- }
-}
-
-class TDirectReadTestsFixture : public NUnitTest::TBaseFixture {
- void SetUp(NUnitTest::TTestContext&) override {
- }
-};
-
-Y_UNIT_TEST_SUITE_F(DirectReadWithClient, TDirectReadTestsFixture) {
-
- /*
- This suite tests direct read mode only through IReadSession, without using internal classes.
- */
-
- Y_UNIT_TEST(OneMessage) {
- /*
- The simplest case: write one message and read it back.
- */
-
- TTopicSdkTestSetup setup(TEST_CASE_NAME);
- TTopicClient client = setup.MakeClient();
-
- {
- // Write a message:
-
- auto settings = TWriteSessionSettings()
- .Path(TEST_TOPIC)
- .ProducerId(TEST_MESSAGE_GROUP_ID)
- .MessageGroupId(TEST_MESSAGE_GROUP_ID);
- auto writer = client.CreateSimpleBlockingWriteSession(settings);
- UNIT_ASSERT(writer->Write("message"));
- writer->Close();
- }
-
- {
- // Read the message:
-
- auto settings = TReadSessionSettings()
- .ConsumerName(TEST_CONSUMER)
- .AppendTopics(TEST_TOPIC)
- // .DirectRead(true)
- ;
- auto reader = client.CreateReadSession(settings);
-
- {
- // Start partition session:
- auto event = reader->GetEvent(true);
- UNIT_ASSERT(event);
- UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStartPartitionSessionEvent);
- std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event).Confirm();
- }
-
- {
- // Receive the message and commit.
- auto event = reader->GetEvent(true);
- UNIT_ASSERT(event);
- UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
- auto& dataReceived = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
- auto& messages = dataReceived.GetMessages();
- UNIT_ASSERT_EQUAL(messages.size(), 1);
- dataReceived.Commit();
- }
-
- {
- // Get commit ack.
- auto event = reader->GetEvent(true);
- UNIT_ASSERT(event);
- UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TCommitOffsetAcknowledgementEvent);
- }
- }
- }
-
- Y_UNIT_TEST(ManyMessages) {
- /*
- Write many messages and read them back.
-
- Don't compress messages and set MaxMemoryUsageBytes for the reader to 1MB,
- so the server sends multiple DirectReadResponses.
- */
-
- TTopicSdkTestSetup setup(TEST_CASE_NAME, TTopicSdkTestSetup::MakeServerSettings(), false);
- constexpr size_t partitionCount = 2;
- size_t messageCount = 100;
- size_t totalMessageCount = partitionCount * messageCount;
- setup.CreateTopic(std::string(TEST_TOPIC), std::string(TEST_CONSUMER), partitionCount);
- TTopicClient client = setup.MakeClient();
-
- TString message(950_KB, 'x');
-
- // Write messages to all partitions:
- for (size_t partitionId = 0; partitionId < partitionCount; ++partitionId) {
- auto settings = TWriteSessionSettings()
- .Path(TEST_TOPIC)
- .Codec(ECodec::RAW)
- .PartitionId(partitionId)
- .ProducerId(TEST_MESSAGE_GROUP_ID)
- .MessageGroupId(TEST_MESSAGE_GROUP_ID);
-
- auto writer = client.CreateSimpleBlockingWriteSession(settings);
- for (size_t i = 0; i < messageCount; ++i) {
- UNIT_ASSERT(writer->Write(message));
- }
- writer->Close();
- }
-
- std::atomic<bool> work = true;
-
- auto killer = std::thread([&]() {
- while (work.load()) {
- std::this_thread::sleep_for(std::chrono::seconds(5));
- // setup.GetServer().KillTopicPqrbTablet(setup.GetTopicPath());
- }
- });
-
- {
- // Read messages:
-
- size_t gotMessages = 0;
- std::array<size_t, partitionCount> committedOffset{};
- auto settings = TReadSessionSettings()
- .ConsumerName(TEST_CONSUMER)
- .AppendTopics(TEST_TOPIC)
- .MaxMemoryUsageBytes(1_MB)
- // .DirectRead(GetEnv("DIRECT", "0") == "1")
- ;
-
- std::shared_ptr<IReadSession> reader;
-
- settings.EventHandlers_.SimpleDataHandlers(
- [&](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& e) {
- gotMessages += e.GetMessages().size();
- Cerr << "XXXXX gotMessages: " << gotMessages << " partition_id: " << e.GetPartitionSession()->GetPartitionId() << "\n";
- e.Commit();
- });
-
- settings.EventHandlers_.CommitOffsetAcknowledgementHandler(
- [&](NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent& e) {
- auto partitionId = e.GetPartitionSession()->GetPartitionId();
- committedOffset[partitionId] = e.GetCommittedOffset();
- Cerr << "XXXXX committedOffset: ";
- for (auto offset : committedOffset) {
- Cerr << offset << " ";
- }
- Cerr << Endl;
- if (std::ranges::all_of(committedOffset, [&](size_t offset) { return offset == messageCount; })) {
- reader->Close();
- }
- });
-
- reader = client.CreateReadSession(settings);
-
- reader->GetEvent(/*block = */true);
-
- UNIT_ASSERT_EQUAL(gotMessages, totalMessageCount);
- }
-
- work.store(false);
- killer.join();
- }
-} // Y_UNIT_TEST_SUITE_F(DirectReadWithClient)
-
-
-Y_UNIT_TEST_SUITE_F(DirectReadWithControlSession, TDirectReadTestsFixture) {
-
- /*
- This suite tests direct read sessions together with a control session.
- */
-
- void SuccessfulInitImpl(bool thenTimeout) {
- TDirectReadSessionImplTestSetup setup;
- setup.ReadSessionSettings
- .MaxLag(TDuration::Seconds(32))
- .ReadFromTimestamp(TInstant::Seconds(42));
-
- setup.ReadSessionSettings.Topics_[0]
- .ReadFromTimestamp(TInstant::Seconds(146))
- .AppendPartitionIds(100)
- .AppendPartitionIds(101);
-
- {
- ::testing::InSequence seq;
-
- EXPECT_CALL(*setup.MockReadProcessorFactory, OnCreateProcessor(_))
- .WillOnce([&](){
- if (thenTimeout) {
- setup.MockReadProcessorFactory->CreateAndThenTimeout(setup.MockReadProcessor);
- } else {
- setup.MockReadProcessorFactory->CreateProcessor(setup.MockReadProcessor);
- }
- });
-
- EXPECT_CALL(*setup.MockReadProcessor, OnInitRequest(_))
- .WillOnce(Invoke([&setup](const Ydb::Topic::StreamReadMessage::InitRequest& req) {
- UNIT_ASSERT_STRINGS_EQUAL(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
- UNIT_ASSERT(req.direct_read());
- UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings_size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
- UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).read_from().seconds(), 146);
- UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).partition_ids_size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).partition_ids(0), 100);
- UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).partition_ids(1), 101);
- }));
-
- EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_));
- }
-
- setup.GetControlSession()->Start();
- setup.MockReadProcessorFactory->Wait();
-
- setup.AddControlResponse(TMockReadSessionProcessor::TServerReadInfo().InitResponse("session id"));
-
- setup.AssertNoEvents();
- }
-
- Y_UNIT_TEST(Init) {
- SuccessfulInitImpl(true);
- SuccessfulInitImpl(false);
- }
-
- Y_UNIT_TEST(StopPartitionSessionGracefully) {
- auto const startPartitionSessionRequest = TStartPartitionSessionRequest{
- .PartitionId = 1,
- .PartitionSessionId = 2,
- .NodeId = 3,
- .Generation = 4,
- };
-
- auto const stopPartitionSessionRequest = TStopPartitionSessionRequest{
- .PartitionSessionId = 2,
- .Graceful = true,
- .CommittedOffset = 0,
- .LastDirectReadId = 5,
- };
-
- TDirectReadSessionImplTestSetup setup;
- setup.ReadSessionSettings.Topics_[0].AppendPartitionIds(startPartitionSessionRequest.PartitionId);
-
- {
- {
- ::testing::InSequence seq;
-
- EXPECT_CALL(*setup.MockReadProcessorFactory, OnCreateProcessor(_))
- .WillOnce([&]() {
- setup.MockReadProcessorFactory->CreateProcessor(setup.MockReadProcessor);
- });
-
- EXPECT_CALL(*setup.MockReadProcessor, OnInitRequest(_))
- .WillOnce(Invoke([&](const Ydb::Topic::StreamReadMessage::InitRequest& req) {
- UNIT_ASSERT(req.direct_read());
- UNIT_ASSERT_EQUAL(req.topics_read_settings_size(), 1);
- UNIT_ASSERT_EQUAL(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
- UNIT_ASSERT_EQUAL(req.topics_read_settings(0).partition_ids_size(), 1);
- UNIT_ASSERT_EQUAL(req.topics_read_settings(0).partition_ids(0), startPartitionSessionRequest.PartitionId);
- }));
-
- EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_));
-
- EXPECT_CALL(*setup.MockReadProcessor, OnStartPartitionSessionResponse(_))
- .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse& resp) {
- UNIT_ASSERT_EQUAL(resp.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
- }));
-
- EXPECT_CALL(*setup.MockReadProcessor, OnDirectReadAck(_))
- .Times(4);
- }
-
- // There are two sequences, because OnCreateProcessor from the second sequence may be called
- // before OnStartPartitionSessionResponse from the first sequence.
-
- {
- ::testing::InSequence sequence;
-
- EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
- .WillOnce([&]() {
- setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor);
- });
-
- EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
- .WillOnce(Invoke([&setup](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
- UNIT_ASSERT_EQUAL(req.session_id(), SERVER_SESSION_ID);
- UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings_size(), setup.ReadSessionSettings.Topics_.size());
- UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
- UNIT_ASSERT_VALUES_EQUAL(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
- }));
-
- EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
- .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& request) {
- UNIT_ASSERT_VALUES_EQUAL(request.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
- UNIT_ASSERT_VALUES_EQUAL(request.generation(), startPartitionSessionRequest.Generation);
- }));
-
- // Expect OnReadRequest in case it is called before the test ends.
- // TODO(qyryq) Fix number, not 10.
- EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_)).Times(AtMost(10));
- }
- }
-
- setup.GetControlSession()->Start();
- setup.MockReadProcessorFactory->Wait();
- setup.AddControlResponse(TMockReadSessionProcessor::TServerReadInfo().InitResponse(SERVER_SESSION_ID));
- setup.AddControlResponse(TMockReadSessionProcessor::TServerReadInfo().StartPartitionSessionRequest(startPartitionSessionRequest));
-
- {
- std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
- UNIT_ASSERT(event);
- UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStartPartitionSessionEvent);
- std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event).Confirm();
- }
-
- setup.AddControlResponse(TMockReadSessionProcessor::TServerReadInfo()
- .StopPartitionSession(stopPartitionSessionRequest));
-
- setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- .InitResponse());
-
- setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- .StartDirectReadPartitionSessionResponse(startPartitionSessionRequest.PartitionSessionId));
-
- size_t offset = 0, i = 0;
-
- // Verify that the session receives data sent to direct read session:
- for (size_t directReadId = 1; directReadId < stopPartitionSessionRequest.LastDirectReadId; ++directReadId) {
- auto resp = TMockDirectReadSessionProcessor::TServerReadInfo()
- .PartitionData(startPartitionSessionRequest.PartitionSessionId, directReadId)
- // TODO(qyryq) Test with compression!
- // .Batch("producer-id-1", Ydb::Topic::Codec::CODEC_ZSTD);
- .Batch("producer-id-1", Ydb::Topic::Codec::CODEC_RAW);
-
- resp.Message(offset, TStringBuilder() << "message-" << offset, offset);
- ++offset;
- resp.Message(offset, TStringBuilder() << "message-" << offset, offset);
- ++offset;
- setup.AddDirectReadResponse(resp);
-
- std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
- UNIT_ASSERT(event);
- UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
- auto& e = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
- i += e.GetMessagesCount();
- }
-
- while (i < offset) {
- std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
- UNIT_ASSERT(event);
- UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
- auto& e = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
- i += e.GetMessagesCount();
- }
-
- {
- // Verify that the session receives TStopPartitionSessionEvent(graceful=true) after data was received:
-
- std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
- UNIT_ASSERT(event);
- UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStopPartitionSessionEvent);
- auto e = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event);
- e->Confirm();
- }
-
- {
- // Verify that the session receives TPartitionSessionClosedEvent after data was received:
-
- std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
- UNIT_ASSERT(event);
- UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TPartitionSessionClosedEvent);
- // auto e = std::get_if<TReadSessionEvent::TPartitionSessionClosedEvent>(&*event);
- }
-
- setup.AssertNoEvents();
-
- // ::testing::Mock::VerifyAndClear(setup.MockDirectReadProcessorFactory);
- // ::testing::Mock::VerifyAndClear(setup.MockDirectReadProcessor);
- }
-
- Y_UNIT_TEST(StopPartitionSession) {
- auto const startPartitionSessionRequest = TStartPartitionSessionRequest{
- .PartitionId = 1,
- .PartitionSessionId = 2,
- .NodeId = 3,
- .Generation = 4,
- };
-
- TDirectReadSessionImplTestSetup setup;
- setup.ReadSessionSettings.Topics_[0].AppendPartitionIds(startPartitionSessionRequest.PartitionId);
-
- {
- {
- ::testing::InSequence seq;
-
- EXPECT_CALL(*setup.MockReadProcessorFactory, OnCreateProcessor(_))
- .WillOnce([&]() {
- setup.MockReadProcessorFactory->CreateProcessor(setup.MockReadProcessor);
- });
-
- EXPECT_CALL(*setup.MockReadProcessor, OnInitRequest(_))
- .WillOnce(Invoke([&](const Ydb::Topic::StreamReadMessage::InitRequest& req) {
- UNIT_ASSERT(req.direct_read());
- UNIT_ASSERT_EQUAL(req.topics_read_settings_size(), 1);
- UNIT_ASSERT_EQUAL(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
- UNIT_ASSERT_EQUAL(req.topics_read_settings(0).partition_ids_size(), 1);
- UNIT_ASSERT_EQUAL(req.topics_read_settings(0).partition_ids(0), startPartitionSessionRequest.PartitionId);
- }));
-
- EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_));
-
- EXPECT_CALL(*setup.MockReadProcessor, OnStartPartitionSessionResponse(_))
- .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse& resp) {
- UNIT_ASSERT_EQUAL(resp.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
- }));
-
- EXPECT_CALL(*setup.MockReadProcessor, OnDirectReadAck(_))
- .Times(4);
- }
-
- // There are two sequences, because OnCreateProcessor from the second sequence may be called
- // before OnStartPartitionSessionResponse from the first sequence.
-
- {
- ::testing::InSequence sequence;
-
- EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
- .WillOnce([&]() {
- setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor);
- });
-
- EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
- .WillOnce(Invoke([&setup](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
- UNIT_ASSERT_EQUAL(req.session_id(), SERVER_SESSION_ID);
- UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings_size(), setup.ReadSessionSettings.Topics_.size());
- UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
- UNIT_ASSERT_VALUES_EQUAL(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
- }));
-
- EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
- .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& request) {
- UNIT_ASSERT_VALUES_EQUAL(request.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
- UNIT_ASSERT_VALUES_EQUAL(request.generation(), startPartitionSessionRequest.Generation);
- }));
-
- // Expect OnReadRequest in case it is called before the test ends.
- // TODO(qyryq) Fix number, not 10.
- EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_)).Times(AtMost(10));
- }
- }
-
- setup.GetControlSession()->Start();
- {
- auto r = TMockReadSessionProcessor::TServerReadInfo();
- setup.AddControlResponse(r.InitResponse(SERVER_SESSION_ID));
- }
-
- {
- auto r = TMockReadSessionProcessor::TServerReadInfo();
- setup.AddControlResponse(r.StartPartitionSessionRequest(startPartitionSessionRequest));
- }
-
- {
- std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
- UNIT_ASSERT(event);
- UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStartPartitionSessionEvent);
- std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event).Confirm();
- }
-
- {
- auto r = TMockDirectReadSessionProcessor::TServerReadInfo();
- setup.AddDirectReadResponse(r.InitResponse());
- }
-
- {
- auto r = TMockDirectReadSessionProcessor::TServerReadInfo();
- setup.AddDirectReadResponse(r.StartDirectReadPartitionSessionResponse(startPartitionSessionRequest.PartitionSessionId));
- }
-
- i64 offset = 0, i = 0;
-
- // Verify that the session receives data sent to direct read session:
- for (size_t directReadId = 1; directReadId < 5; ++directReadId) {
- auto resp = TMockDirectReadSessionProcessor::TServerReadInfo();
- resp.PartitionData(startPartitionSessionRequest.PartitionSessionId, directReadId)
- // TODO(qyryq) Test with compression!
- // .Batch("producer-id-1", Ydb::Topic::Codec::CODEC_ZSTD);
- .Batch("producer-id-1", Ydb::Topic::Codec::CODEC_RAW);
-
- resp.Message(offset, TStringBuilder() << "message-" << offset, offset);
- ++offset;
- resp.Message(offset, TStringBuilder() << "message-" << offset, offset);
- ++offset;
- setup.AddDirectReadResponse(resp);
-
- std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
- UNIT_ASSERT(event);
- UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
- auto& e = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
- i += e.GetMessagesCount();
- e.Commit();
- }
-
- while (i < offset) {
- std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
- UNIT_ASSERT(event);
- UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
- auto& e = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
- i += e.GetMessagesCount();
- }
-
- {
- auto r = TMockReadSessionProcessor::TServerReadInfo();
- setup.AddControlResponse(
- r.StopPartitionSession({
- .PartitionSessionId = 2,
- .Graceful = false,
- .CommittedOffset = offset,
- }));
- }
-
- // TODO(qyryq) Send some bogus events from server, the client should ignore them.
-
- {
- // Verify that the session receives TStopPartitionSessionEvent after data was received:
-
- std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
- UNIT_ASSERT(event);
- UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TPartitionSessionClosedEvent);
- // auto e = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event);
- // UNIT_ASSERT(!e.Graceful);
- // UNIT_ASSERT(e.CommittedOffset == offset);
- }
-
- setup.MockReadProcessorFactory->Wait();
- setup.MockDirectReadProcessorFactory->Wait();
-
- setup.AssertNoEvents();
- }
-
- Y_UNIT_TEST(EmptyDirectReadResponse) {
- // Sometimes the server might send a DirectReadResponse with no data, but with bytes_size value > 0.
- // It can happen, if the server tried to send DirectReadResponse, but did not succeed,
- // and in the meantime the messages that should had been sent have been rotated by retention period,
- // and do not exist anymore. To keep ReadSizeBudget bookkeeping correct, the server still sends the an DirectReadResponse,
- // and SDK should process it correctly: basically it should immediately send a ReadRequest(bytes_size=DirectReadResponse.bytes_size).
-
- auto const startPartitionSessionRequest = TStartPartitionSessionRequest{
- .PartitionId = 1,
- .PartitionSessionId = 2,
- .NodeId = 3,
- .Generation = 4,
- };
-
- i64 bytesSize = 12345;
-
- TDirectReadSessionImplTestSetup setup;
- setup.ReadSessionSettings.Topics_[0].AppendPartitionIds(startPartitionSessionRequest.PartitionId);
-
- {
- {
- ::testing::InSequence seq;
-
- EXPECT_CALL(*setup.MockReadProcessorFactory, OnCreateProcessor(_))
- .WillOnce([&]() {
- setup.MockReadProcessorFactory->CreateProcessor(setup.MockReadProcessor);
- });
-
- EXPECT_CALL(*setup.MockReadProcessor, OnInitRequest(_))
- .WillOnce(Invoke([&](const Ydb::Topic::StreamReadMessage::InitRequest& req) {
- UNIT_ASSERT(req.direct_read());
- UNIT_ASSERT_EQUAL(req.topics_read_settings_size(), 1);
- UNIT_ASSERT_EQUAL(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
- UNIT_ASSERT_EQUAL(req.topics_read_settings(0).partition_ids_size(), 1);
- UNIT_ASSERT_EQUAL(req.topics_read_settings(0).partition_ids(0), startPartitionSessionRequest.PartitionId);
- }));
-
- EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_));
-
- EXPECT_CALL(*setup.MockReadProcessor, OnStartPartitionSessionResponse(_))
- .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse& resp) {
- UNIT_ASSERT_EQUAL(resp.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
- }));
-
- EXPECT_CALL(*setup.MockReadProcessor, OnDirectReadAck(_))
- .Times(1);
-
- EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_))
- .WillOnce(Invoke([&](const Ydb::Topic::StreamReadMessage::ReadRequest& req) {
- UNIT_ASSERT_EQUAL(req.bytes_size(), bytesSize);
- }));
- }
-
- // There are two sequences, because OnCreateProcessor from the second sequence may be called
- // before OnStartPartitionSessionResponse from the first sequence.
-
- {
- ::testing::InSequence sequence;
-
- EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
- .WillOnce([&]() {
- setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor);
- });
-
- EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
- .WillOnce(Invoke([&setup](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
- UNIT_ASSERT_EQUAL(req.session_id(), SERVER_SESSION_ID);
- UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings_size(), setup.ReadSessionSettings.Topics_.size());
- UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
- UNIT_ASSERT_VALUES_EQUAL(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
- }));
-
- EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
- .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& request) {
- UNIT_ASSERT_VALUES_EQUAL(request.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
- UNIT_ASSERT_VALUES_EQUAL(request.generation(), startPartitionSessionRequest.Generation);
- }));
- }
- }
-
- setup.GetControlSession()->Start();
- {
- auto r = TMockReadSessionProcessor::TServerReadInfo();
- setup.AddControlResponse(r.InitResponse(SERVER_SESSION_ID));
- }
-
- {
- auto r = TMockReadSessionProcessor::TServerReadInfo();
- setup.AddControlResponse(r.StartPartitionSessionRequest(startPartitionSessionRequest));
- }
-
- {
- std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
- UNIT_ASSERT(event);
- UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStartPartitionSessionEvent);
- std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event).Confirm();
- }
-
- {
- auto r = TMockDirectReadSessionProcessor::TServerReadInfo();
- setup.AddDirectReadResponse(r.InitResponse());
- }
-
- {
- auto r = TMockDirectReadSessionProcessor::TServerReadInfo();
- setup.AddDirectReadResponse(r.StartDirectReadPartitionSessionResponse(startPartitionSessionRequest.PartitionSessionId));
- }
-
- i64 directReadId = 1;
-
- auto resp = TMockDirectReadSessionProcessor::TServerReadInfo();
- resp.PartitionData(startPartitionSessionRequest.PartitionSessionId, directReadId, bytesSize);
- setup.AddDirectReadResponse(resp);
-
- setup.MockReadProcessorFactory->Wait();
- setup.MockDirectReadProcessorFactory->Wait();
-
- setup.AssertNoEvents();
- }
-
-} // Y_UNIT_TEST_SUITE_F(DirectReadWithControlSession)
-
-
-Y_UNIT_TEST_SUITE_F(DirectReadSession, TDirectReadTestsFixture) {
-
- /*
- This suite tests TDirectReadSession in isolation, without control session.
- */
-
- Y_UNIT_TEST(InitAndStartPartitionSession) {
- /*
- Create DirectRead processor, send InitRequest, StartDirectReadPartitionSessionRequest.
- */
-
- TDirectReadSessionImplTestSetup setup;
-
- auto gotStart = NThreading::NewPromise();
-
- TPartitionSessionId partitionSessionId = 1;
-
- class TControlCallbacks : public IDirectReadSessionControlCallbacks {};
- auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>());
-
- {
- ::testing::InSequence seq;
-
- EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
- .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
-
- EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
- .WillOnce(Invoke([&](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
- UNIT_ASSERT_EQUAL(req.session_id(), SERVER_SESSION_ID);
- UNIT_ASSERT_EQUAL(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
- }));
-
- EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
- .WillOnce(Invoke([&](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& req) {
- UNIT_ASSERT_EQUAL(req.partition_session_id(), static_cast<i64>(partitionSessionId));
- gotStart.SetValue();
- }));
- }
-
- session->Start();
-
- setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- .InitResponse());
-
- session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
-
- gotStart.GetFuture().Wait();
- }
-
- Y_UNIT_TEST(NoRetryDirectReadSession) {
- /*
- If the session cannot establish a connection, and the retry policy does not allow to make another retry,
- the session should be aborted and the client should receive TSessionClosedEvent.
- */
-
- TDirectReadSessionImplTestSetup setup;
- setup.ReadSessionSettings.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetNoRetryPolicy());
-
- auto gotClosedEvent = NThreading::NewPromise();
-
- EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
- .WillOnce([&]() { setup.MockDirectReadProcessorFactory->FailCreation(); });
-
- class TControlCallbacks : public IDirectReadSessionControlCallbacks {
- public:
- TControlCallbacks(NThreading::TPromise<void>& gotClosedEvent) : GotClosedEvent(gotClosedEvent) {}
- void AbortSession(TSessionClosedEvent&&) override { GotClosedEvent.SetValue(); }
- NThreading::TPromise<void>& GotClosedEvent;
- };
-
- auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(gotClosedEvent));
-
- session->Start();
- setup.MockDirectReadProcessorFactory->Wait();
- gotClosedEvent.GetFuture().Wait();
- }
-
- Y_UNIT_TEST(RetryDirectReadSession) {
- /*
- If the retry policy allows retries, keep trying to establish connection.
- */
- TDirectReadSessionImplTestSetup setup;
- size_t nRetries = 2;
- setup.ReadSessionSettings.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetFixedIntervalPolicy(
- TDuration::MilliSeconds(1), TDuration::MilliSeconds(1), nRetries));
-
- auto gotClosedEvent = NThreading::NewPromise();
-
- ON_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
- .WillByDefault([&]() { setup.MockDirectReadProcessorFactory->FailCreation(); });
-
- EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
- .Times(1 + nRetries); // First call + N retries.
-
- class TControlCallbacks : public IDirectReadSessionControlCallbacks {
- public:
- TControlCallbacks(NThreading::TPromise<void>& gotClosedEvent) : GotClosedEvent(gotClosedEvent) {}
- void AbortSession(TSessionClosedEvent&&) override { GotClosedEvent.SetValue(); }
- NThreading::TPromise<void>& GotClosedEvent;
- };
-
- auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(gotClosedEvent));
- session->Start();
- setup.MockDirectReadProcessorFactory->Wait();
-
- gotClosedEvent.GetFuture().Wait();
- }
-
- // Y_UNIT_TEST(NoRetryPartitionSession) {
- // /*
- // If we get a StopDirectReadPartitionSession event, and the retry policy does not allow to send another Start-request,
- // the session should be aborted and the client should receive TSessionClosedEvent.
- // */
- // TDirectReadSessionImplTestSetup setup;
- // setup.ReadSessionSettings.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetNoRetryPolicy());
-
- // auto gotClosedEvent = NThreading::NewPromise();
-
- // {
- // ::testing::InSequence seq;
-
- // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
- // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
-
- // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_));
-
- // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_));
- // }
-
- // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
- // public:
- // TControlCallbacks(NThreading::TPromise<void>& gotClosedEvent) : GotClosedEvent(gotClosedEvent) {}
- // void AbortSession(TSessionClosedEvent&&) override { GotClosedEvent.SetValue(); }
- // NThreading::TPromise<void>& GotClosedEvent;
- // };
-
- // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(gotClosedEvent));
- // session->Start();
- // setup.MockDirectReadProcessorFactory->Wait();
-
- // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .InitResponse());
-
- // session->AddPartitionSession({ .PartitionSessionId = 1, .Location = {2, 3} });
-
- // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, TPartitionSessionId(1)));
-
- // gotClosedEvent.GetFuture().Wait();
- // }
-
- // Y_UNIT_TEST(RetryPartitionSession) {
- // /*
- // Keep sending Start-requests until the retry policy denies next retry.
- // */
- // TDirectReadSessionImplTestSetup setup;
- // size_t nRetries = 2;
- // setup.ReadSessionSettings.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetFixedIntervalPolicy(
- // TDuration::MilliSeconds(1), TDuration::MilliSeconds(1), nRetries));
-
- // auto gotClosedEvent = NThreading::NewPromise();
-
- // {
- // ::testing::InSequence seq;
-
- // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
- // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
-
- // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_));
-
- // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
- // .Times(1 + nRetries);
- // }
-
- // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
- // public:
- // TControlCallbacks(NThreading::TPromise<void>& gotClosedEvent) : GotClosedEvent(gotClosedEvent) {}
- // void AbortSession(TSessionClosedEvent&&) override { GotClosedEvent.SetValue(); }
- // void ScheduleCallback(TDuration, std::function<void()> cb, TDeferredActions<false>& deferred) override {
- // deferred.DeferCallback(cb);
- // }
- // NThreading::TPromise<void>& GotClosedEvent;
- // };
-
- // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(gotClosedEvent));
-
- // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .InitResponse());
-
- // session->Start();
- // setup.MockDirectReadProcessorFactory->Wait();
-
- // TPartitionSessionId partitionSessionId = 1;
-
- // session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
-
- // for (size_t i = 0; i < 1 + nRetries; ++i) {
- // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
- // }
-
- // gotClosedEvent.GetFuture().Wait();
- // }
-
- // Y_UNIT_TEST(ResetRetryStateOnSuccess) {
- // /*
- // Test that the client creates a new retry state on the first error after a successful response.
-
- // With the default retry policy (exponential backoff), retry delays grow after each unsuccessful request.
- // After the first successful request retry state should be reset, so the delay after another unsuccessful request will be small.
-
- // E.g. if the exponential backoff policy is used, and minDelay is 1ms, and scaleFactor is 1000, then the following should happen:
-
- // client -> server: InitRequest
- // client <-- server: InitResponse
- // client -> server: StartDirectReadPartitionSessionRequest
- // client <- server: StopDirectReadPartitionSession(OVERLOADED)
- // note over client: Wait 1 ms
- // client -> server: StartDirectReadPartitionSessionRequest
- // client <-- server: StartDirectReadPartitionSessionResponse
- // note over client: Reset RetryState
- // client <- server: StopDirectReadPartitionSession(OVERLOADED)
- // note over client: Wait 1 ms, not 1 second
- // client -> server: StartDirectReadPartitionSessionRequest
- // */
-
- // TDirectReadSessionImplTestSetup setup;
- // setup.ReadSessionSettings.RetryPolicy(setup.MockRetryPolicy);
-
- // auto gotFinalStart = NThreading::NewPromise();
- // TPartitionSessionId partitionSessionId = 1;
-
- // {
- // ::testing::InSequence sequence;
-
- // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
- // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
-
- // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_));
- // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_));
-
- // // The client receives StopDirectReadPartitionSession, create TDirectReadSession::PartitionSessions[i].RetryState
- // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
- // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
-
- // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_));
-
- // // The client receives StartDirectReadPartitionSessionResponse, resets retry state,
- // // then receives StopDirectReadPartitionSession and has to create a new retry state.
- // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
- // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
-
- // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
- // .WillOnce([&]() { gotFinalStart.SetValue(); });
- // }
-
- // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
- // public:
- // void ScheduleCallback(TDuration, std::function<void()> cb, TDeferredActions<false>& deferred) override {
- // deferred.DeferCallback(cb);
- // }
- // };
-
- // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>());
-
- // session->Start();
- // setup.MockDirectReadProcessorFactory->Wait();
-
- // session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
-
- // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .InitResponse());
-
- // setup.MockRetryPolicy->Delay = TDuration::MilliSeconds(1);
-
- // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
-
- // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .StartDirectReadPartitionSessionResponse(partitionSessionId));
-
- // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
-
- // gotFinalStart.GetFuture().Wait();
- // }
-
- // Y_UNIT_TEST(PartitionSessionRetainsRetryStateOnReconnects) {
- // /*
- // We need to retain retry states of separate partition sessions
- // even after reestablishing the connection to a node.
-
- // E.g. partition session receives StopDirectReadPartitionSession
- // and we need to send StartDirectReadPartitionSessionRequest in 5 minutes due to the retry policy.
-
- // But in the meantime, the session loses connection to the server and reconnects within several seconds.
-
- // We must not send that StartDirectReadPartitionSessionRequest right away, but wait ~5 minutes.
-
- // client -> server: InitRequest
- // client <-- server: InitResponse
- // client -> server: StartDirectReadPartitionSessionRequest
- // client <- server: StopDirectReadPartitionSession(OVERLOADED)
- // note over client: Wait N seconds before sending Start again
- // ... Connection lost, client reconnects to the server ...
- // client -> server: InitRequest
- // client <-- server: InitResponse
- // note over client: Still has to wait ~N seconds
- // client -> server: StartDirectReadPartitionSessionRequest
- // */
-
- // TDirectReadSessionImplTestSetup setup;
- // setup.ReadSessionSettings.RetryPolicy(setup.MockRetryPolicy);
-
- // auto gotFinalStart = NThreading::NewPromise();
- // auto gotInitRequest = NThreading::NewPromise();
- // auto calledRead = NThreading::NewPromise();
- // TPartitionSessionId partitionSessionId = 1;
- // auto secondProcessor = MakeIntrusive<TMockDirectReadSessionProcessor>();
- // auto delay = TDuration::Seconds(300);
-
- // {
- // ::testing::InSequence sequence;
-
- // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(1))
- // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
-
- // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_));
-
- // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_));
-
- // // The client receives StopDirectReadPartitionSession, create TDirectReadSession::PartitionSessions[i].RetryState
- // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
- // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
-
- // // The client loses connection, create TDirectReadSession.RetryState
- // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
- // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
-
- // // The connection is lost at this point, the client tries to reconnect.
- // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(2))
- // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(secondProcessor); });
-
- // EXPECT_CALL(*secondProcessor, OnInitRequest(_))
- // .WillOnce([&]() { gotInitRequest.SetValue(); });
-
- // // The client waits `delay` seconds before sending the StartDirectReadPartitionSessionRequest.
-
- // EXPECT_CALL(*secondProcessor, OnStartDirectReadPartitionSessionRequest(_))
- // .WillOnce([&]() { gotFinalStart.SetValue(); });
- // }
-
- // std::function<void()> callback;
-
- // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
- // public:
- // TControlCallbacks(std::function<void()>& callback) : Callback(callback) {}
- // void ScheduleCallback(TDuration, std::function<void()> cb, TDeferredActions<false>&) override {
- // Callback = cb;
- // }
- // std::function<void()>& Callback;
- // };
-
- // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(callback));
-
- // session->Start();
- // setup.MockDirectReadProcessorFactory->Wait();
-
- // session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
-
- // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .InitResponse());
-
- // setup.MockRetryPolicy->Delay = delay;
-
- // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
-
- // // Besides logs, these durations don't really affect anything in tests.
- // setup.MockRetryPolicy->Delay = TDuration::Seconds(1);
-
- // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .Failure());
-
- // gotInitRequest.GetFuture().Wait();
- // secondProcessor->AddServerResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .InitResponse());
-
- // // Ensure that the callback is called after the direct session got InitResponse.
- // setup.WaitForWorkingDirectReadSession();
-
- // callback();
-
- // gotFinalStart.GetFuture().Wait();
-
- // secondProcessor->Wait();
- // secondProcessor->Validate();
- // }
-
- // Y_UNIT_TEST(RetryWithoutConnectionResetsPartitionSession) {
- // /*
- // If there are pending StartDirectReadPartitionSession requests that were delayed due to previous errors,
- // and the entire session then loses connection for an extended period of time (greater than the callback delays),
- // the following process should be followed:
-
- // When the session finally reconnects, the pending Start requests should be sent immediately.
- // This is because their callbacks have already been fired, but the requests were not sent due to the lack of connection.
-
- // client -> server: InitRequest
- // client <-- server: InitResponse
- // client -> server: StartDirectReadPartitionSessionRequest
- // client <- server: StopDirectReadPartitionSession(OVERLOADED)
- // note over client: Wait 1 second before sending Start again
- // ... Connection lost ...
- // note over client: SendStart... callback fires, resets state
- // ... Connection reestablished in 1 minute ...
- // client -> server: InitRequest
- // client <-- server: InitResponse
- // note over client: Send the Start request immediately
- // client -> server: StartDirectReadPartitionSessionRequest
- // */
-
- // TDirectReadSessionImplTestSetup setup;
- // setup.ReadSessionSettings.RetryPolicy(setup.MockRetryPolicy);
-
- // auto gotFinalStart = NThreading::NewPromise();
- // auto calledRead = NThreading::NewPromise();
- // TPartitionSessionId partitionSessionId = 1;
- // auto secondProcessor = MakeIntrusive<TMockDirectReadSessionProcessor>();
- // auto delay = TDuration::MilliSeconds(1);
-
- // {
- // ::testing::InSequence sequence;
-
- // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(1))
- // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
-
- // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
- // .Times(1);
-
- // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
- // .Times(1);
-
- // // The client receives StopDirectReadPartitionSession, create TDirectReadSession::PartitionSessions[i].RetryState
- // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
- // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
-
- // // The client loses connection, create TDirectReadSession.RetryState
- // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
- // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
-
- // // The connection is lost at this point, the client tries to reconnect.
- // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(2))
- // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(secondProcessor); });
-
- // EXPECT_CALL(*secondProcessor, OnInitRequest(_))
- // .Times(1);
-
- // EXPECT_CALL(*secondProcessor, OnStartDirectReadPartitionSessionRequest(_))
- // .WillOnce([&]() { gotFinalStart.SetValue(); });
- // }
-
- // std::function<void()> callback;
-
- // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
- // public:
- // TControlCallbacks(TDuration delay, std::function<void()>& callback) : Delay(delay), Callback(callback) {}
- // void ScheduleCallback(TDuration d, std::function<void()> cb, TDeferredActions<false>&) override {
- // UNIT_ASSERT_EQUAL(Delay, d);
- // Callback = cb;
- // }
- // TDuration Delay;
- // std::function<void()>& Callback;
- // };
-
- // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(delay, callback));
-
- // session->Start();
- // setup.MockDirectReadProcessorFactory->Wait();
-
- // session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
-
- // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .InitResponse());
-
- // setup.MockRetryPolicy->Delay = delay;
-
- // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
-
- // // Besides logs, these durations don't really affect anything in tests.
- // setup.MockRetryPolicy->Delay = TDuration::Seconds(10);
-
- // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .Failure());
-
- // // Delayed callback is fired, but there is no connection, so the partition session state changes to IDLE,
- // // and the request should be sent after receiving InitResponse.
- // callback();
-
- // secondProcessor->AddServerResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
- // .InitResponse());
-
- // gotFinalStart.GetFuture().Wait();
-
- // secondProcessor->Wait();
- // secondProcessor->Validate();
- // }
-
-} // Y_UNIT_TEST_SUITE_F(DirectReadSession)
-
-
Y_UNIT_TEST_SUITE(DirectReadWithServer) {
/*
@@ -2177,48 +173,6 @@ Y_UNIT_TEST_SUITE(DirectReadWithServer) {
reader->Close();
}
-
- Y_UNIT_TEST(Devslice) {
- return;
- auto driverConfig = NYdb::TDriverConfig()
- .SetEndpoint(GetEnv("ENDPOINT"))
- .SetDatabase("/Root/testdb")
- .SetLog(std::unique_ptr<TLogBackend>(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release()))
- .SetAuthToken(GetEnv("YDB_TOKEN"));
-
- auto driver = NYdb::TDriver(driverConfig);
-
- auto clientSettings = TTopicClientSettings();
- auto client = TTopicClient(driver, clientSettings);
-
- auto settings = TReadSessionSettings()
- .AppendTopics(TTopicReadSettings("t1").AppendPartitionIds({0}))
- .ConsumerName("c1")
- // .DirectRead(true)
- ;
-
- settings.EventHandlers_
- .StartPartitionSessionHandler([](TReadSessionEvent::TStartPartitionSessionEvent& e) {
- e.Confirm();
- })
- .StopPartitionSessionHandler([](TReadSessionEvent::TStopPartitionSessionEvent& e) {
- e.Confirm();
- })
- .DataReceivedHandler([](TReadSessionEvent::TDataReceivedEvent& e) {
- for (ui32 i = 0; i < e.GetMessages().size(); ++i) {
- auto& m = e.GetMessages()[i];
- Cerr << (TStringBuilder() << "Message: " << m.GetData() << Endl);
- m.Commit();
- }
- });
-
- auto reader = client.CreateReadSession(settings);
-
- Sleep(TDuration::Seconds(1000));
-
- reader->Close();
- }
-
} // Y_UNIT_TEST_SUITE_F(DirectReadWithServer)
} // namespace NYdb::NTopic::NTests
diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/local_partition_ut.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/local_partition_ut.cpp
index f5730147a4d..dd81aafc6b1 100644
--- a/ydb/public/sdk/cpp/src/client/topic/ut/local_partition_ut.cpp
+++ b/ydb/public/sdk/cpp/src/client/topic/ut/local_partition_ut.cpp
@@ -232,13 +232,6 @@ namespace NYdb::NTopic::NTests {
return Result{setup, client, mockDiscoveryService};
}
- Y_UNIT_TEST(Basic) {
- auto [setup, client, discovery] = Start(TEST_CASE_NAME);
-
- WriteMessage(*client);
- ReadMessage(*client);
- }
-
Y_UNIT_TEST(Restarts) {
auto [setup, client, discovery] = Start(TEST_CASE_NAME);
@@ -250,173 +243,6 @@ namespace NYdb::NTopic::NTests {
}
}
- Y_UNIT_TEST(DescribeBadPartition) {
- auto setup = CreateSetup(TEST_CASE_NAME);
-
-
- TMockDiscoveryService discovery;
- discovery.SetGoodEndpoints(*setup);
-
- auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
-
- // Set non-existing partition
- auto writeSettings = CreateWriteSessionSettings();
- writeSettings.RetryPolicy(retryPolicy);
- writeSettings.PartitionId(1);
-
- retryPolicy->Initialize();
- retryPolicy->ExpectBreakDown();
-
- Cerr << "=== Create write session\n";
- TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr())));
- auto writeSession = client.CreateWriteSession(writeSettings);
-
- Cerr << "=== Wait for retries\n";
- retryPolicy->WaitForRetriesSync(3);
-
- Cerr << "=== Alter partition count\n";
- TAlterTopicSettings alterSettings;
- alterSettings.AlterPartitioningSettings(2, 2);
- auto alterResult = client.AlterTopic(setup->GetTopicPath(), alterSettings).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
-
- Cerr << "=== Wait for repair\n";
- retryPolicy->WaitForRepairSync();
-
- Cerr << "=== Close write session\n";
- writeSession->Close();
- }
-
- Y_UNIT_TEST(DiscoveryServiceBadPort) {
- auto setup = CreateSetup(TEST_CASE_NAME);
-
- TMockDiscoveryService discovery;
- discovery.SetEndpoints(9999, 2, 0);
-
- auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
-
- auto writeSettings = CreateWriteSessionSettings();
- writeSettings.RetryPolicy(retryPolicy);
-
- retryPolicy->Initialize();
- retryPolicy->ExpectBreakDown();
-
- Cerr << "=== Create write session\n";
- TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr())));
- auto writeSession = client.CreateWriteSession(writeSettings);
-
- Cerr << "=== Wait for retries\n";
- retryPolicy->WaitForRetriesSync(3);
-
- discovery.SetGoodEndpoints(*setup);
-
- Cerr << "=== Wait for repair\n";
- retryPolicy->WaitForRepairSync();
-
- Cerr << "=== Close write session\n";
- writeSession->Close();
- }
-
- Y_UNIT_TEST(DiscoveryServiceBadNodeId) {
- auto setup = CreateSetup(TEST_CASE_NAME);
-
- TMockDiscoveryService discovery;
- discovery.SetEndpoints(9999, setup->GetRuntime().GetNodeCount(), setup->GetServer().GrpcPort);
-
- auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
-
- auto writeSettings = CreateWriteSessionSettings();
- writeSettings.RetryPolicy(retryPolicy);
-
- retryPolicy->Initialize();
- retryPolicy->ExpectBreakDown();
-
- Cerr << "=== Create write session\n";
- TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr())));
- auto writeSession = client.CreateWriteSession(writeSettings);
-
- Cerr << "=== Wait for retries\n";
- retryPolicy->WaitForRetriesSync(3);
-
- discovery.SetGoodEndpoints(*setup);
-
- Cerr << "=== Wait for repair\n";
- retryPolicy->WaitForRepairSync();
-
- Cerr << "=== Close write session\n";
- writeSession->Close();
- }
-
- Y_UNIT_TEST(DescribeHang) {
- auto setup = CreateSetup(TEST_CASE_NAME);
-
- TMockDiscoveryService discovery;
- discovery.SetEndpoints(9999, 2, 0);
-
- auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>(TDuration::Days(1));
-
- auto writeSettings = CreateWriteSessionSettings();
- writeSettings.RetryPolicy(retryPolicy);
-
- retryPolicy->Initialize();
- retryPolicy->ExpectBreakDown();
-
- Cerr << "=== Create write session\n";
- TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr())));
- auto writeSession = client.CreateWriteSession(writeSettings);
-
- Cerr << "=== Close write session\n";
- writeSession->Close();
- }
-
- Y_UNIT_TEST(DiscoveryHang) {
- auto setup = CreateSetup(TEST_CASE_NAME);
-
- TMockDiscoveryService discovery;
- discovery.SetGoodEndpoints(*setup);
- discovery.SetDelay(TDuration::Days(1));
-
- Cerr << "=== Create write session\n";
- TTopicClient client(TDriver(CreateConfig(*setup, discovery.GetDiscoveryAddr())));
- auto writeSession = client.CreateWriteSession(CreateWriteSessionSettings());
-
- Cerr << "=== Close write session\n";
- writeSession->Close();
- }
-
- Y_UNIT_TEST(WithoutPartition) {
- // Direct write without partition: happy way.
- auto setup = CreateSetup(TEST_CASE_NAME);
- TMockDiscoveryService discovery;
- discovery.SetGoodEndpoints(*setup);
- auto driverConfig = CreateConfig(*setup, discovery.GetDiscoveryAddr());
- auto* tracingBackend = new TTracingBackend();
- driverConfig.SetLog(std::unique_ptr<TLogBackend>(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend}).Release()));
- TDriver driver(driverConfig);
- TTopicClient client(driver);
- auto sessionSettings = TWriteSessionSettings()
- .Path(TEST_TOPIC)
- .ProducerId(TEST_MESSAGE_GROUP_ID)
- .MessageGroupId(TEST_MESSAGE_GROUP_ID)
- .DirectWriteToPartition(true);
- auto writeSession = client.CreateSimpleBlockingWriteSession(sessionSettings);
- UNIT_ASSERT(writeSession->Write("message"));
- writeSession->Close();
-
- auto node0_id = std::to_string(setup->GetRuntime().GetNodeId(0));
- TExpectedTrace expected{
- "InitRequest !partition_id !partition_with_generation",
- "InitResponse partition_id=0 session_id",
- "DescribePartitionRequest partition_id=0",
- std::format("DescribePartitionResponse partition_id=0 pl_generation=1 pl_node_id={}", node0_id),
- std::format("PreferredPartitionLocation Generation=1 NodeId={}", node0_id),
- "InitRequest !partition_id pwg_partition_id=0 pwg_generation=1",
- "InitResponse partition_id=0 session_id",
- };
- auto const events = tracingBackend->GetEvents();
- UNIT_ASSERT(expected.Matches(events));
- }
-
Y_UNIT_TEST(WithoutPartitionWithRestart) {
// Direct write without partition: with tablet restart.
auto setup = CreateSetup(TEST_CASE_NAME);
@@ -515,46 +341,6 @@ namespace NYdb::NTopic::NTests {
UNIT_ASSERT(expected.Matches(events));
}
- Y_UNIT_TEST(WithoutPartitionDeadNode) {
- // This test emulates a situation, when InitResponse directs us to an inaccessible node.
- auto setup = CreateSetup(TEST_CASE_NAME);
- TMockDiscoveryService discovery;
- discovery.SetEndpoints(setup->GetRuntime().GetNodeId(0), 1, 0);
- auto driverConfig = CreateConfig(*setup, discovery.GetDiscoveryAddr());
- auto* tracingBackend = new TTracingBackend();
- driverConfig.SetLog(std::unique_ptr<TLogBackend>(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend}).Release()));
- TDriver driver(driverConfig);
- TTopicClient client(driver);
- auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
- auto sessionSettings = TWriteSessionSettings()
- .Path(TEST_TOPIC)
- .MessageGroupId(TEST_MESSAGE_GROUP_ID)
- .DirectWriteToPartition(true)
- .PartitionId(0)
- .RetryPolicy(retryPolicy);
- retryPolicy->Initialize();
- retryPolicy->ExpectBreakDown();
- auto writeSession = client.CreateSimpleBlockingWriteSession(sessionSettings);
-
- retryPolicy->WaitForRetriesSync(1);
- discovery.SetGoodEndpoints(*setup);
- retryPolicy->WaitForRepairSync();
- UNIT_ASSERT(writeSession->Close());
-
- auto node0_id = std::to_string(setup->GetRuntime().GetNodeId(0));
- TExpectedTrace expected{
- "DescribePartitionRequest partition_id=0",
- "Error status=TRANSPORT_UNAVAILABLE",
- "DescribePartitionRequest partition_id=0",
- std::format("DescribePartitionResponse partition_id=0 pl_generation=1 pl_node_id={}", node0_id),
- std::format("PreferredPartitionLocation Generation=1 NodeId={}", node0_id),
- "InitRequest !partition_id pwg_partition_id=0 pwg_generation=1",
- "InitResponse partition_id=0",
- };
- auto const events = tracingBackend->GetEvents();
- UNIT_ASSERT(expected.Matches(events));
- }
-
Y_UNIT_TEST(WithoutPartitionPartitionRelocation) {
// This test emulates partition relocation from one node to another.
auto setup = CreateSetup(TEST_CASE_NAME, 2, /* createTopic = */ false);
diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/trace_ut.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/trace_ut.cpp
deleted file mode 100644
index 133bfb3e65c..00000000000
--- a/ydb/public/sdk/cpp/src/client/topic/ut/trace_ut.cpp
+++ /dev/null
@@ -1,168 +0,0 @@
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/trace.h>
-
-namespace NYdb::NTopic::NTests {
-
- Y_UNIT_TEST_SUITE(Trace) {
-
- Y_UNIT_TEST(SkipSpaces) {
- UNIT_ASSERT_STRINGS_EQUAL(SkipSpaces(""), "");
- UNIT_ASSERT_STRINGS_EQUAL(SkipSpaces(" "), "");
- UNIT_ASSERT_STRINGS_EQUAL(SkipSpaces(" a"), "a");
- UNIT_ASSERT_STRINGS_EQUAL(SkipSpaces(" a "), "a ");
- }
-
- Y_UNIT_TEST(NextToken) {
- UNIT_ASSERT_STRINGS_EQUAL(NextToken(""), "");
- UNIT_ASSERT_STRINGS_EQUAL(NextToken(" "), "");
- UNIT_ASSERT_STRINGS_EQUAL(NextToken("a"), "a");
- UNIT_ASSERT_STRINGS_EQUAL(NextToken(" a"), "a");
- UNIT_ASSERT_STRINGS_EQUAL(NextToken(" a "), "a");
- TStringBuf b("a=1");
- UNIT_ASSERT_STRINGS_EQUAL(NextToken(b, '='), "a");
- UNIT_ASSERT_STRINGS_EQUAL(b, "1");
- }
-
- Y_UNIT_TEST(TTraceEvent) {
- UNIT_ASSERT_TEST_FAILS(TTraceEvent::FromString(""));
- TString const eventName("init");
- {
- TString s(eventName);
- auto ev = TTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT(ev.KeyValues.empty());
- }
- {
- TString s(eventName + " a");
- auto ev = TTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT_EQUAL(ev.KeyValues.size(), 1);
- UNIT_ASSERT(ev.KeyValues.at("a").empty());
- }
- {
- TString s(eventName + " a b");
- auto ev = TTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT_EQUAL(ev.KeyValues.size(), 2);
- UNIT_ASSERT(ev.KeyValues.at("a").empty());
- UNIT_ASSERT(ev.KeyValues.at("b").empty());
- }
- {
- TString s(eventName + " =");
- auto ev = TTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT_EQUAL(ev.KeyValues.size(), 1);
- UNIT_ASSERT(ev.KeyValues.at("").empty());
- }
- {
- TString s(eventName + " a=1 b");
- auto ev = TTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT_EQUAL(ev.KeyValues.size(), 2);
- UNIT_ASSERT_STRINGS_EQUAL(ev.KeyValues.at("a"), "1");
- UNIT_ASSERT(ev.KeyValues.at("b").empty());
- }
- {
- TString s(eventName + " a b=2");
- auto ev = TTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT_EQUAL(ev.KeyValues.size(), 2);
- UNIT_ASSERT(ev.KeyValues.at("a").empty());
- UNIT_ASSERT_STRINGS_EQUAL(ev.KeyValues.at("b"), "2");
- }
- {
- TString s(eventName + " a=1 b=2");
- auto ev = TTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT_EQUAL(ev.KeyValues.size(), 2);
- UNIT_ASSERT_STRINGS_EQUAL(ev.KeyValues.at("a"), "1");
- UNIT_ASSERT_STRINGS_EQUAL(ev.KeyValues.at("b"), "2");
- }
- {
- TExpectedTraceEvent expected = {eventName, {{"a", {"1"}}}, {"d"}};
- UNIT_ASSERT(!expected.Matches({"", {}}));
- UNIT_ASSERT(!expected.Matches({eventName, {}}));
- UNIT_ASSERT(!expected.Matches({eventName, {{"a", ""}}}));
- UNIT_ASSERT(!expected.Matches({eventName, {{"a", "0"}}}));
- UNIT_ASSERT(!expected.Matches({eventName, {{"c", "1"}}}));
- UNIT_ASSERT(expected.Matches({eventName, {{"a", "1"}}}));
- UNIT_ASSERT(expected.Matches({eventName, {{"a", "1"}, {"b", "2"}}}));
- UNIT_ASSERT(!expected.Matches({eventName, {{"a", "1"}, {"d", "4"}}})); // The "d" should NOT appear in the event.
- }
- }
-
- Y_UNIT_TEST(TExpectedTraceEvent) {
- UNIT_ASSERT_TEST_FAILS(TExpectedTraceEvent::FromString(""));
- TString const eventName("init");
- {
- TString s(eventName);
- auto ev = TExpectedTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT(ev.KeyValues.empty());
- }
- {
- TString s(eventName + " a");
- auto ev = TExpectedTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT_EQUAL(ev.KeyValues.size(), 1);
- UNIT_ASSERT(ev.KeyValues.at("a").empty());
- }
- {
- TString s(eventName + " a b");
- auto ev = TExpectedTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT_EQUAL(ev.KeyValues.size(), 2);
- UNIT_ASSERT(ev.KeyValues.at("a").empty());
- UNIT_ASSERT(ev.KeyValues.at("b").empty());
- }
- {
- TString s(eventName + " =");
- auto ev = TExpectedTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT_EQUAL(ev.KeyValues.size(), 1);
- UNIT_ASSERT(ev.KeyValues.at("").empty());
- }
- {
- TString s(eventName + " a=1 b");
- auto ev = TExpectedTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT_EQUAL(ev.KeyValues.size(), 2);
- UNIT_ASSERT_STRINGS_EQUAL(ev.KeyValues.at("a"), "1");
- UNIT_ASSERT(ev.KeyValues.at("b").empty());
- }
- {
- TString s(eventName + " a b=2");
- auto ev = TExpectedTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT_EQUAL(ev.KeyValues.size(), 2);
- UNIT_ASSERT(ev.KeyValues.at("a").empty());
- UNIT_ASSERT_STRINGS_EQUAL(ev.KeyValues.at("b"), "2");
- }
- {
- TString s(eventName + " a=1 b=2");
- auto ev = TExpectedTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT_EQUAL(ev.KeyValues.size(), 2);
- UNIT_ASSERT_STRINGS_EQUAL(ev.KeyValues.at("a"), "1");
- UNIT_ASSERT_STRINGS_EQUAL(ev.KeyValues.at("b"), "2");
- }
- {
- TString s(eventName + " !a");
- auto ev = TExpectedTraceEvent::FromString(s);
- UNIT_ASSERT_STRINGS_EQUAL(ev.Event, eventName);
- UNIT_ASSERT(ev.KeyValues.empty());
- UNIT_ASSERT_EQUAL(ev.DeniedKeys.size(), 1);
- UNIT_ASSERT_STRINGS_EQUAL(ev.DeniedKeys[0], "a");
- }
- }
-
- Y_UNIT_TEST(TExpectedTrace) {
- TExpectedTrace expected{"A", "B"};
- TVector<TTraceEvent> events{{"X", {}}, {"A", {}}, {"X", {}}, {"B", {}}, {"X", {}}};
- UNIT_ASSERT(expected.Matches(events));
- expected = {"A", "B", "C"};
- UNIT_ASSERT(!expected.Matches(events));
- }
- }
-}
diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/ya.make b/ydb/public/sdk/cpp/src/client/topic/ut/ya.make
index 161d1615e1e..d6ce8e8f53c 100644
--- a/ydb/public/sdk/cpp/src/client/topic/ut/ya.make
+++ b/ydb/public/sdk/cpp/src/client/topic/ut/ya.make
@@ -40,7 +40,6 @@ SRCS(
describe_topic_ut.cpp
local_partition_ut.cpp
topic_to_table_ut.cpp
- trace_ut.cpp
)
RESOURCE(
diff --git a/ydb/public/sdk/cpp/tests/integration/basic_example/basic_example.cpp b/ydb/public/sdk/cpp/tests/integration/basic_example/basic_example.cpp
index 9928d4a8d50..63e6590664d 100644
--- a/ydb/public/sdk/cpp/tests/integration/basic_example/basic_example.cpp
+++ b/ydb/public/sdk/cpp/tests/integration/basic_example/basic_example.cpp
@@ -28,7 +28,7 @@ TRunArgs GetRunArgs() {
.SetAuthToken(std::getenv("YDB_TOKEN") ? std::getenv("YDB_TOKEN") : "");
NYdb::TDriver driver(driverConfig);
- return {driver, JoinPath(database, "basic")};
+ return {driver, database + "/" + std::string(std::getenv("YDB_TEST_ROOT")) + "/basic"};
}
///////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/public/sdk/cpp/tests/integration/basic_example/ya.make b/ydb/public/sdk/cpp/tests/integration/basic_example/ya.make
index 93acc5089a1..c7ed4c3f87b 100644
--- a/ydb/public/sdk/cpp/tests/integration/basic_example/ya.make
+++ b/ydb/public/sdk/cpp/tests/integration/basic_example/ya.make
@@ -1,4 +1,6 @@
GTEST()
+
+INCLUDE(${ARCADIA_ROOT}/ydb/public/sdk/cpp/tests/integration/tests_common.inc)
INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
IF (SANITIZER_TYPE == "thread")
diff --git a/ydb/public/sdk/cpp/tests/integration/bulk_upsert/bulk_upsert.cpp b/ydb/public/sdk/cpp/tests/integration/bulk_upsert/bulk_upsert.cpp
index dee7c04a153..de419179fbe 100644
--- a/ydb/public/sdk/cpp/tests/integration/bulk_upsert/bulk_upsert.cpp
+++ b/ydb/public/sdk/cpp/tests/integration/bulk_upsert/bulk_upsert.cpp
@@ -5,20 +5,9 @@
static constexpr size_t BATCH_SIZE = 1000;
-static std::string JoinPath(const std::string& basePath, const std::string& path) {
- if (basePath.empty()) {
- return path;
- }
-
- std::filesystem::path prefixPathSplit(basePath);
- prefixPathSplit /= path;
-
- return prefixPathSplit;
-}
-
TRunArgs GetRunArgs() {
- std::string database = std::getenv("YDB_DATABASE");
std::string endpoint = std::getenv("YDB_ENDPOINT");
+ std::string database = std::getenv("YDB_DATABASE");
auto driverConfig = TDriverConfig()
.SetEndpoint(endpoint)
@@ -26,7 +15,7 @@ TRunArgs GetRunArgs() {
.SetAuthToken(std::getenv("YDB_TOKEN") ? std::getenv("YDB_TOKEN") : "");
TDriver driver(driverConfig);
- return {driver, JoinPath(database, "bulk")};
+ return {driver, database + "/" + std::string(std::getenv("YDB_TEST_ROOT")) + "/bulk"};
}
TStatus CreateTable(TTableClient& client, const std::string& table) {
diff --git a/ydb/public/sdk/cpp/tests/integration/bulk_upsert/ya.make b/ydb/public/sdk/cpp/tests/integration/bulk_upsert/ya.make
index 6077c176454..e3c72d430ed 100644
--- a/ydb/public/sdk/cpp/tests/integration/bulk_upsert/ya.make
+++ b/ydb/public/sdk/cpp/tests/integration/bulk_upsert/ya.make
@@ -1,4 +1,6 @@
GTEST()
+
+INCLUDE(${ARCADIA_ROOT}/ydb/public/sdk/cpp/tests/integration/tests_common.inc)
INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
IF (SANITIZER_TYPE == "thread")
diff --git a/ydb/public/sdk/cpp/tests/integration/server_restart/main.cpp b/ydb/public/sdk/cpp/tests/integration/server_restart/main.cpp
index 02cd2d9994a..5f028411aec 100644
--- a/ydb/public/sdk/cpp/tests/integration/server_restart/main.cpp
+++ b/ydb/public/sdk/cpp/tests/integration/server_restart/main.cpp
@@ -139,7 +139,7 @@ protected:
.RegisterService(DisoveryService_.get())
.RegisterService(QueryService_.get())
.BuildAndStart();
-
+
DisoveryService_->SetPort(port);
Driver_ = std::make_unique<TDriver>(TDriverConfig()
diff --git a/ydb/public/sdk/cpp/tests/integration/server_restart/ya.make b/ydb/public/sdk/cpp/tests/integration/server_restart/ya.make
index 6e7bd7e8d9e..c3b5e83a5c8 100644
--- a/ydb/public/sdk/cpp/tests/integration/server_restart/ya.make
+++ b/ydb/public/sdk/cpp/tests/integration/server_restart/ya.make
@@ -1,4 +1,6 @@
GTEST()
+
+INCLUDE(${ARCADIA_ROOT}/ydb/public/sdk/cpp/tests/integration/tests_common.inc)
INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
IF (SANITIZER_TYPE == "thread")
diff --git a/ydb/public/sdk/cpp/tests/integration/sessions/main.cpp b/ydb/public/sdk/cpp/tests/integration/sessions/main.cpp
index 6a1af34fb27..c5d40085fe1 100644
--- a/ydb/public/sdk/cpp/tests/integration/sessions/main.cpp
+++ b/ydb/public/sdk/cpp/tests/integration/sessions/main.cpp
@@ -19,18 +19,22 @@ using namespace NYdb::NTable;
namespace {
+std::string GetTablePath() {
+ return std::string(std::getenv("YDB_DATABASE")) + "/" + std::string(std::getenv("YDB_TEST_ROOT")) + "/sessions_test_table";
+}
+
void CreateTestTable(NYdb::TDriver& driver) {
NYdb::NTable::TTableClient client(driver);
auto sessionResult = client.GetSession().ExtractValueSync();
ASSERT_TRUE(sessionResult.IsSuccess());
auto session = sessionResult.GetSession();
- auto result = session.ExecuteSchemeQuery(R"___(
- CREATE TABLE `/local/t` (
+ auto result = session.ExecuteSchemeQuery(std::format(R"___(
+ CREATE TABLE `{}` (
Key Uint32,
Value String,
PRIMARY KEY (Key)
);
- )___").ExtractValueSync();
+ )___", GetTablePath())).ExtractValueSync();
ASSERT_TRUE(result.IsSuccess());
ASSERT_EQ(client.GetActiveSessionCount(), 1);
}
@@ -51,9 +55,10 @@ void WarmPoolCreateSession(NYdb::NQuery::TQueryClient& client, std::string& sess
void WaitForSessionsInPool(NYdb::NQuery::TQueryClient& client, std::int64_t expected) {
int attempt = 10;
while (attempt--) {
- if (client.GetCurrentPoolSize() == expected)
+ if (client.GetCurrentPoolSize() == expected) {
break;
- Sleep(TDuration::MilliSeconds(100));
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
ASSERT_EQ(client.GetCurrentPoolSize(), expected);
}
@@ -217,7 +222,7 @@ TEST(YdbSdkSessions, TestSdkFreeSessionAfterBadSessionQueryService) {
auto session = sessionResponse.GetSession();
ASSERT_EQ(session.GetId(), sessionId);
- auto res = session.ExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ auto res = session.ExecuteQuery(std::format("SELECT * FROM `{}`", GetTablePath()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
ASSERT_EQ(res.GetStatus(), EStatus::BAD_SESSION) << res.GetIssues().ToString();
}
@@ -229,7 +234,7 @@ TEST(YdbSdkSessions, TestSdkFreeSessionAfterBadSessionQueryService) {
ASSERT_TRUE(sessionResponse.IsSuccess());
auto session = sessionResponse.GetSession();
ASSERT_NE(session.GetId(), sessionId);
- auto res = session.ExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ auto res = session.ExecuteQuery(std::format("SELECT * FROM `{}`", GetTablePath()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
ASSERT_EQ(res.GetStatus(), EStatus::SUCCESS) << res.GetIssues().ToString();
}
@@ -265,7 +270,7 @@ TEST(YdbSdkSessions, TestSdkFreeSessionAfterBadSessionQueryServiceStreamCall) {
auto session = sessionResponse.GetSession();
ASSERT_EQ(session.GetId(), sessionId);
- auto it = session.StreamExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ auto it = session.StreamExecuteQuery(std::format("SELECT * FROM `{}`", GetTablePath()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
ASSERT_EQ(it.GetStatus(), EStatus::SUCCESS) << it.GetIssues().ToString();
@@ -281,7 +286,7 @@ TEST(YdbSdkSessions, TestSdkFreeSessionAfterBadSessionQueryServiceStreamCall) {
auto session = sessionResponse.GetSession();
ASSERT_NE(session.GetId(), sessionId);
- auto res = session.ExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ auto res = session.ExecuteQuery(std::format("SELECT * FROM `{}`", GetTablePath()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
ASSERT_EQ(res.GetStatus(), EStatus::SUCCESS) << res.GetIssues().ToString();
}
@@ -305,13 +310,13 @@ TEST(YdbSdkSessions, TestActiveSessionCountAfterTransportError) {
auto sessionResponse = client.GetSession().ExtractValueSync();
ASSERT_TRUE(sessionResponse.IsSuccess());
auto session = sessionResponse.GetSession();
- auto result = session.ExecuteSchemeQuery(R"___(
- CREATE TABLE `/local/t` (
+ auto result = session.ExecuteSchemeQuery(std::format(R"___(
+ CREATE TABLE `{}` (
Key Uint32,
Value String,
PRIMARY KEY (Key)
);
- )___").ExtractValueSync();
+ )___", GetTablePath())).ExtractValueSync();
ASSERT_TRUE(result.IsSuccess());
ASSERT_EQ(client.GetActiveSessionCount(), 1);
}
@@ -322,7 +327,7 @@ TEST(YdbSdkSessions, TestActiveSessionCountAfterTransportError) {
auto session = sessionResponse.GetSession();
// Assume 10us is too small to execute query and get response
- auto res = session.ExecuteDataQuery("SELECT COUNT(*) FROM `/local/t`;",
+ auto res = session.ExecuteDataQuery(std::format("SELECT COUNT(*) FROM `{}`;", GetTablePath()),
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync();
ASSERT_EQ(client.GetActiveSessionCount(), 1);
diff --git a/ydb/public/sdk/cpp/tests/integration/sessions/ya.make b/ydb/public/sdk/cpp/tests/integration/sessions/ya.make
index f263b4eb2b8..cd29861bfc6 100644
--- a/ydb/public/sdk/cpp/tests/integration/sessions/ya.make
+++ b/ydb/public/sdk/cpp/tests/integration/sessions/ya.make
@@ -1,4 +1,6 @@
GTEST()
+
+INCLUDE(${ARCADIA_ROOT}/ydb/public/sdk/cpp/tests/integration/tests_common.inc)
INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
FORK_SUBTESTS()
diff --git a/ydb/public/sdk/cpp/tests/integration/sessions_pool/main.cpp b/ydb/public/sdk/cpp/tests/integration/sessions_pool/main.cpp
index c12937a25ad..35586965cf1 100644
--- a/ydb/public/sdk/cpp/tests/integration/sessions_pool/main.cpp
+++ b/ydb/public/sdk/cpp/tests/integration/sessions_pool/main.cpp
@@ -10,10 +10,10 @@
using namespace NYdb;
using namespace NYdb::NTable;
-class YdbSdkSessionsPool : public ::testing::TestWithParam<ui32> {
+class YdbSdkSessionsPool : public ::testing::TestWithParam<std::uint32_t> {
protected:
void SetUp() override {
- ui32 maxActiveSessions = GetParam();
+ std::uint32_t maxActiveSessions = GetParam();
Driver = std::make_unique<NYdb::TDriver>(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT")));
auto clientSettings = TClientSettings().SessionPoolSettings(
@@ -35,16 +35,16 @@ protected:
class YdbSdkSessionsPool1Session : public YdbSdkSessionsPool {};
-enum class EAction: ui8 {
+enum class EAction: std::uint8_t {
CreateFuture,
ExtractValue,
Return
};
-using TPlan = std::vector<std::pair<EAction, ui32>>;
+using TPlan = std::vector<std::pair<EAction, std::uint32_t>>;
void CheckPlan(TPlan plan) {
- std::unordered_map<ui32, EAction> sessions;
+ std::unordered_map<std::uint32_t, EAction> sessions;
for (const auto& [action, sessionId]: plan) {
if (action == EAction::CreateFuture) {
ASSERT_FALSE(sessions.contains(sessionId));
@@ -69,10 +69,10 @@ void CheckPlan(TPlan plan) {
}
void RunPlan(const TPlan& plan, NYdb::NTable::TTableClient& client) {
- std::unordered_map<ui32, NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
- std::unordered_map<ui32, NYdb::NTable::TCreateSessionResult> sessions;
+ std::unordered_map<std::uint32_t, NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
+ std::unordered_map<std::uint32_t, NYdb::NTable::TCreateSessionResult> sessions;
- ui32 requestedSessions = 0;
+ std::uint32_t requestedSessions = 0;
for (const auto& [action, sessionId]: plan) {
switch (action) {
@@ -100,8 +100,8 @@ void RunPlan(const TPlan& plan, NYdb::NTable::TTableClient& client) {
}
}
ASSERT_LE(client.GetActiveSessionCount(), client.GetActiveSessionsLimit());
- ASSERT_GE(client.GetActiveSessionCount(), static_cast<i64>(sessions.size()));
- ASSERT_LE(client.GetActiveSessionCount(), static_cast<i64>(sessions.size() + sessionFutures.size()));
+ ASSERT_GE(client.GetActiveSessionCount(), static_cast<std::int64_t>(sessions.size()));
+ ASSERT_LE(client.GetActiveSessionCount(), static_cast<std::int64_t>(sessions.size() + sessionFutures.size()));
}
}
@@ -111,14 +111,14 @@ int GetRand(std::mt19937& rng, int min, int max) {
}
-TPlan GenerateRandomPlan(ui32 numSessions) {
+TPlan GenerateRandomPlan(std::uint32_t numSessions) {
TPlan plan;
std::random_device dev;
std::mt19937 rng(dev());
- for (ui32 i = 0; i < numSessions; ++i) {
+ for (std::uint32_t i = 0; i < numSessions; ++i) {
std::uniform_int_distribution<std::mt19937::result_type> dist(0, plan.size());
- ui32 prevPos = 0;
+ std::uint32_t prevPos = 0;
for (EAction action: {EAction::CreateFuture, EAction::ExtractValue, EAction::Return}) {
int pos = GetRand(rng, prevPos, plan.size());
plan.emplace(plan.begin() + pos, std::make_pair(action, i));
@@ -146,18 +146,18 @@ TEST_P(YdbSdkSessionsPool1Session, GetSession) {
ASSERT_EQ(Client->GetCurrentPoolSize(), 1);
}
-void TestWaitQueue(NYdb::NTable::TTableClient& client, ui32 activeSessionsLimit) {
+void TestWaitQueue(NYdb::NTable::TTableClient& client, std::uint32_t activeSessionsLimit) {
std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
std::vector<NYdb::NTable::TCreateSessionResult> sessions;
// exhaust the pool
- for (ui32 i = 0; i < activeSessionsLimit; ++i) {
+ for (std::uint32_t i = 0; i < activeSessionsLimit; ++i) {
sessions.emplace_back(client.GetSession().ExtractValueSync());
}
ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit);
// next should be in the wait queue
- for (ui32 i = 0; i < activeSessionsLimit * 10; ++i) {
+ for (std::uint32_t i = 0; i < activeSessionsLimit * 10; ++i) {
sessionFutures.emplace_back(client.GetSession());
}
ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit);
@@ -212,17 +212,17 @@ TEST_P(YdbSdkSessionsPool1Session, CustomPlan) {
ASSERT_EQ(Client->GetActiveSessionCount(), 0);
}
-ui32 RunStressTestSync(ui32 n, ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) {
+std::uint32_t RunStressTestSync(std::uint32_t n, std::uint32_t activeSessionsLimit, NYdb::NTable::TTableClient& client) {
std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
std::vector<NYdb::NTable::TCreateSessionResult> sessions;
std::mt19937 rng(0);
- ui32 successCount = 0;
+ std::uint32_t successCount = 0;
- for (ui32 i = 0; i < activeSessionsLimit * 12; ++i) {
+ for (std::uint32_t i = 0; i < activeSessionsLimit * 12; ++i) {
sessionFutures.emplace_back(client.GetSession());
}
- for (ui32 i = 0; i < n; ++i) {
+ for (std::uint32_t i = 0; i < n; ++i) {
switch (static_cast<EAction>(GetRand(rng, 0, 2))) {
case EAction::CreateFuture: {
sessionFutures.emplace_back(client.GetSession());
@@ -259,7 +259,7 @@ ui32 RunStressTestSync(ui32 n, ui32 activeSessionsLimit, NYdb::NTable::TTableCli
}
TEST_P(YdbSdkSessionsPool, StressTestSync) {
- ui32 activeSessionsLimit = GetParam();
+ std::uint32_t activeSessionsLimit = GetParam();
RunStressTestSync(1000, activeSessionsLimit, *Client);
@@ -269,13 +269,13 @@ TEST_P(YdbSdkSessionsPool, StressTestSync) {
ASSERT_EQ(Client->GetCurrentPoolSize(), activeSessionsLimit);
}
-ui32 RunStressTestAsync(ui32 n, ui32 nThreads, NYdb::NTable::TTableClient& client) {
- std::atomic<ui32> successCount(0);
- std::atomic<ui32> jobIndex(0);
+std::uint32_t RunStressTestAsync(std::uint32_t n, std::uint32_t nThreads, NYdb::NTable::TTableClient& client) {
+ std::atomic<std::uint32_t> successCount(0);
+ std::atomic<std::uint32_t> jobIndex(0);
auto job = [&client, &successCount, &jobIndex, n]() mutable {
std::mt19937 rng(++jobIndex);
- for (ui32 i = 0; i < n; ++i) {
+ for (std::uint32_t i = 0; i < n; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100)));
auto sessionFuture = client.GetSession();
std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100)));
@@ -286,7 +286,7 @@ ui32 RunStressTestAsync(ui32 n, ui32 nThreads, NYdb::NTable::TTableClient& clien
};
std::vector<std::thread> threads;
- for (ui32 i = 0; i < nThreads; i++) {
+ for (std::uint32_t i = 0; i < nThreads; i++) {
threads.emplace_back(job);
}
for (auto& thread: threads) {
@@ -297,8 +297,8 @@ ui32 RunStressTestAsync(ui32 n, ui32 nThreads, NYdb::NTable::TTableClient& clien
}
TEST_P(YdbSdkSessionsPool, StressTestAsync) {
- ui32 activeSessionsLimit = GetParam();
- ui32 iterations = (activeSessionsLimit == 1) ? 100 : 1000;
+ std::uint32_t activeSessionsLimit = GetParam();
+ std::uint32_t iterations = (activeSessionsLimit == 1) ? 100 : 1000;
RunStressTestAsync(iterations, 10, *Client);
@@ -308,16 +308,16 @@ TEST_P(YdbSdkSessionsPool, StressTestAsync) {
ASSERT_EQ(Client->GetCurrentPoolSize(), activeSessionsLimit);
}
-void TestPeriodicTask(ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) {
+void TestPeriodicTask(std::uint32_t activeSessionsLimit, NYdb::NTable::TTableClient& client) {
std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
std::vector<NYdb::NTable::TCreateSessionResult> sessions;
- for (ui32 i = 0; i < activeSessionsLimit; ++i) {
+ for (std::uint32_t i = 0; i < activeSessionsLimit; ++i) {
sessions.emplace_back(client.GetSession().ExtractValueSync());
ASSERT_TRUE(sessions.back().IsSuccess());
}
- for (ui32 i = 0; i < activeSessionsLimit; ++i) {
+ for (std::uint32_t i = 0; i < activeSessionsLimit; ++i) {
sessionFutures.emplace_back(client.GetSession());
}
diff --git a/ydb/public/sdk/cpp/tests/integration/sessions_pool/ya.make b/ydb/public/sdk/cpp/tests/integration/sessions_pool/ya.make
index 998a6caec44..4e5a74604b3 100644
--- a/ydb/public/sdk/cpp/tests/integration/sessions_pool/ya.make
+++ b/ydb/public/sdk/cpp/tests/integration/sessions_pool/ya.make
@@ -1,4 +1,6 @@
GTEST()
+
+INCLUDE(${ARCADIA_ROOT}/ydb/public/sdk/cpp/tests/integration/tests_common.inc)
INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
FORK_SUBTESTS()
diff --git a/ydb/public/sdk/cpp/tests/integration/tests_common.inc b/ydb/public/sdk/cpp/tests/integration/tests_common.inc
new file mode 100644
index 00000000000..5111ca40a75
--- /dev/null
+++ b/ydb/public/sdk/cpp/tests/integration/tests_common.inc
@@ -0,0 +1 @@
+ENV(YDB_TEST_ROOT="sdk_tests")
diff --git a/ydb/public/sdk/cpp/tests/integration/topic/basic_usage.cpp b/ydb/public/sdk/cpp/tests/integration/topic/basic_usage.cpp
index d6360431362..1a8602e8123 100644
--- a/ydb/public/sdk/cpp/tests/integration/topic/basic_usage.cpp
+++ b/ydb/public/sdk/cpp/tests/integration/topic/basic_usage.cpp
@@ -1,3 +1,5 @@
+#include "setup/fixture.h"
+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
#include <ydb/public/sdk/cpp/src/client/persqueue_public/persqueue.h>
@@ -8,15 +10,13 @@
#include <util/generic/overloaded.h>
#include <util/stream/zlib.h>
-#include <gtest/gtest.h>
-
#include <future>
static const bool EnableDirectRead = !std::string{std::getenv("PQ_EXPERIMENTAL_DIRECT_READ") ? std::getenv("PQ_EXPERIMENTAL_DIRECT_READ") : ""}.empty();
-namespace NYdb::NPersQueue::NTests {
+namespace NYdb::inline Dev::NPersQueue::NTests {
class TSimpleWriteSessionTestAdapter {
public:
@@ -40,7 +40,7 @@ std::uint64_t TSimpleWriteSessionTestAdapter::GetAcquiredMessagesCount() const {
}
-namespace NYdb::NTopic::NTests {
+namespace NYdb::inline Dev::NTopic::NTests {
class TManagedExecutor : public IExecutor {
public:
@@ -172,65 +172,6 @@ TIntrusivePtr<TManagedExecutor> CreateSyncManagedExecutor()
return MakeIntrusive<TManagedExecutor>(NYdb::NTopic::CreateSyncExecutor());
}
-class TTopicTestFixture : public ::testing::Test {
-protected:
- void SetUp() override {
- TTopicClient client(MakeDriver());
- client.DropTopic(GetTopicPath()).GetValueSync();
-
- CreateTopic(GetTopicPath());
- }
-
- void TearDown() override {
- DropTopic(GetTopicPath());
- }
-
- void CreateTopic(const std::string& path, const std::string& consumer = "test-consumer", size_t partitionCount = 1,
- std::optional<size_t> maxPartitionCount = std::nullopt) {
- TTopicClient client(MakeDriver());
-
- TCreateTopicSettings topics;
- topics
- .BeginConfigurePartitioningSettings()
- .MinActivePartitions(partitionCount)
- .MaxActivePartitions(maxPartitionCount.value_or(partitionCount));
-
- if (maxPartitionCount.has_value() && maxPartitionCount.value() > partitionCount) {
- topics
- .BeginConfigurePartitioningSettings()
- .BeginConfigureAutoPartitioningSettings()
- .Strategy(EAutoPartitioningStrategy::ScaleUp);
- }
-
- TConsumerSettings<TCreateTopicSettings> consumers(topics, consumer);
- topics.AppendConsumers(consumers);
-
- auto status = client.CreateTopic(path, topics).GetValueSync();
- ASSERT_TRUE(status.IsSuccess());
- }
-
- std::string GetTopicPath() {
- const testing::TestInfo* const testInfo = testing::UnitTest::GetInstance()->current_test_info();
-
- return std::string(testInfo->test_suite_name()) + "/" + std::string(testInfo->name()) + "/test-topic";
- }
-
- void DropTopic(const std::string& path) {
- TTopicClient client(MakeDriver());
- auto status = client.DropTopic(path).GetValueSync();
- ASSERT_TRUE(status.IsSuccess());
- }
-
- TDriver MakeDriver() {
- auto cfg = NYdb::TDriverConfig()
- .SetEndpoint(std::getenv("YDB_ENDPOINT"))
- .SetDatabase(std::getenv("YDB_DATABASE"))
- .SetLog(std::unique_ptr<TLogBackend>(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release()));
-
- return NYdb::TDriver(cfg);
- }
-};
-
class BasicUsage : public TTopicTestFixture {};
TEST_F(BasicUsage, ConnectToYDB) {
diff --git a/ydb/public/sdk/cpp/tests/integration/topic/describe_topic.cpp b/ydb/public/sdk/cpp/tests/integration/topic/describe_topic.cpp
new file mode 100644
index 00000000000..6f587b46a19
--- /dev/null
+++ b/ydb/public/sdk/cpp/tests/integration/topic/describe_topic.cpp
@@ -0,0 +1,250 @@
+#include "setup/fixture.h"
+
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
+
+#include <thread>
+
+namespace NYdb::inline Dev::NTopic::NTests {
+
+class Describe : public TTopicTestFixture {
+protected:
+ void DescribeTopic(TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation) {
+ TDescribeTopicSettings settings;
+ settings.IncludeStats(requireStats);
+ settings.IncludeLocation(requireLocation);
+
+ {
+ auto result = client.DescribeTopic(GetTopicPath(), settings).GetValueSync();
+ EXPECT_TRUE(result.IsSuccess());
+
+ const auto& description = result.GetTopicDescription();
+
+ const auto& partitions = description.GetPartitions();
+ EXPECT_EQ(partitions.size(), 1u);
+
+ const auto& partition = partitions[0];
+ EXPECT_TRUE(partition.GetActive());
+ EXPECT_EQ(partition.GetPartitionId(), 0u);
+
+ if (requireStats) {
+ const auto& stats = description.GetTopicStats();
+
+ if (requireNonEmptyStats) {
+ EXPECT_GT(stats.GetStoreSizeBytes(), 0u);
+ EXPECT_GT(stats.GetBytesWrittenPerMinute(), 0u);
+ EXPECT_GT(stats.GetBytesWrittenPerHour(), 0u);
+ EXPECT_GT(stats.GetBytesWrittenPerDay(), 0u);
+ EXPECT_GT(stats.GetMaxWriteTimeLag(), TDuration::Zero());
+ EXPECT_GT(stats.GetMinLastWriteTime(), TInstant::Zero());
+ } else {
+ EXPECT_EQ(stats.GetStoreSizeBytes(), 0u);
+ }
+ }
+
+ if (requireLocation) {
+ EXPECT_TRUE(partition.GetPartitionLocation());
+ const auto& partitionLocation = *partition.GetPartitionLocation();
+ EXPECT_GT(partitionLocation.GetNodeId(), 0);
+ EXPECT_GE(partitionLocation.GetGeneration(), 0); // greater-or-equal 0
+ }
+ }
+ }
+
+ void DescribeConsumer(TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation){
+ TDescribeConsumerSettings settings;
+ settings.IncludeStats(requireStats);
+ settings.IncludeLocation(requireLocation);
+
+ {
+ auto result = client.DescribeConsumer(GetTopicPath(), "test-consumer", settings).GetValueSync();
+ EXPECT_TRUE(result.IsSuccess()) << result.GetIssues().ToString();
+
+ const auto& description = result.GetConsumerDescription();
+
+ const auto& partitions = description.GetPartitions();
+ EXPECT_EQ(partitions.size(), 1u);
+
+ const auto& partition = partitions[0];
+ EXPECT_TRUE(partition.GetActive());
+ EXPECT_EQ(partition.GetPartitionId(), 0u);
+
+ if (requireStats) {
+ const auto& stats = partition.GetPartitionStats();
+ const auto& consumerStats = partition.GetPartitionConsumerStats();
+ EXPECT_TRUE(stats);
+ EXPECT_TRUE(consumerStats);
+
+ if (requireNonEmptyStats) {
+ EXPECT_GE(stats->GetStartOffset(), 0u);
+ EXPECT_GE(stats->GetEndOffset(), 0u);
+ EXPECT_GT(stats->GetStoreSizeBytes(), 0u);
+ EXPECT_GT(stats->GetLastWriteTime(), TInstant::Zero());
+ EXPECT_GT(stats->GetMaxWriteTimeLag(), TDuration::Zero());
+ EXPECT_GT(stats->GetBytesWrittenPerMinute(), 0u);
+ EXPECT_GT(stats->GetBytesWrittenPerHour(), 0u);
+ EXPECT_GT(stats->GetBytesWrittenPerDay(), 0u);
+
+ EXPECT_GT(consumerStats->GetLastReadOffset(), 0u);
+ EXPECT_GT(consumerStats->GetCommittedOffset(), 0u);
+ EXPECT_GE(consumerStats->GetReadSessionId().size(), 0u);
+ EXPECT_EQ(consumerStats->GetReaderName(), "");
+ EXPECT_GE(consumerStats->GetMaxWriteTimeLag(), TDuration::Seconds(100));
+ } else {
+ EXPECT_EQ(stats->GetStartOffset(), 0u);
+ EXPECT_EQ(consumerStats->GetLastReadOffset(), 0u);
+ }
+ }
+
+ if (requireLocation) {
+ EXPECT_TRUE(partition.GetPartitionLocation());
+ const auto& partitionLocation = *partition.GetPartitionLocation();
+ EXPECT_GT(partitionLocation.GetNodeId(), 0);
+ EXPECT_GE(partitionLocation.GetGeneration(), 0); // greater-or-equal 0
+ }
+ }
+ }
+
+ void DescribePartition(TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation) {
+ TDescribePartitionSettings settings;
+ settings.IncludeStats(requireStats);
+ settings.IncludeLocation(requireLocation);
+
+ std::uint64_t testPartitionId = 0;
+
+ {
+ auto result = client.DescribePartition(GetTopicPath(), testPartitionId, settings).GetValueSync();
+ EXPECT_TRUE(result.IsSuccess()) << result.GetIssues().ToString();
+
+ const auto& description = result.GetPartitionDescription();
+
+ const auto& partition = description.GetPartition();
+ EXPECT_TRUE(partition.GetActive());
+ EXPECT_EQ(partition.GetPartitionId(), testPartitionId);
+
+ if (requireStats) {
+ const auto& stats = partition.GetPartitionStats();
+ EXPECT_TRUE(stats);
+
+ if (requireNonEmptyStats) {
+ EXPECT_GE(stats->GetStartOffset(), 0u);
+ EXPECT_GE(stats->GetEndOffset(), 0u);
+ EXPECT_GT(stats->GetStoreSizeBytes(), 0u);
+ EXPECT_GT(stats->GetLastWriteTime(), TInstant::Zero());
+ EXPECT_GT(stats->GetMaxWriteTimeLag(), TDuration::Zero());
+ EXPECT_GT(stats->GetBytesWrittenPerMinute(), 0u);
+ EXPECT_GT(stats->GetBytesWrittenPerHour(), 0u);
+ EXPECT_GT(stats->GetBytesWrittenPerDay(), 0u);
+ } else {
+ EXPECT_EQ(stats->GetStoreSizeBytes(), 0u);
+ }
+ }
+
+ if (requireLocation) {
+ EXPECT_TRUE(partition.GetPartitionLocation());
+ const auto& partitionLocation = *partition.GetPartitionLocation();
+ EXPECT_GT(partitionLocation.GetNodeId(), 0);
+ EXPECT_GE(partitionLocation.GetGeneration(), 0); // greater-or-equal 0
+ }
+ }
+ }
+};
+
+TEST_F(Describe, Basic) {
+ TTopicClient client(MakeDriver());
+
+ DescribeTopic(client, false, false, false);
+ DescribeConsumer(client, false, false, false);
+ DescribePartition(client, false, false, false);
+}
+
+TEST_F(Describe, Statistics) {
+ // TODO(abcdef): temporarily deleted
+ GTEST_SKIP() << "temporarily deleted";
+
+ TTopicClient client(MakeDriver());
+
+ // Get empty description
+ DescribeTopic(client, true, false, false);
+ DescribeConsumer(client, true, false, false);
+ DescribePartition(client, true, false, false);
+
+ const size_t messagesCount = 1;
+
+ // Write a message
+ {
+ auto writeSettings = TWriteSessionSettings().Path(GetTopicPath()).MessageGroupId("test-message_group_id").Codec(ECodec::RAW);
+ auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings);
+ std::string message(32_MB, 'x');
+
+ for (size_t i = 0; i < messagesCount; ++i) {
+ EXPECT_TRUE(writeSession->Write(message, {}, TInstant::Now() - TDuration::Seconds(100)));
+ }
+ writeSession->Close();
+ }
+
+ // Read a message
+ {
+ auto readSettings = TReadSessionSettings().ConsumerName("test-consumer").AppendTopics(GetTopicPath());
+ auto readSession = client.CreateReadSession(readSettings);
+
+ // Event 1: start partition session
+ {
+ std::optional<TReadSessionEvent::TEvent> event = readSession->GetEvent(true);
+ EXPECT_TRUE(event);
+ auto startPartitionSession = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&event.value());
+ EXPECT_TRUE(startPartitionSession) << DebugString(*event);
+
+ startPartitionSession->Confirm();
+ }
+
+ // Event 2: data received
+ {
+ std::optional<TReadSessionEvent::TEvent> event = readSession->GetEvent(true);
+ EXPECT_TRUE(event);
+ auto dataReceived = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event.value());
+ EXPECT_TRUE(dataReceived) << DebugString(*event);
+
+ dataReceived->Commit();
+ }
+
+ // Event 3: commit acknowledgement
+ {
+ std::optional<TReadSessionEvent::TEvent> event = readSession->GetEvent(true);
+ EXPECT_TRUE(event);
+ auto commitOffsetAck = std::get_if<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event.value());
+
+ EXPECT_TRUE(commitOffsetAck) << DebugString(*event);
+
+ EXPECT_EQ(commitOffsetAck->GetCommittedOffset(), messagesCount);
+ }
+ }
+
+ // Additional write
+ {
+ auto writeSettings = TWriteSessionSettings().Path(GetTopicPath()).MessageGroupId("test-message_group_id").Codec(ECodec::RAW);
+ auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings);
+ std::string message(32, 'x');
+
+ for(size_t i = 0; i < messagesCount; ++i) {
+ EXPECT_TRUE(writeSession->Write(message));
+ }
+ writeSession->Close();
+ }
+ std::this_thread::sleep_for(std::chrono::seconds(3));
+
+ // Get non-empty description
+
+ DescribeTopic(client, true, true, false);
+ DescribeConsumer(client, true, true, false);
+ DescribePartition(client, true, true, false);
+}
+
+TEST_F(Describe, Location) {
+ TTopicClient client(MakeDriver());
+
+ DescribeTopic(client, false, false, true);
+ DescribeConsumer(client, false, false, true);
+ DescribePartition(client, false, false, true);
+}
+
+}
diff --git a/ydb/public/sdk/cpp/tests/integration/topic/direct_read.cpp b/ydb/public/sdk/cpp/tests/integration/topic/direct_read.cpp
new file mode 100644
index 00000000000..4ce19aa887a
--- /dev/null
+++ b/ydb/public/sdk/cpp/tests/integration/topic/direct_read.cpp
@@ -0,0 +1,2062 @@
+#include "setup/fixture.h"
+
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
+#include <library/cpp/retry/retry_policy.h>
+
+#include <ydb/public/sdk/cpp/src/client/topic/impl/common.h>
+#include <ydb/public/sdk/cpp/src/client/topic/common/executor_impl.h>
+#include <ydb/public/sdk/cpp/src/client/persqueue_public/impl/write_session.h>
+#include <ydb/public/sdk/cpp/src/client/topic/impl/write_session.h>
+
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
+
+#include <algorithm>
+#include <future>
+
+using namespace ::testing; // Google mock.
+
+
+#define ASSERT_EVENT_TYPE(event, type) \
+ ASSERT_TRUE(std::holds_alternative<type>(event)) \
+ << "Real event got: " << DebugString(event)
+
+#define ASSERT_NOT_EVENT_TYPE(event, type) \
+ ASSERT_TRUE(!std::holds_alternative<type>(event)) \
+ << "Real event got: " << DebugString(event)
+
+
+
+namespace NYdb::inline Dev::NTopic::NTests {
+
+namespace {
+ const char* SERVER_SESSION_ID = "server-session-id-1";
+}
+
+
+template <class TRequest, class TResponse>
+struct TMockProcessorFactory : public ISessionConnectionProcessorFactory<TRequest, TResponse> {
+ using IFactory = ISessionConnectionProcessorFactory<TRequest, TResponse>;
+
+ virtual ~TMockProcessorFactory() {
+ Wait();
+ }
+
+ void CreateProcessor( // ISessionConnectionProcessorFactory method.
+ typename IFactory::TConnectedCallback callback,
+ const TRpcRequestSettings& requestSettings,
+ NYdbGrpc::IQueueClientContextPtr connectContext,
+ TDuration connectTimeout,
+ NYdbGrpc::IQueueClientContextPtr connectTimeoutContext,
+ typename IFactory::TConnectTimeoutCallback connectTimeoutCallback,
+ TDuration connectDelay,
+ NYdbGrpc::IQueueClientContextPtr connectDelayOperationContext) override
+ {
+ ASSERT_FALSE(ConnectedCallback) << "Only one connect at a time is expected";
+ ASSERT_FALSE(ConnectTimeoutCallback) << "Only one connect at a time is expected";
+ ConnectedCallback = callback;
+ ConnectTimeoutCallback = connectTimeoutCallback;
+
+ Y_UNUSED(requestSettings);
+ // TODO Check requestSettings.PreferredEndpoint.GetNodeId()?
+ EXPECT_TRUE(connectContext);
+ EXPECT_TRUE(connectTimeout);
+ EXPECT_TRUE(connectTimeoutContext);
+ EXPECT_TRUE(connectTimeoutCallback);
+ EXPECT_TRUE(!connectDelay || connectDelayOperationContext);
+
+ OnCreateProcessor(++CreateCallsCount);
+ }
+
+ // Handler is called in CreateProcessor() method after parameter validation.
+ MOCK_METHOD(void, OnCreateProcessor, (size_t callNumber)); // 1-based
+
+ // Actions to use in OnCreateProcessor handler:
+ void CreateProcessor(typename IFactory::IProcessor::TPtr processor) { // Success.
+ EXPECT_TRUE(ConnectedCallback);
+ auto cb = std::move(ConnectedCallback);
+ ConnectedCallback = nullptr;
+ ConnectTimeoutCallback = nullptr;
+ {
+ std::lock_guard lock(Lock);
+ CallbackFutures.push(std::async(std::launch::async, std::move(cb), TPlainStatus(), processor));
+ }
+ }
+
+ void FailCreation(EStatus status = EStatus::INTERNAL_ERROR, const std::string& message = {}) { // Fail.
+ EXPECT_TRUE(ConnectedCallback);
+ auto cb = std::move(ConnectedCallback);
+ ConnectedCallback = nullptr;
+ ConnectTimeoutCallback = nullptr;
+ {
+ std::lock_guard lock(Lock);
+ CallbackFutures.push(std::async(std::launch::async, std::move(cb), TPlainStatus(status, message), nullptr));
+ }
+ }
+
+ void Timeout() { // Timeout.
+ EXPECT_TRUE(ConnectTimeoutCallback);
+ auto cb = std::move(ConnectTimeoutCallback);
+ ConnectedCallback = nullptr;
+ ConnectTimeoutCallback = nullptr;
+ {
+ std::lock_guard lock(Lock);
+ CallbackFutures.push(std::async(std::launch::async, std::move(cb), true));
+ }
+ }
+
+ void CreateAndThenTimeout(typename IFactory::IProcessor::TPtr processor) {
+ EXPECT_TRUE(ConnectedCallback);
+ EXPECT_TRUE(ConnectTimeoutCallback);
+ auto cb2 = [cbt = std::move(ConnectTimeoutCallback), cb = std::move(ConnectedCallback), processor]() mutable {
+ cb(TPlainStatus(), std::move(processor));
+ cbt(true);
+ };
+ ConnectedCallback = nullptr;
+ ConnectTimeoutCallback = nullptr;
+ {
+ std::lock_guard lock(Lock);
+ CallbackFutures.push(std::async(std::launch::async, std::move(cb2)));
+ }
+ }
+
+ void FailAndThenTimeout(EStatus status = EStatus::INTERNAL_ERROR, const std::string& message = {}) {
+ EXPECT_TRUE(ConnectedCallback);
+ EXPECT_TRUE(ConnectTimeoutCallback);
+ auto cb2 = [cbt = std::move(ConnectTimeoutCallback), cb = std::move(ConnectedCallback), status, message]() mutable {
+ cb(TPlainStatus(status, message), nullptr);
+ cbt(true);
+ };
+ ConnectedCallback = nullptr;
+ ConnectTimeoutCallback = nullptr;
+ {
+ std::lock_guard lock(Lock);
+ CallbackFutures.push(std::async(std::launch::async, std::move(cb2)));
+ }
+ }
+
+ void TimeoutAndThenCreate(typename IFactory::IProcessor::TPtr processor) {
+ EXPECT_TRUE(ConnectedCallback);
+ EXPECT_TRUE(ConnectTimeoutCallback);
+ auto cb2 = [cbt = std::move(ConnectTimeoutCallback), cb = std::move(ConnectedCallback), processor]() mutable {
+ cbt(true);
+ cb(TPlainStatus(), std::move(processor));
+ };
+ ConnectedCallback = nullptr;
+ ConnectTimeoutCallback = nullptr;
+ {
+ std::lock_guard lock(Lock);
+ CallbackFutures.push(std::async(std::launch::async, std::move(cb2)));
+ }
+ }
+
+ void Wait() {
+ std::queue<std::future<void>> futuresQueue;
+ {
+ std::lock_guard lock(Lock);
+ CallbackFutures.swap(futuresQueue);
+ }
+ while (!futuresQueue.empty()) {
+ futuresQueue.front().wait();
+ futuresQueue.pop();
+ }
+ }
+
+ void Validate() {
+ EXPECT_TRUE(CallbackFutures.empty());
+ ConnectedCallback = nullptr;
+ ConnectTimeoutCallback = nullptr;
+ }
+
+ std::atomic<std::size_t> CreateCallsCount = 0;
+
+private:
+ std::mutex Lock;
+ typename IFactory::TConnectedCallback ConnectedCallback;
+ typename IFactory::TConnectTimeoutCallback ConnectTimeoutCallback;
+ std::queue<std::future<void>> CallbackFutures;
+};
+
+
+struct TStartPartitionSessionRequest {
+ TPartitionId PartitionId;
+ TPartitionSessionId PartitionSessionId;
+ TNodeId NodeId;
+ TGeneration Generation;
+};
+
+struct TStopPartitionSessionRequest {
+ TPartitionSessionId PartitionSessionId;
+ bool Graceful;
+ std::int64_t CommittedOffset;
+ TDirectReadId LastDirectReadId;
+};
+
+
+struct TMockReadSessionProcessor : public TMockProcessorFactory<Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer>::IProcessor {
+ // Request to read.
+ struct TClientReadInfo {
+ TReadCallback Callback;
+ Ydb::Topic::StreamReadMessage::FromServer* Dst;
+
+ operator bool() const {
+ return Dst != nullptr;
+ }
+ };
+
+ // Response from server.
+ struct TServerReadInfo {
+ NYdbGrpc::TGrpcStatus Status;
+ Ydb::Topic::StreamReadMessage::FromServer Response;
+
+ TServerReadInfo& Failure(grpc::StatusCode status = grpc::StatusCode::UNAVAILABLE, const std::string& message = {}, bool internal = false) {
+ Status.GRpcStatusCode = status;
+ Status.InternalError = internal;
+ Status.Msg = message;
+ return *this;
+ }
+
+ TServerReadInfo& InitResponse(const std::string& sessionId) {
+ Response.mutable_init_response()->set_session_id(sessionId);
+ return *this;
+ }
+
+ TServerReadInfo& StartPartitionSessionRequest(TStartPartitionSessionRequest request) {
+ auto* req = Response.mutable_start_partition_session_request();
+
+ auto* session = req->mutable_partition_session();
+ session->set_partition_session_id(request.PartitionSessionId);
+ session->set_partition_id(request.PartitionId);
+
+ auto* location = req->mutable_partition_location();
+ location->set_node_id(request.NodeId);
+ location->set_generation(request.Generation);
+
+ return *this;
+ }
+
+ TServerReadInfo& StopPartitionSession(TStopPartitionSessionRequest request) {
+ auto* req = Response.mutable_stop_partition_session_request();
+ req->set_partition_session_id(request.PartitionSessionId);
+ req->set_graceful(request.Graceful);
+ req->set_committed_offset(request.CommittedOffset);
+ req->set_last_direct_read_id(request.LastDirectReadId);
+ return *this;
+ }
+
+ };
+
+ ~TMockReadSessionProcessor() {
+ Wait();
+ }
+
+ void Cancel() override {
+ }
+
+ void ReadInitialMetadata(std::unordered_multimap<std::string, std::string>* metadata, TReadCallback callback) override {
+ Y_UNUSED(metadata);
+ Y_UNUSED(callback);
+ EXPECT_TRUE(false) << "This method is not expected to be called";
+ }
+
+ void Finish(TReadCallback callback) override {
+ Y_UNUSED(callback);
+ EXPECT_TRUE(false) << "This method is not expected to be called";
+ }
+
+ void AddFinishedCallback(TReadCallback callback) override {
+ Y_UNUSED(callback);
+ EXPECT_TRUE(false) << "This method is not expected to be called";
+ }
+
+ void Read(Ydb::Topic::StreamReadMessage::FromServer* response, TReadCallback callback) override {
+ {
+ std::lock_guard lock(Lock);
+ EXPECT_FALSE(ActiveRead);
+ ActiveRead.Callback = std::move(callback);
+ ActiveRead.Dst = response;
+ if (!ReadResponses.empty()) {
+ StartProcessReadImpl();
+ }
+ }
+ }
+
+ void StartProcessReadImpl() {
+ CallbackFutures.push(std::async(std::launch::async, &TMockReadSessionProcessor::ProcessRead, this));
+ }
+
+ void Write(Ydb::Topic::StreamReadMessage::FromClient&& request, TWriteCallback callback) override {
+ EXPECT_FALSE(callback); // Read session doesn't set callbacks.
+ using FromClient = Ydb::Topic::StreamReadMessage_FromClient;
+
+ switch (request.client_message_case()) {
+ case FromClient::kInitRequest:
+ OnInitRequest(request.init_request());
+ break;
+ case FromClient::kReadRequest:
+ OnReadRequest(request.read_request());
+ break;
+ case FromClient::kCommitOffsetRequest:
+ OnCommitOffsetRequest(request.commit_offset_request());
+ break;
+ case FromClient::kDirectReadAck:
+ OnDirectReadAck(request.direct_read_ack());
+ break;
+ case FromClient::kStartPartitionSessionResponse:
+ OnStartPartitionSessionResponse(request.start_partition_session_response());
+ break;
+ case FromClient::kStopPartitionSessionResponse:
+ OnStopPartitionSessionResponse(request.stop_partition_session_response());
+ break;
+ case FromClient::CLIENT_MESSAGE_NOT_SET:
+ EXPECT_TRUE(false) << "Invalid request";
+ break;
+ default:
+ Y_UNREACHABLE();
+ }
+ }
+ MOCK_METHOD(void, OnInitRequest, (const Ydb::Topic::StreamReadMessage::InitRequest&), ());
+ MOCK_METHOD(void, OnReadRequest, (const Ydb::Topic::StreamReadMessage::ReadRequest&), ());
+ MOCK_METHOD(void, OnDirectReadAck, (const Ydb::Topic::StreamReadMessage::DirectReadAck&), ());
+ MOCK_METHOD(void, OnCommitOffsetRequest, (const Ydb::Topic::StreamReadMessage::CommitOffsetRequest&), ());
+ MOCK_METHOD(void, OnStartPartitionSessionResponse, (const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse&), ());
+ MOCK_METHOD(void, OnStopPartitionSessionResponse, (const Ydb::Topic::StreamReadMessage::StopPartitionSessionResponse&), ());
+
+ void Wait() {
+ std::queue<std::future<void>> callbackFutures;
+ {
+ std::lock_guard lock(Lock);
+ CallbackFutures.swap(callbackFutures);
+ }
+
+ while (!callbackFutures.empty()) {
+ callbackFutures.front().wait();
+ callbackFutures.pop();
+ }
+ }
+
+ void Validate() {
+ {
+ std::lock_guard lock(Lock);
+ EXPECT_TRUE(ReadResponses.empty());
+ EXPECT_TRUE(CallbackFutures.empty());
+
+ ActiveRead = TClientReadInfo{};
+ }
+ }
+
+ void ProcessRead() {
+ NYdbGrpc::TGrpcStatus status;
+ TReadCallback callback;
+ {
+ std::lock_guard lock(Lock);
+ if (ActiveRead) {
+ *ActiveRead.Dst = ReadResponses.front().Response;
+ ActiveRead.Dst = nullptr;
+ status = std::move(ReadResponses.front().Status);
+ ReadResponses.pop();
+ callback = std::move(ActiveRead.Callback);
+ }
+ }
+ if (callback) {
+ callback(std::move(status));
+ }
+ }
+
+ void AddServerResponse(TServerReadInfo result) {
+ NYdbGrpc::TGrpcStatus status;
+ TReadCallback callback;
+ {
+ std::lock_guard lock(Lock);
+ ReadResponses.emplace(std::move(result));
+ if (ActiveRead) {
+ *ActiveRead.Dst = ReadResponses.front().Response;
+ ActiveRead.Dst = nullptr;
+ status = std::move(ReadResponses.front().Status);
+ ReadResponses.pop();
+ callback = std::move(ActiveRead.Callback);
+ }
+ }
+ if (callback) {
+ callback(std::move(status));
+ }
+ }
+
+ std::mutex Lock;
+ TClientReadInfo ActiveRead;
+ std::queue<TServerReadInfo> ReadResponses;
+ std::queue<std::future<void>> CallbackFutures;
+};
+
+struct TMockDirectReadSessionProcessor : public TMockProcessorFactory<TDirectReadClientMessage, TDirectReadServerMessage>::IProcessor {
+ // Request to read.
+ struct TClientReadInfo {
+ TReadCallback Callback;
+ TDirectReadServerMessage* Dst;
+
+ operator bool() const {
+ return Dst != nullptr;
+ }
+ };
+
+ // Response from server.
+ struct TServerReadInfo {
+ NYdbGrpc::TGrpcStatus Status;
+ TDirectReadServerMessage Response;
+
+ TServerReadInfo& Failure(grpc::StatusCode status = grpc::StatusCode::UNAVAILABLE, const std::string& message = {}, bool internal = false) {
+ Status.GRpcStatusCode = status;
+ Status.InternalError = internal;
+ Status.Msg = message;
+ return *this;
+ }
+
+ TServerReadInfo& InitResponse() {
+ Response.mutable_init_response();
+ return *this;
+ }
+
+ TServerReadInfo& StartDirectReadPartitionSessionResponse(TPartitionSessionId partitionSessionId) {
+ auto* resp = Response.mutable_start_direct_read_partition_session_response();
+ resp->set_partition_session_id(partitionSessionId);
+ return *this;
+ }
+
+ TServerReadInfo& StopDirectReadPartitionSession(Ydb::StatusIds::StatusCode status, TPartitionSessionId partitionSessionId) {
+ auto* req = Response.mutable_stop_direct_read_partition_session();
+ req->set_status(status);
+ req->set_partition_session_id(partitionSessionId);
+ return *this;
+ }
+
+ // Data helpers.
+ TServerReadInfo& PartitionData(TPartitionSessionId partitionSessionId, TDirectReadId directReadId, std::uint64_t bytesSize = 0) {
+ auto* response = Response.mutable_direct_read_response();
+ response->set_partition_session_id(partitionSessionId);
+ response->set_direct_read_id(directReadId);
+ response->set_bytes_size(bytesSize);
+ response->mutable_partition_data()->set_partition_session_id(partitionSessionId);
+ return *this;
+ }
+
+ TServerReadInfo& Batch(
+ const std::string& producerId,
+ Ydb::Topic::Codec codec,
+ TInstant writeTimestamp = TInstant::MilliSeconds(42),
+ const std::vector<std::pair<std::string, std::string>>& writeSessionMeta = {}
+ ) {
+ auto* batch = Response.mutable_direct_read_response()->mutable_partition_data()->add_batches();
+ batch->set_producer_id(producerId);
+ batch->set_codec(codec);
+ *batch->mutable_written_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(writeTimestamp.MilliSeconds());
+ auto* meta = batch->mutable_write_session_meta();
+ for (auto&& [k, v] : writeSessionMeta) {
+ (*meta)[k] = v;
+ }
+ return *this;
+ }
+
+ TServerReadInfo& Message(
+ std::uint64_t offset,
+ const std::string& data,
+ std::uint64_t seqNo = 1,
+ TInstant createdAt = TInstant::MilliSeconds(42),
+ std::int64_t uncompressedSize = 135,
+ const std::string& messageGroupId = "",
+ const std::vector<std::pair<std::string, std::string>>& meta = {}
+ ) {
+ const int lastBatch = Response.direct_read_response().partition_data().batches_size();
+ EXPECT_GT(lastBatch, 0);
+ auto* batch = Response.mutable_direct_read_response()->mutable_partition_data()->mutable_batches(lastBatch - 1);
+ auto* req = batch->add_message_data();
+ req->set_offset(offset);
+ req->set_seq_no(seqNo);
+ *req->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(createdAt.MilliSeconds());
+ req->set_data(data);
+ req->set_message_group_id(messageGroupId);
+ req->set_uncompressed_size(uncompressedSize);
+ for (auto&& [k, v] : meta) {
+ auto* pair = req->add_metadata_items();
+ pair->set_key(k);
+ pair->set_value(v);
+ }
+ return *this;
+ }
+ };
+
+ virtual ~TMockDirectReadSessionProcessor() {
+ Wait();
+ }
+
+ void Cancel() override {
+ }
+
+ void ReadInitialMetadata(std::unordered_multimap<std::string, std::string>* metadata, TReadCallback callback) override {
+ Y_UNUSED(metadata);
+ Y_UNUSED(callback);
+ EXPECT_TRUE(false) << "This method is not expected to be called";
+ }
+
+ void Finish(TReadCallback callback) override {
+ Y_UNUSED(callback);
+ EXPECT_TRUE(false) << "This method is not expected to be called";
+ }
+
+ void AddFinishedCallback(TReadCallback callback) override {
+ Y_UNUSED(callback);
+ EXPECT_TRUE(false) << "This method is not expected to be called";
+ }
+
+ void Read(TDirectReadServerMessage* response, TReadCallback callback) override {
+ NYdbGrpc::TGrpcStatus status;
+ TReadCallback cb;
+ {
+ std::lock_guard lock(Lock);
+ std::cerr << "XXXXX Read 1 " << response->DebugString() << "\n";
+ EXPECT_FALSE(ActiveRead);
+ ActiveRead.Callback = std::move(callback);
+ ActiveRead.Dst = response;
+ if (!ReadResponses.empty()) {
+ std::cerr << "XXXXX Read 2 " << response->DebugString() << "\n";
+ *ActiveRead.Dst = ReadResponses.front().Response;
+ ActiveRead.Dst = nullptr;
+ status = std::move(ReadResponses.front().Status);
+ ReadResponses.pop();
+ cb = std::move(ActiveRead.Callback);
+ }
+ }
+ if (cb) {
+ std::cerr << "XXXXX Read 3 " << response->DebugString() << "\n";
+ cb(std::move(status));
+ }
+ }
+
+ void StartProcessReadImpl() {
+ CallbackFutures.push(std::async(std::launch::async, &TMockDirectReadSessionProcessor::ProcessRead, this));
+ }
+
+ void Write(TDirectReadClientMessage&& request, TWriteCallback callback) override {
+ EXPECT_FALSE(callback); // Read session doesn't set callbacks.
+ switch (request.client_message_case()) {
+ case TDirectReadClientMessage::kInitRequest:
+ OnInitRequest(request.init_request());
+ break;
+ case TDirectReadClientMessage::kStartDirectReadPartitionSessionRequest:
+ OnStartDirectReadPartitionSessionRequest(request.start_direct_read_partition_session_request());
+ break;
+ case TDirectReadClientMessage::kUpdateTokenRequest:
+ OnUpdateTokenRequest(request.update_token_request());
+ break;
+ case TDirectReadClientMessage::CLIENT_MESSAGE_NOT_SET:
+ EXPECT_TRUE(false) << "Invalid request";
+ break;
+ }
+ }
+
+ MOCK_METHOD(void, OnInitRequest, (const Ydb::Topic::StreamDirectReadMessage::InitRequest&), ());
+ MOCK_METHOD(void, OnStartDirectReadPartitionSessionRequest, (const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest&), ());
+ MOCK_METHOD(void, OnUpdateTokenRequest, (const Ydb::Topic::UpdateTokenRequest&), ());
+
+ void Wait() {
+ std::queue<std::future<void>> callbackFutures;
+ {
+ std::lock_guard lock(Lock);
+ CallbackFutures.swap(callbackFutures);
+ }
+
+ while (!callbackFutures.empty()) {
+ callbackFutures.front().wait();
+ callbackFutures.pop();
+ }
+ }
+
+ void Validate() {
+ std::cerr << "XXXXX Validate\n";
+ {
+ std::lock_guard lock(Lock);
+ EXPECT_TRUE(ReadResponses.empty());
+ EXPECT_TRUE(CallbackFutures.empty());
+
+ ActiveRead = TClientReadInfo{};
+ }
+ }
+
+ void ProcessRead() {
+ std::cerr << "XXXXX ProcessRead\n";
+ NYdbGrpc::TGrpcStatus status;
+ TReadCallback callback;
+ // GotActiveRead.GetFuture().Wait();
+ {
+ std::lock_guard lock(Lock);
+ *ActiveRead.Dst = ReadResponses.front().Response;
+ ActiveRead.Dst = nullptr;
+ status = std::move(ReadResponses.front().Status);
+ ReadResponses.pop();
+ callback = std::move(ActiveRead.Callback);
+ }
+ callback(std::move(status));
+ }
+
+ void AddServerResponse(TServerReadInfo result) {
+ NYdbGrpc::TGrpcStatus status;
+ TReadCallback callback;
+ {
+ std::lock_guard lock(Lock);
+ std::cerr << "XXXXX AddServerResponse 1 " << result.Response.DebugString() << "\n";
+ ReadResponses.emplace(std::move(result));
+ if (ActiveRead) {
+ std::cerr << "XXXXX AddServerResponse 2\n";
+ *ActiveRead.Dst = ReadResponses.front().Response;
+ ActiveRead.Dst = nullptr;
+ status = std::move(ReadResponses.front().Status);
+ ReadResponses.pop();
+ callback = std::move(ActiveRead.Callback);
+ }
+ }
+ if (callback) {
+ std::cerr << "XXXXX AddServerResponse 3\n";
+ callback(std::move(status));
+ }
+ }
+
+ std::mutex Lock;
+ // NThreading::TPromise<void> GotActiveRead = NThreading::NewPromise();
+ TClientReadInfo ActiveRead;
+ std::queue<TServerReadInfo> ReadResponses;
+ std::queue<std::future<void>> CallbackFutures;
+};
+
+class TMockRetryPolicy : public IRetryPolicy {
+public:
+ MOCK_METHOD(IRetryPolicy::IRetryState::TPtr, CreateRetryState, (), (const, override));
+ TMaybe<TDuration> Delay;
+};
+
+class TMockRetryState : public IRetryPolicy::IRetryState {
+public:
+ TMockRetryState(std::shared_ptr<TMockRetryPolicy> policy)
+ : Policy(policy) {}
+
+ TMaybe<TDuration> GetNextRetryDelay(EStatus) {
+ return Policy->Delay;
+ }
+private:
+ std::shared_ptr<TMockRetryPolicy> Policy;
+};
+
+// Class for testing read session impl with mocks.
+class TDirectReadSessionImplTestSetup {
+public:
+ // Types
+ using IDirectReadSessionConnectionProcessorFactory = ISessionConnectionProcessorFactory<TDirectReadClientMessage, TDirectReadServerMessage>;
+ using TMockDirectReadProcessorFactory = TMockProcessorFactory<TDirectReadClientMessage, TDirectReadServerMessage>;
+ using TMockReadProcessorFactory = TMockProcessorFactory<Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer>;
+
+ struct TFakeContext : public NYdbGrpc::IQueueClientContext {
+ IQueueClientContextPtr CreateContext() override {
+ return std::make_shared<TFakeContext>();
+ }
+
+ grpc::CompletionQueue* CompletionQueue() override {
+ EXPECT_TRUE(false) << "This method is not expected to be called";
+ return nullptr;
+ }
+
+ bool IsCancelled() const override {
+ EXPECT_TRUE(false) << "This method is not expected to be called";
+ return false;
+ }
+
+ bool Cancel() override {
+ return false;
+ }
+
+ void SubscribeCancel(std::function<void()>) override {
+ EXPECT_TRUE(false) << "This method is not expected to be called";
+ }
+ };
+
+ // Methods
+ TDirectReadSessionImplTestSetup();
+ ~TDirectReadSessionImplTestSetup() noexcept(false); // Performs extra validation and UNIT_ASSERTs
+
+ TSingleClusterReadSessionImpl<false>* GetControlSession();
+ TDirectReadSession* GetDirectReadSession(IDirectReadSessionControlCallbacks::TPtr);
+ void WaitForWorkingDirectReadSession();
+
+ std::shared_ptr<TReadSessionEventsQueue<false>> GetEventsQueue();
+ IExecutor::TPtr GetDefaultExecutor();
+
+ void SuccessfulInit(bool flag = true);
+
+ void AddControlResponse(TMockReadSessionProcessor::TServerReadInfo&);
+ void AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo&);
+
+ // Assertions.
+ void AssertNoEvents();
+
+public:
+ // Members
+ TReadSessionSettings ReadSessionSettings;
+ TLog Log = CreateLogBackend("cerr");
+ std::shared_ptr<TReadSessionEventsQueue<false>> EventsQueue;
+ std::shared_ptr<TFakeContext> FakeContext = std::make_shared<TFakeContext>();
+ std::shared_ptr<TMockRetryPolicy> MockRetryPolicy = std::make_shared<TMockRetryPolicy>();
+ std::shared_ptr<TMockReadProcessorFactory> MockReadProcessorFactory = std::make_shared<TMockReadProcessorFactory>();
+ std::shared_ptr<TMockDirectReadProcessorFactory> MockDirectReadProcessorFactory = std::make_shared<TMockDirectReadProcessorFactory>();
+ TIntrusivePtr<TMockReadSessionProcessor> MockReadProcessor = MakeIntrusive<TMockReadSessionProcessor>();
+ TIntrusivePtr<TMockDirectReadSessionProcessor> MockDirectReadProcessor = MakeIntrusive<TMockDirectReadSessionProcessor>();
+
+ TSingleClusterReadSessionImpl<false>::TPtr SingleClusterReadSession;
+ TSingleClusterReadSessionContextPtr SingleClusterReadSessionContextPtr;
+
+ TDirectReadSessionManager::TPtr DirectReadSessionManagerPtr;
+ TDirectReadSession::TPtr DirectReadSessionPtr;
+ TDirectReadSessionContextPtr DirectReadSessionContextPtr;
+
+ std::shared_ptr<TThreadPool> ThreadPool;
+ IExecutor::TPtr DefaultExecutor;
+};
+
+TDirectReadSessionImplTestSetup::TDirectReadSessionImplTestSetup() {
+ ReadSessionSettings
+ // .DirectRead(true)
+ .AppendTopics({"TestTopic"})
+ .ConsumerName("TestConsumer")
+ .RetryPolicy(NYdb::NTopic::IRetryPolicy::GetFixedIntervalPolicy(TDuration::MilliSeconds(10)))
+ .Counters(MakeIntrusive<NYdb::NTopic::TReaderCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>()));
+
+ Log.SetFormatter(GetPrefixLogFormatter(""));
+}
+
+TDirectReadSessionImplTestSetup::~TDirectReadSessionImplTestSetup() noexcept(false) {
+ if (!std::uncaught_exceptions()) { // Exiting from test successfully. Check additional expectations.
+ MockReadProcessorFactory->Wait();
+ MockReadProcessor->Wait();
+
+ MockReadProcessorFactory->Validate();
+ MockReadProcessor->Validate();
+
+ MockDirectReadProcessorFactory->Wait();
+ MockDirectReadProcessor->Wait();
+
+ MockDirectReadProcessorFactory->Validate();
+ MockDirectReadProcessor->Validate();
+ }
+
+ if (SingleClusterReadSessionContextPtr) {
+ if (auto session = SingleClusterReadSessionContextPtr->LockShared()) {
+ session->Close({});
+ }
+ SingleClusterReadSessionContextPtr->Cancel();
+ }
+
+ if (DirectReadSessionContextPtr) {
+ if (auto session = DirectReadSessionContextPtr->LockShared()) {
+ session->Close();
+ }
+ DirectReadSessionContextPtr->Cancel();
+ }
+
+ SingleClusterReadSession = nullptr;
+
+ if (ThreadPool) {
+ ThreadPool->Stop();
+ }
+}
+
+void TDirectReadSessionImplTestSetup::AddControlResponse(TMockReadSessionProcessor::TServerReadInfo& response) {
+ MockReadProcessor->AddServerResponse(response);
+}
+
+void TDirectReadSessionImplTestSetup::AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo& response) {
+ MockDirectReadProcessor->AddServerResponse(response);
+}
+
+void TDirectReadSessionImplTestSetup::SuccessfulInit(bool hasInitRequest) {
+ EXPECT_CALL(*MockReadProcessorFactory, OnCreateProcessor(1))
+ .WillOnce([&](){ MockReadProcessorFactory->CreateProcessor(MockReadProcessor); });
+ if (hasInitRequest) {
+ EXPECT_CALL(*MockReadProcessor, OnInitRequest(_));
+ }
+ AddControlResponse(TMockReadSessionProcessor::TServerReadInfo().InitResponse("session-1"));
+ GetControlSession()->Start();
+ MockReadProcessorFactory->Wait();
+ MockReadProcessor->Wait();
+}
+
+std::shared_ptr<TReadSessionEventsQueue<false>> TDirectReadSessionImplTestSetup::GetEventsQueue() {
+ if (!EventsQueue) {
+ EventsQueue = std::make_shared<TReadSessionEventsQueue<false>>(ReadSessionSettings);
+ }
+ return EventsQueue;
+}
+
+void TDirectReadSessionImplTestSetup::AssertNoEvents() {
+ std::optional<TReadSessionEvent::TEvent> event = GetEventsQueue()->GetEvent(false);
+ EXPECT_FALSE(event);
+}
+
+IExecutor::TPtr TDirectReadSessionImplTestSetup::GetDefaultExecutor() {
+ if (!DefaultExecutor) {
+ ThreadPool = std::make_shared<TThreadPool>();
+ ThreadPool->Start(1);
+ DefaultExecutor = CreateThreadPoolExecutorAdapter(ThreadPool);
+ }
+ return DefaultExecutor;
+}
+
+TSingleClusterReadSessionImpl<false>* TDirectReadSessionImplTestSetup::GetControlSession() {
+ if (!SingleClusterReadSession) {
+ if (!ReadSessionSettings.DecompressionExecutor_) {
+ ReadSessionSettings.DecompressionExecutor(GetDefaultExecutor());
+ }
+ if (!ReadSessionSettings.EventHandlers_.HandlersExecutor_) {
+ ReadSessionSettings.EventHandlers_.HandlersExecutor(GetDefaultExecutor());
+ }
+ SingleClusterReadSessionContextPtr = MakeWithCallbackContext<TSingleClusterReadSessionImpl<false>>(
+ ReadSessionSettings,
+ "db",
+ "client-session-id-1",
+ "",
+ Log,
+ MockReadProcessorFactory,
+ GetEventsQueue(),
+ FakeContext,
+ 1,
+ 1,
+ TSingleClusterReadSessionImpl<false>::TScheduleCallbackFunc {},
+ MockDirectReadProcessorFactory);
+ SingleClusterReadSession = SingleClusterReadSessionContextPtr->TryGet();
+ }
+ return SingleClusterReadSession.get();
+}
+
+TDirectReadSession* TDirectReadSessionImplTestSetup::GetDirectReadSession(IDirectReadSessionControlCallbacks::TPtr controlCallbacks) {
+ if (!DirectReadSessionPtr) {
+ DirectReadSessionContextPtr = MakeWithCallbackContext<TDirectReadSession>(
+ TNodeId(1),
+ SERVER_SESSION_ID,
+ ReadSessionSettings,
+ controlCallbacks,
+ FakeContext,
+ MockDirectReadProcessorFactory,
+ Log);
+ DirectReadSessionPtr = DirectReadSessionContextPtr->TryGet();
+ }
+ return DirectReadSessionPtr.get();
+}
+
+void TDirectReadSessionImplTestSetup::WaitForWorkingDirectReadSession() {
+ while (DirectReadSessionPtr->State != TDirectReadSession::EState::WORKING) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+}
+
+/*
+This suite tests direct read mode only through IReadSession, without using internal classes.
+*/
+
+class DirectReadWithClient : public TTopicTestFixture {};
+
+TEST_F(DirectReadWithClient, OneMessage) {
+ /*
+ The simplest case: write one message and read it back.
+ */
+
+ TTopicClient client{MakeDriver()};
+
+ {
+ // Write a message:
+
+ auto settings = TWriteSessionSettings()
+ .Path(GetTopicPath())
+ .ProducerId("test-message_group_id")
+ .MessageGroupId("test-message_group_id");
+ auto writer = client.CreateSimpleBlockingWriteSession(settings);
+ ASSERT_TRUE(writer->Write("message"));
+ writer->Close();
+ }
+
+ {
+ // Read the message:
+
+ auto settings = TReadSessionSettings()
+ .ConsumerName("test-consumer")
+ .AppendTopics(GetTopicPath())
+ // .DirectRead(true)
+ ;
+ auto reader = client.CreateReadSession(settings);
+
+ {
+ // Start partition session:
+ auto event = reader->GetEvent(true);
+ ASSERT_TRUE(event);
+ ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStartPartitionSessionEvent);
+ std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event).Confirm();
+ }
+
+ {
+ // Receive the message and commit.
+ auto event = reader->GetEvent(true);
+ ASSERT_TRUE(event);
+ ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
+ auto& dataReceived = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
+ auto& messages = dataReceived.GetMessages();
+ ASSERT_EQ(messages.size(), 1u);
+ dataReceived.Commit();
+ }
+
+ {
+ // Get commit ack.
+ auto event = reader->GetEvent(true);
+ ASSERT_TRUE(event);
+ ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TCommitOffsetAcknowledgementEvent);
+ }
+ }
+}
+
+TEST_F(DirectReadWithClient, ManyMessages) {
+ /*
+ Write many messages and read them back.
+
+ Don't compress messages and set MaxMemoryUsageBytes for the reader to 1MB,
+ so the server sends multiple DirectReadResponses.
+ */
+
+ DropTopic(GetTopicPath());
+
+ constexpr std::size_t partitionCount = 2;
+ std::size_t messageCount = 100;
+ std::size_t totalMessageCount = partitionCount * messageCount;
+ CreateTopic(GetTopicPath(), "test-consumer", partitionCount);
+ TTopicClient client{MakeDriver()};
+
+ std::string message(950_KB, 'x');
+
+ // Write messages to all partitions:
+ for (std::size_t partitionId = 0; partitionId < partitionCount; ++partitionId) {
+ std::string messageGroup = "test-message_group_id_" + std::to_string(partitionId);
+ auto settings = TWriteSessionSettings()
+ .Path(GetTopicPath())
+ .Codec(ECodec::RAW)
+ .PartitionId(partitionId)
+ .ProducerId(messageGroup)
+ .MessageGroupId(messageGroup);
+
+ auto writer = client.CreateSimpleBlockingWriteSession(settings);
+ for (std::size_t i = 0; i < messageCount; ++i) {
+ ASSERT_TRUE(writer->Write(message)) << "Failed to write message " << i << " to partition " << partitionId;
+ }
+ ASSERT_TRUE(writer->Close());
+ }
+
+ std::atomic<bool> work = true;
+
+ auto killer = std::thread([&]() {
+ while (work.load()) {
+ std::this_thread::sleep_for(std::chrono::seconds(5));
+ // setup.GetServer().KillTopicPqrbTablet(setup.GetTopicPath());
+ }
+ });
+
+ {
+ // Read messages:
+
+ std::size_t gotMessages = 0;
+ std::array<std::size_t, partitionCount> committedOffset{};
+ auto settings = TReadSessionSettings()
+ .ConsumerName("test-consumer")
+ .AppendTopics(GetTopicPath())
+ .MaxMemoryUsageBytes(1_MB)
+ // .DirectRead(GetEnv("DIRECT", "0") == "1")
+ ;
+
+ std::shared_ptr<IReadSession> reader;
+
+ settings.EventHandlers_.SimpleDataHandlers(
+ [&](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& e) {
+ gotMessages += e.GetMessages().size();
+ std::cerr << "XXXXX gotMessages: " << gotMessages << " partition_id: " << e.GetPartitionSession()->GetPartitionId() << "\n";
+ e.Commit();
+ });
+
+ settings.EventHandlers_.CommitOffsetAcknowledgementHandler(
+ [&](NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent& e) {
+ auto partitionId = e.GetPartitionSession()->GetPartitionId();
+ committedOffset[partitionId] = e.GetCommittedOffset();
+ std::cerr << "XXXXX committedOffset: ";
+ for (auto offset : committedOffset) {
+ std::cerr << offset << " ";
+ }
+ std::cerr << std::endl;
+ if (std::ranges::all_of(committedOffset, [&](size_t offset) { return offset == messageCount; })) {
+ reader->Close();
+ }
+ });
+
+ reader = client.CreateReadSession(settings);
+
+ reader->GetEvent(/*block = */true);
+
+ ASSERT_EQ(gotMessages, totalMessageCount);
+ }
+
+ work.store(false);
+ killer.join();
+}
+
+/*
+This suite tests direct read sessions together with a control session.
+*/
+
+class DirectReadWithControlSession : public TTopicTestFixture {};
+
+void SuccessfulInitImpl(bool thenTimeout) {
+ TDirectReadSessionImplTestSetup setup;
+ setup.ReadSessionSettings
+ .MaxLag(TDuration::Seconds(32))
+ .ReadFromTimestamp(TInstant::Seconds(42));
+
+ setup.ReadSessionSettings.Topics_[0]
+ .ReadFromTimestamp(TInstant::Seconds(146))
+ .AppendPartitionIds(100)
+ .AppendPartitionIds(101);
+
+ {
+ ::testing::InSequence seq;
+
+ EXPECT_CALL(*setup.MockReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&](){
+ if (thenTimeout) {
+ setup.MockReadProcessorFactory->CreateAndThenTimeout(setup.MockReadProcessor);
+ } else {
+ setup.MockReadProcessorFactory->CreateProcessor(setup.MockReadProcessor);
+ }
+ });
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&setup](const Ydb::Topic::StreamReadMessage::InitRequest& req) {
+ ASSERT_EQ(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
+ ASSERT_TRUE(req.direct_read());
+ ASSERT_EQ(req.topics_read_settings_size(), 1);
+ ASSERT_EQ(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
+ ASSERT_EQ(req.topics_read_settings(0).read_from().seconds(), 146);
+ ASSERT_EQ(req.topics_read_settings(0).partition_ids_size(), 2);
+ ASSERT_EQ(req.topics_read_settings(0).partition_ids(0), 100);
+ ASSERT_EQ(req.topics_read_settings(0).partition_ids(1), 101);
+ }));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_));
+ }
+
+ setup.GetControlSession()->Start();
+ setup.MockReadProcessorFactory->Wait();
+
+ setup.AddControlResponse(TMockReadSessionProcessor::TServerReadInfo().InitResponse("session id"));
+
+ setup.AssertNoEvents();
+}
+
+TEST_F(DirectReadWithControlSession, Init) {
+ SuccessfulInitImpl(true);
+ SuccessfulInitImpl(false);
+}
+
+TEST_F(DirectReadWithControlSession, StopPartitionSessionGracefully) {
+ auto const startPartitionSessionRequest = TStartPartitionSessionRequest{
+ .PartitionId = 1,
+ .PartitionSessionId = 2,
+ .NodeId = 3,
+ .Generation = 4,
+ };
+
+ auto const stopPartitionSessionRequest = TStopPartitionSessionRequest{
+ .PartitionSessionId = 2,
+ .Graceful = true,
+ .CommittedOffset = 0,
+ .LastDirectReadId = 5,
+ };
+
+ TDirectReadSessionImplTestSetup setup;
+ setup.ReadSessionSettings.Topics_[0].AppendPartitionIds(startPartitionSessionRequest.PartitionId);
+
+ {
+ {
+ ::testing::InSequence seq;
+
+ EXPECT_CALL(*setup.MockReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() {
+ setup.MockReadProcessorFactory->CreateProcessor(setup.MockReadProcessor);
+ });
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&](const Ydb::Topic::StreamReadMessage::InitRequest& req) {
+ ASSERT_TRUE(req.direct_read());
+ ASSERT_EQ(req.topics_read_settings_size(), 1);
+ ASSERT_EQ(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
+ ASSERT_EQ(req.topics_read_settings(0).partition_ids_size(), 1);
+ ASSERT_EQ(req.topics_read_settings(0).partition_ids(0), startPartitionSessionRequest.PartitionId);
+ }));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnStartPartitionSessionResponse(_))
+ .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse& resp) {
+ ASSERT_EQ(static_cast<std::uint64_t>(resp.partition_session_id()), startPartitionSessionRequest.PartitionSessionId);
+ }));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnDirectReadAck(_))
+ .Times(4);
+ }
+
+ // There are two sequences, because OnCreateProcessor from the second sequence may be called
+ // before OnStartPartitionSessionResponse from the first sequence.
+
+ {
+ ::testing::InSequence sequence;
+
+ EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() {
+ setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor);
+ });
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&setup](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
+ ASSERT_EQ(req.session_id(), SERVER_SESSION_ID);
+ ASSERT_EQ(static_cast<std::size_t>(req.topics_read_settings_size()), setup.ReadSessionSettings.Topics_.size());
+ ASSERT_EQ(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
+ ASSERT_EQ(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
+ }));
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& request) {
+ ASSERT_EQ(static_cast<std::uint64_t>(request.partition_session_id()), startPartitionSessionRequest.PartitionSessionId);
+ ASSERT_EQ(request.generation(), startPartitionSessionRequest.Generation);
+ }));
+
+ // Expect OnReadRequest in case it is called before the test ends.
+ // TODO(qyryq) Fix number, not 10.
+ EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_)).Times(AtMost(10));
+ }
+ }
+
+ setup.GetControlSession()->Start();
+ setup.MockReadProcessorFactory->Wait();
+ setup.AddControlResponse(TMockReadSessionProcessor::TServerReadInfo().InitResponse(SERVER_SESSION_ID));
+ setup.AddControlResponse(TMockReadSessionProcessor::TServerReadInfo().StartPartitionSessionRequest(startPartitionSessionRequest));
+
+ {
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ ASSERT_TRUE(event);
+ ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStartPartitionSessionEvent);
+ std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event).Confirm();
+ }
+
+ setup.AddControlResponse(TMockReadSessionProcessor::TServerReadInfo()
+ .StopPartitionSession(stopPartitionSessionRequest));
+
+ setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ .InitResponse());
+
+ setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ .StartDirectReadPartitionSessionResponse(startPartitionSessionRequest.PartitionSessionId));
+
+ std::size_t offset = 0, i = 0;
+
+ // Verify that the session receives data sent to direct read session:
+ for (std::size_t directReadId = 1; directReadId < stopPartitionSessionRequest.LastDirectReadId; ++directReadId) {
+ auto resp = TMockDirectReadSessionProcessor::TServerReadInfo()
+ .PartitionData(startPartitionSessionRequest.PartitionSessionId, directReadId)
+ // TODO(qyryq) Test with compression!
+ // .Batch("producer-id-1", Ydb::Topic::Codec::CODEC_ZSTD);
+ .Batch("producer-id-1", Ydb::Topic::Codec::CODEC_RAW);
+
+ resp.Message(offset, TStringBuilder() << "message-" << offset, offset);
+ ++offset;
+ resp.Message(offset, TStringBuilder() << "message-" << offset, offset);
+ ++offset;
+ setup.AddDirectReadResponse(resp);
+
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ ASSERT_TRUE(event);
+ ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
+ auto& e = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
+ i += e.GetMessagesCount();
+ }
+
+ while (i < offset) {
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ ASSERT_TRUE(event);
+ ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
+ auto& e = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
+ i += e.GetMessagesCount();
+ }
+
+ {
+ // Verify that the session receives TStopPartitionSessionEvent(graceful=true) after data was received:
+
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ ASSERT_TRUE(event);
+ ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStopPartitionSessionEvent);
+ auto e = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event);
+ e->Confirm();
+ }
+
+ {
+ // Verify that the session receives TPartitionSessionClosedEvent after data was received:
+
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ ASSERT_TRUE(event);
+ ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TPartitionSessionClosedEvent);
+ // auto e = std::get_if<TReadSessionEvent::TPartitionSessionClosedEvent>(&*event);
+ }
+
+ setup.AssertNoEvents();
+
+ // ::testing::Mock::VerifyAndClear(setup.MockDirectReadProcessorFactory);
+ // ::testing::Mock::VerifyAndClear(setup.MockDirectReadProcessor);
+}
+
+TEST_F(DirectReadWithControlSession, StopPartitionSession) {
+ auto const startPartitionSessionRequest = TStartPartitionSessionRequest{
+ .PartitionId = 1,
+ .PartitionSessionId = 2,
+ .NodeId = 3,
+ .Generation = 4,
+ };
+
+ TDirectReadSessionImplTestSetup setup;
+ setup.ReadSessionSettings.Topics_[0].AppendPartitionIds(startPartitionSessionRequest.PartitionId);
+
+ {
+ {
+ ::testing::InSequence seq;
+
+ EXPECT_CALL(*setup.MockReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() {
+ setup.MockReadProcessorFactory->CreateProcessor(setup.MockReadProcessor);
+ });
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&](const Ydb::Topic::StreamReadMessage::InitRequest& req) {
+ ASSERT_TRUE(req.direct_read());
+ ASSERT_EQ(req.topics_read_settings_size(), 1);
+ ASSERT_EQ(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
+ ASSERT_EQ(req.topics_read_settings(0).partition_ids_size(), 1);
+ ASSERT_EQ(req.topics_read_settings(0).partition_ids(0), startPartitionSessionRequest.PartitionId);
+ }));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnStartPartitionSessionResponse(_))
+ .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse& resp) {
+ ASSERT_EQ(static_cast<std::uint64_t>(resp.partition_session_id()), startPartitionSessionRequest.PartitionSessionId);
+ }));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnDirectReadAck(_))
+ .Times(4);
+ }
+
+ // There are two sequences, because OnCreateProcessor from the second sequence may be called
+ // before OnStartPartitionSessionResponse from the first sequence.
+
+ {
+ ::testing::InSequence sequence;
+
+ EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() {
+ setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor);
+ });
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&setup](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
+ ASSERT_EQ(req.session_id(), SERVER_SESSION_ID);
+ ASSERT_EQ(static_cast<std::size_t>(req.topics_read_settings_size()), setup.ReadSessionSettings.Topics_.size());
+ ASSERT_EQ(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
+ ASSERT_EQ(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
+ }));
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& request) {
+ ASSERT_EQ(static_cast<std::uint64_t>(request.partition_session_id()), startPartitionSessionRequest.PartitionSessionId);
+ ASSERT_EQ(request.generation(), startPartitionSessionRequest.Generation);
+ }));
+
+ // Expect OnReadRequest in case it is called before the test ends.
+ // TODO(qyryq) Fix number, not 10.
+ EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_)).Times(AtMost(10));
+ }
+ }
+
+ setup.GetControlSession()->Start();
+ {
+ auto r = TMockReadSessionProcessor::TServerReadInfo();
+ setup.AddControlResponse(r.InitResponse(SERVER_SESSION_ID));
+ }
+
+ {
+ auto r = TMockReadSessionProcessor::TServerReadInfo();
+ setup.AddControlResponse(r.StartPartitionSessionRequest(startPartitionSessionRequest));
+ }
+
+ {
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ ASSERT_TRUE(event);
+ ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStartPartitionSessionEvent);
+ std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event).Confirm();
+ }
+
+ {
+ auto r = TMockDirectReadSessionProcessor::TServerReadInfo();
+ setup.AddDirectReadResponse(r.InitResponse());
+ }
+
+ {
+ auto r = TMockDirectReadSessionProcessor::TServerReadInfo();
+ setup.AddDirectReadResponse(r.StartDirectReadPartitionSessionResponse(startPartitionSessionRequest.PartitionSessionId));
+ }
+
+ std::int64_t offset = 0, i = 0;
+
+ // Verify that the session receives data sent to direct read session:
+ for (std::size_t directReadId = 1; directReadId < 5; ++directReadId) {
+ auto resp = TMockDirectReadSessionProcessor::TServerReadInfo();
+ resp.PartitionData(startPartitionSessionRequest.PartitionSessionId, directReadId)
+ // TODO(qyryq) Test with compression!
+ // .Batch("producer-id-1", Ydb::Topic::Codec::CODEC_ZSTD);
+ .Batch("producer-id-1", Ydb::Topic::Codec::CODEC_RAW);
+
+ resp.Message(offset, TStringBuilder() << "message-" << offset, offset);
+ ++offset;
+ resp.Message(offset, TStringBuilder() << "message-" << offset, offset);
+ ++offset;
+ setup.AddDirectReadResponse(resp);
+
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ ASSERT_TRUE(event);
+ ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
+ auto& e = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
+ i += e.GetMessagesCount();
+ e.Commit();
+ }
+
+ while (i < offset) {
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ ASSERT_TRUE(event);
+ ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
+ auto& e = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
+ i += e.GetMessagesCount();
+ }
+
+ {
+ auto r = TMockReadSessionProcessor::TServerReadInfo();
+ setup.AddControlResponse(
+ r.StopPartitionSession({
+ .PartitionSessionId = 2,
+ .Graceful = false,
+ .CommittedOffset = offset,
+ }));
+ }
+
+ // TODO(qyryq) Send some bogus events from server, the client should ignore them.
+
+ {
+ // Verify that the session receives TStopPartitionSessionEvent after data was received:
+
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ ASSERT_TRUE(event);
+ ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TPartitionSessionClosedEvent);
+ // auto e = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event);
+ // UNIT_ASSERT(!e.Graceful);
+ // UNIT_ASSERT(e.CommittedOffset == offset);
+ }
+
+ setup.MockReadProcessorFactory->Wait();
+ setup.MockDirectReadProcessorFactory->Wait();
+
+ setup.AssertNoEvents();
+}
+
+TEST_F(DirectReadWithControlSession, EmptyDirectReadResponse) {
+ // Sometimes the server might send a DirectReadResponse with no data, but with bytes_size value > 0.
+ // It can happen, if the server tried to send DirectReadResponse, but did not succeed,
+ // and in the meantime the messages that should had been sent have been rotated by retention period,
+ // and do not exist anymore. To keep ReadSizeBudget bookkeeping correct, the server still sends the an DirectReadResponse,
+ // and SDK should process it correctly: basically it should immediately send a ReadRequest(bytes_size=DirectReadResponse.bytes_size).
+
+ auto const startPartitionSessionRequest = TStartPartitionSessionRequest{
+ .PartitionId = 1,
+ .PartitionSessionId = 2,
+ .NodeId = 3,
+ .Generation = 4,
+ };
+
+ std::int64_t bytesSize = 12345;
+
+ TDirectReadSessionImplTestSetup setup;
+ setup.ReadSessionSettings.Topics_[0].AppendPartitionIds(startPartitionSessionRequest.PartitionId);
+
+ {
+ {
+ ::testing::InSequence seq;
+
+ EXPECT_CALL(*setup.MockReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() {
+ setup.MockReadProcessorFactory->CreateProcessor(setup.MockReadProcessor);
+ });
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&](const Ydb::Topic::StreamReadMessage::InitRequest& req) {
+ ASSERT_TRUE(req.direct_read());
+ ASSERT_EQ(req.topics_read_settings_size(), 1);
+ ASSERT_EQ(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
+ ASSERT_EQ(req.topics_read_settings(0).partition_ids_size(), 1);
+ ASSERT_EQ(req.topics_read_settings(0).partition_ids(0), startPartitionSessionRequest.PartitionId);
+ }));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnStartPartitionSessionResponse(_))
+ .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse& resp) {
+ ASSERT_EQ(static_cast<std::uint64_t>(resp.partition_session_id()), startPartitionSessionRequest.PartitionSessionId);
+ }));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnDirectReadAck(_))
+ .Times(1);
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_))
+ .WillOnce(Invoke([&](const Ydb::Topic::StreamReadMessage::ReadRequest& req) {
+ ASSERT_EQ(req.bytes_size(), bytesSize);
+ }));
+ }
+
+ // There are two sequences, because OnCreateProcessor from the second sequence may be called
+ // before OnStartPartitionSessionResponse from the first sequence.
+
+ {
+ ::testing::InSequence sequence;
+
+ EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() {
+ setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor);
+ });
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&setup](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
+ ASSERT_EQ(req.session_id(), SERVER_SESSION_ID);
+ ASSERT_EQ(static_cast<std::size_t>(req.topics_read_settings_size()), setup.ReadSessionSettings.Topics_.size());
+ ASSERT_EQ(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
+ ASSERT_EQ(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
+ }));
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& request) {
+ ASSERT_EQ(static_cast<std::uint64_t>(request.partition_session_id()), startPartitionSessionRequest.PartitionSessionId);
+ ASSERT_EQ(request.generation(), startPartitionSessionRequest.Generation);
+ }));
+ }
+ }
+
+ setup.GetControlSession()->Start();
+ {
+ auto r = TMockReadSessionProcessor::TServerReadInfo();
+ setup.AddControlResponse(r.InitResponse(SERVER_SESSION_ID));
+ }
+
+ {
+ auto r = TMockReadSessionProcessor::TServerReadInfo();
+ setup.AddControlResponse(r.StartPartitionSessionRequest(startPartitionSessionRequest));
+ }
+
+ {
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ ASSERT_TRUE(event);
+ ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStartPartitionSessionEvent);
+ std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event).Confirm();
+ }
+
+ {
+ auto r = TMockDirectReadSessionProcessor::TServerReadInfo();
+ setup.AddDirectReadResponse(r.InitResponse());
+ }
+
+ {
+ auto r = TMockDirectReadSessionProcessor::TServerReadInfo();
+ setup.AddDirectReadResponse(r.StartDirectReadPartitionSessionResponse(startPartitionSessionRequest.PartitionSessionId));
+ }
+
+ std::int64_t directReadId = 1;
+
+ auto resp = TMockDirectReadSessionProcessor::TServerReadInfo();
+ resp.PartitionData(startPartitionSessionRequest.PartitionSessionId, directReadId, bytesSize);
+ setup.AddDirectReadResponse(resp);
+
+ setup.MockReadProcessorFactory->Wait();
+ setup.MockDirectReadProcessorFactory->Wait();
+
+ setup.AssertNoEvents();
+}
+
+/*
+This suite tests TDirectReadSession in isolation, without control session.
+*/
+
+class DirectReadSession : public TTopicTestFixture {};
+
+TEST_F(DirectReadSession, InitAndStartPartitionSession) {
+ /*
+ Create DirectRead processor, send InitRequest, StartDirectReadPartitionSessionRequest.
+ */
+
+ TDirectReadSessionImplTestSetup setup;
+
+ auto gotStart = NThreading::NewPromise();
+
+ TPartitionSessionId partitionSessionId = 1;
+
+ class TControlCallbacks : public IDirectReadSessionControlCallbacks {};
+ auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>());
+
+ {
+ ::testing::InSequence seq;
+
+ EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
+ ASSERT_EQ(req.session_id(), SERVER_SESSION_ID);
+ ASSERT_EQ(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
+ }));
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ .WillOnce(Invoke([&](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& req) {
+ ASSERT_EQ(req.partition_session_id(), static_cast<std::int64_t>(partitionSessionId));
+ gotStart.SetValue();
+ }));
+ }
+
+ session->Start();
+
+ setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ .InitResponse());
+
+ session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
+
+ gotStart.GetFuture().Wait();
+}
+
+TEST_F(DirectReadSession, NoRetryDirectReadSession) {
+ /*
+ If the session cannot establish a connection, and the retry policy does not allow to make another retry,
+ the session should be aborted and the client should receive TSessionClosedEvent.
+ */
+
+ TDirectReadSessionImplTestSetup setup;
+ setup.ReadSessionSettings.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetNoRetryPolicy());
+
+ auto gotClosedEvent = NThreading::NewPromise();
+
+ EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() { setup.MockDirectReadProcessorFactory->FailCreation(); });
+
+ class TControlCallbacks : public IDirectReadSessionControlCallbacks {
+ public:
+ TControlCallbacks(NThreading::TPromise<void>& gotClosedEvent) : GotClosedEvent(gotClosedEvent) {}
+ void AbortSession(TSessionClosedEvent&&) override { GotClosedEvent.SetValue(); }
+ NThreading::TPromise<void>& GotClosedEvent;
+ };
+
+ auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(gotClosedEvent));
+
+ session->Start();
+ setup.MockDirectReadProcessorFactory->Wait();
+ gotClosedEvent.GetFuture().Wait();
+}
+
+TEST_F(DirectReadSession, RetryDirectReadSession) {
+ /*
+ If the retry policy allows retries, keep trying to establish connection.
+ */
+ TDirectReadSessionImplTestSetup setup;
+ std::size_t nRetries = 2;
+ setup.ReadSessionSettings.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetFixedIntervalPolicy(
+ TDuration::MilliSeconds(1), TDuration::MilliSeconds(1), nRetries));
+
+ auto gotClosedEvent = NThreading::NewPromise();
+
+ ON_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ .WillByDefault([&]() { setup.MockDirectReadProcessorFactory->FailCreation(); });
+
+ EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ .Times(1 + nRetries); // First call + N retries.
+
+ class TControlCallbacks : public IDirectReadSessionControlCallbacks {
+ public:
+ TControlCallbacks(NThreading::TPromise<void>& gotClosedEvent) : GotClosedEvent(gotClosedEvent) {}
+ void AbortSession(TSessionClosedEvent&&) override { GotClosedEvent.SetValue(); }
+ NThreading::TPromise<void>& GotClosedEvent;
+ };
+
+ auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(gotClosedEvent));
+ session->Start();
+ setup.MockDirectReadProcessorFactory->Wait();
+
+ gotClosedEvent.GetFuture().Wait();
+}
+
+ // Y_UNIT_TEST(NoRetryPartitionSession) {
+ // /*
+ // If we get a StopDirectReadPartitionSession event, and the retry policy does not allow to send another Start-request,
+ // the session should be aborted and the client should receive TSessionClosedEvent.
+ // */
+ // TDirectReadSessionImplTestSetup setup;
+ // setup.ReadSessionSettings.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetNoRetryPolicy());
+
+ // auto gotClosedEvent = NThreading::NewPromise();
+
+ // {
+ // ::testing::InSequence seq;
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_));
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_));
+ // }
+
+ // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
+ // public:
+ // TControlCallbacks(NThreading::TPromise<void>& gotClosedEvent) : GotClosedEvent(gotClosedEvent) {}
+ // void AbortSession(TSessionClosedEvent&&) override { GotClosedEvent.SetValue(); }
+ // NThreading::TPromise<void>& GotClosedEvent;
+ // };
+
+ // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(gotClosedEvent));
+ // session->Start();
+ // setup.MockDirectReadProcessorFactory->Wait();
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .InitResponse());
+
+ // session->AddPartitionSession({ .PartitionSessionId = 1, .Location = {2, 3} });
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, TPartitionSessionId(1)));
+
+ // gotClosedEvent.GetFuture().Wait();
+ // }
+
+ // Y_UNIT_TEST(RetryPartitionSession) {
+ // /*
+ // Keep sending Start-requests until the retry policy denies next retry.
+ // */
+ // TDirectReadSessionImplTestSetup setup;
+ // size_t nRetries = 2;
+ // setup.ReadSessionSettings.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetFixedIntervalPolicy(
+ // TDuration::MilliSeconds(1), TDuration::MilliSeconds(1), nRetries));
+
+ // auto gotClosedEvent = NThreading::NewPromise();
+
+ // {
+ // ::testing::InSequence seq;
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_));
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ // .Times(1 + nRetries);
+ // }
+
+ // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
+ // public:
+ // TControlCallbacks(NThreading::TPromise<void>& gotClosedEvent) : GotClosedEvent(gotClosedEvent) {}
+ // void AbortSession(TSessionClosedEvent&&) override { GotClosedEvent.SetValue(); }
+ // void ScheduleCallback(TDuration, std::function<void()> cb, TDeferredActions<false>& deferred) override {
+ // deferred.DeferCallback(cb);
+ // }
+ // NThreading::TPromise<void>& GotClosedEvent;
+ // };
+
+ // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(gotClosedEvent));
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .InitResponse());
+
+ // session->Start();
+ // setup.MockDirectReadProcessorFactory->Wait();
+
+ // TPartitionSessionId partitionSessionId = 1;
+
+ // session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
+
+ // for (size_t i = 0; i < 1 + nRetries; ++i) {
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
+ // }
+
+ // gotClosedEvent.GetFuture().Wait();
+ // }
+
+ // Y_UNIT_TEST(ResetRetryStateOnSuccess) {
+ // /*
+ // Test that the client creates a new retry state on the first error after a successful response.
+
+ // With the default retry policy (exponential backoff), retry delays grow after each unsuccessful request.
+ // After the first successful request retry state should be reset, so the delay after another unsuccessful request will be small.
+
+ // E.g. if the exponential backoff policy is used, and minDelay is 1ms, and scaleFactor is 1000, then the following should happen:
+
+ // client -> server: InitRequest
+ // client <-- server: InitResponse
+ // client -> server: StartDirectReadPartitionSessionRequest
+ // client <- server: StopDirectReadPartitionSession(OVERLOADED)
+ // note over client: Wait 1 ms
+ // client -> server: StartDirectReadPartitionSessionRequest
+ // client <-- server: StartDirectReadPartitionSessionResponse
+ // note over client: Reset RetryState
+ // client <- server: StopDirectReadPartitionSession(OVERLOADED)
+ // note over client: Wait 1 ms, not 1 second
+ // client -> server: StartDirectReadPartitionSessionRequest
+ // */
+
+ // TDirectReadSessionImplTestSetup setup;
+ // setup.ReadSessionSettings.RetryPolicy(setup.MockRetryPolicy);
+
+ // auto gotFinalStart = NThreading::NewPromise();
+ // TPartitionSessionId partitionSessionId = 1;
+
+ // {
+ // ::testing::InSequence sequence;
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_));
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_));
+
+ // // The client receives StopDirectReadPartitionSession, create TDirectReadSession::PartitionSessions[i].RetryState
+ // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
+ // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_));
+
+ // // The client receives StartDirectReadPartitionSessionResponse, resets retry state,
+ // // then receives StopDirectReadPartitionSession and has to create a new retry state.
+ // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
+ // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ // .WillOnce([&]() { gotFinalStart.SetValue(); });
+ // }
+
+ // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
+ // public:
+ // void ScheduleCallback(TDuration, std::function<void()> cb, TDeferredActions<false>& deferred) override {
+ // deferred.DeferCallback(cb);
+ // }
+ // };
+
+ // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>());
+
+ // session->Start();
+ // setup.MockDirectReadProcessorFactory->Wait();
+
+ // session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .InitResponse());
+
+ // setup.MockRetryPolicy->Delay = TDuration::MilliSeconds(1);
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .StartDirectReadPartitionSessionResponse(partitionSessionId));
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
+
+ // gotFinalStart.GetFuture().Wait();
+ // }
+
+ // Y_UNIT_TEST(PartitionSessionRetainsRetryStateOnReconnects) {
+ // /*
+ // We need to retain retry states of separate partition sessions
+ // even after reestablishing the connection to a node.
+
+ // E.g. partition session receives StopDirectReadPartitionSession
+ // and we need to send StartDirectReadPartitionSessionRequest in 5 minutes due to the retry policy.
+
+ // But in the meantime, the session loses connection to the server and reconnects within several seconds.
+
+ // We must not send that StartDirectReadPartitionSessionRequest right away, but wait ~5 minutes.
+
+ // client -> server: InitRequest
+ // client <-- server: InitResponse
+ // client -> server: StartDirectReadPartitionSessionRequest
+ // client <- server: StopDirectReadPartitionSession(OVERLOADED)
+ // note over client: Wait N seconds before sending Start again
+ // ... Connection lost, client reconnects to the server ...
+ // client -> server: InitRequest
+ // client <-- server: InitResponse
+ // note over client: Still has to wait ~N seconds
+ // client -> server: StartDirectReadPartitionSessionRequest
+ // */
+
+ // TDirectReadSessionImplTestSetup setup;
+ // setup.ReadSessionSettings.RetryPolicy(setup.MockRetryPolicy);
+
+ // auto gotFinalStart = NThreading::NewPromise();
+ // auto gotInitRequest = NThreading::NewPromise();
+ // auto calledRead = NThreading::NewPromise();
+ // TPartitionSessionId partitionSessionId = 1;
+ // auto secondProcessor = MakeIntrusive<TMockDirectReadSessionProcessor>();
+ // auto delay = TDuration::Seconds(300);
+
+ // {
+ // ::testing::InSequence sequence;
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(1))
+ // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_));
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_));
+
+ // // The client receives StopDirectReadPartitionSession, create TDirectReadSession::PartitionSessions[i].RetryState
+ // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
+ // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
+
+ // // The client loses connection, create TDirectReadSession.RetryState
+ // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
+ // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
+
+ // // The connection is lost at this point, the client tries to reconnect.
+ // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(2))
+ // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(secondProcessor); });
+
+ // EXPECT_CALL(*secondProcessor, OnInitRequest(_))
+ // .WillOnce([&]() { gotInitRequest.SetValue(); });
+
+ // // The client waits `delay` seconds before sending the StartDirectReadPartitionSessionRequest.
+
+ // EXPECT_CALL(*secondProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ // .WillOnce([&]() { gotFinalStart.SetValue(); });
+ // }
+
+ // std::function<void()> callback;
+
+ // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
+ // public:
+ // TControlCallbacks(std::function<void()>& callback) : Callback(callback) {}
+ // void ScheduleCallback(TDuration, std::function<void()> cb, TDeferredActions<false>&) override {
+ // Callback = cb;
+ // }
+ // std::function<void()>& Callback;
+ // };
+
+ // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(callback));
+
+ // session->Start();
+ // setup.MockDirectReadProcessorFactory->Wait();
+
+ // session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .InitResponse());
+
+ // setup.MockRetryPolicy->Delay = delay;
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
+
+ // // Besides logs, these durations don't really affect anything in tests.
+ // setup.MockRetryPolicy->Delay = TDuration::Seconds(1);
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .Failure());
+
+ // gotInitRequest.GetFuture().Wait();
+ // secondProcessor->AddServerResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .InitResponse());
+
+ // // Ensure that the callback is called after the direct session got InitResponse.
+ // setup.WaitForWorkingDirectReadSession();
+
+ // callback();
+
+ // gotFinalStart.GetFuture().Wait();
+
+ // secondProcessor->Wait();
+ // secondProcessor->Validate();
+ // }
+
+ // Y_UNIT_TEST(RetryWithoutConnectionResetsPartitionSession) {
+ // /*
+ // If there are pending StartDirectReadPartitionSession requests that were delayed due to previous errors,
+ // and the entire session then loses connection for an extended period of time (greater than the callback delays),
+ // the following process should be followed:
+
+ // When the session finally reconnects, the pending Start requests should be sent immediately.
+ // This is because their callbacks have already been fired, but the requests were not sent due to the lack of connection.
+
+ // client -> server: InitRequest
+ // client <-- server: InitResponse
+ // client -> server: StartDirectReadPartitionSessionRequest
+ // client <- server: StopDirectReadPartitionSession(OVERLOADED)
+ // note over client: Wait 1 second before sending Start again
+ // ... Connection lost ...
+ // note over client: SendStart... callback fires, resets state
+ // ... Connection reestablished in 1 minute ...
+ // client -> server: InitRequest
+ // client <-- server: InitResponse
+ // note over client: Send the Start request immediately
+ // client -> server: StartDirectReadPartitionSessionRequest
+ // */
+
+ // TDirectReadSessionImplTestSetup setup;
+ // setup.ReadSessionSettings.RetryPolicy(setup.MockRetryPolicy);
+
+ // auto gotFinalStart = NThreading::NewPromise();
+ // auto calledRead = NThreading::NewPromise();
+ // TPartitionSessionId partitionSessionId = 1;
+ // auto secondProcessor = MakeIntrusive<TMockDirectReadSessionProcessor>();
+ // auto delay = TDuration::MilliSeconds(1);
+
+ // {
+ // ::testing::InSequence sequence;
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(1))
+ // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
+ // .Times(1);
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ // .Times(1);
+
+ // // The client receives StopDirectReadPartitionSession, create TDirectReadSession::PartitionSessions[i].RetryState
+ // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
+ // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
+
+ // // The client loses connection, create TDirectReadSession.RetryState
+ // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
+ // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
+
+ // // The connection is lost at this point, the client tries to reconnect.
+ // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(2))
+ // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(secondProcessor); });
+
+ // EXPECT_CALL(*secondProcessor, OnInitRequest(_))
+ // .Times(1);
+
+ // EXPECT_CALL(*secondProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ // .WillOnce([&]() { gotFinalStart.SetValue(); });
+ // }
+
+ // std::function<void()> callback;
+
+ // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
+ // public:
+ // TControlCallbacks(TDuration delay, std::function<void()>& callback) : Delay(delay), Callback(callback) {}
+ // void ScheduleCallback(TDuration d, std::function<void()> cb, TDeferredActions<false>&) override {
+ // UNIT_ASSERT_EQUAL(Delay, d);
+ // Callback = cb;
+ // }
+ // TDuration Delay;
+ // std::function<void()>& Callback;
+ // };
+
+ // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(delay, callback));
+
+ // session->Start();
+ // setup.MockDirectReadProcessorFactory->Wait();
+
+ // session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .InitResponse());
+
+ // setup.MockRetryPolicy->Delay = delay;
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
+
+ // // Besides logs, these durations don't really affect anything in tests.
+ // setup.MockRetryPolicy->Delay = TDuration::Seconds(10);
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .Failure());
+
+ // // Delayed callback is fired, but there is no connection, so the partition session state changes to IDLE,
+ // // and the request should be sent after receiving InitResponse.
+ // callback();
+
+ // secondProcessor->AddServerResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .InitResponse());
+
+ // gotFinalStart.GetFuture().Wait();
+
+ // secondProcessor->Wait();
+ // secondProcessor->Validate();
+ // }
+
+// } Y_UNIT_TEST_SUITE_F(DirectReadSession)
+
+/*
+This suite tests direct read session interaction with server.
+
+It complements tests from basic_usage_ut.cpp etc, as we run them with direct read disabled/enabled.
+*/
+
+class DirectReadWithServer : public TTopicTestFixture {};
+
+TEST_F(DirectReadWithServer, Devslice) {
+ GTEST_SKIP() << "Skipping devslice test";
+
+ auto driverConfig = NYdb::TDriverConfig()
+ .SetEndpoint(std::getenv("YDB_ENDPOINT"))
+ .SetDatabase("/Root/testdb")
+ .SetLog(std::unique_ptr<TLogBackend>(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release()))
+ .SetAuthToken(std::getenv("YDB_TOKEN"));
+
+ auto driver = NYdb::TDriver(driverConfig);
+
+ auto clientSettings = TTopicClientSettings();
+ auto client = TTopicClient(driver, clientSettings);
+
+ auto settings = TReadSessionSettings()
+ .AppendTopics(TTopicReadSettings("t1").AppendPartitionIds({0}))
+ .ConsumerName("c1")
+ // .DirectRead(true)
+ ;
+
+ settings.EventHandlers_
+ .StartPartitionSessionHandler([](TReadSessionEvent::TStartPartitionSessionEvent& e) {
+ e.Confirm();
+ })
+ .StopPartitionSessionHandler([](TReadSessionEvent::TStopPartitionSessionEvent& e) {
+ e.Confirm();
+ })
+ .DataReceivedHandler([](TReadSessionEvent::TDataReceivedEvent& e) {
+ for (std::size_t i = 0; i < e.GetMessages().size(); ++i) {
+ auto& m = e.GetMessages()[i];
+ std::cerr << "Message: " << m.GetData() << std::endl;
+ m.Commit();
+ }
+ });
+
+ auto reader = client.CreateReadSession(settings);
+
+ std::this_thread::sleep_for(std::chrono::seconds(1000));
+
+ reader->Close();
+}
+
+} // namespace NYdb::NTopic::NTests
diff --git a/ydb/public/sdk/cpp/tests/integration/topic/local_partition.cpp b/ydb/public/sdk/cpp/tests/integration/topic/local_partition.cpp
new file mode 100644
index 00000000000..0bd50786e90
--- /dev/null
+++ b/ydb/public/sdk/cpp/tests/integration/topic/local_partition.cpp
@@ -0,0 +1,530 @@
+#include "setup/fixture.h"
+#include "utils/trace.h"
+
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
+
+#include <ydb/public/api/grpc/ydb_discovery_v1.grpc.pb.h>
+
+#include <library/cpp/logger/stream.h>
+
+#include <grpc++/grpc++.h>
+
+#include <format>
+#include <thread>
+
+namespace NYdb::inline Dev::NTopic::NTests {
+
+struct TYdbPqTestRetryState : IRetryPolicy::IRetryState {
+ TYdbPqTestRetryState(std::function<void ()> retryCallback, std::function<void ()> destroyCallback, const TDuration& delay)
+ : RetryDone(retryCallback)
+ , DestroyDone(destroyCallback)
+ , Delay(delay)
+ {}
+
+ TMaybe<TDuration> GetNextRetryDelay(NYdb::EStatus) override {
+ std::cerr << "Test retry state: get retry delay" << std::endl;
+ RetryDone();
+ return Delay;
+ }
+
+ std::function<void ()> RetryDone;
+ std::function<void ()> DestroyDone;
+ TDuration Delay;
+
+ ~TYdbPqTestRetryState() {
+ DestroyDone();
+ }
+};
+
+struct TYdbPqNoRetryState : IRetryPolicy::IRetryState {
+ std::atomic<bool> DelayCalled = false;
+ TMaybe<TDuration> GetNextRetryDelay(NYdb::EStatus) override {
+ auto res = DelayCalled.exchange(true);
+ EXPECT_FALSE(res);
+ return {};
+ }
+};
+
+struct TYdbPqTestRetryPolicy : IRetryPolicy {
+ TYdbPqTestRetryPolicy(const TDuration& delay = TDuration::MilliSeconds(2000))
+ : Delay(delay)
+ {
+ std::cerr << "====TYdbPqTestRetryPolicy()\n";
+ }
+
+ IRetryState::TPtr CreateRetryState() const override {
+ std::cerr << "====CreateRetryState\n";
+ if (OnFatalBreakDown.exchange(false)) {
+ return std::make_unique<TYdbPqNoRetryState>();
+ }
+ if (Initialized_.load())
+ {
+ std::cerr << "====CreateRetryState Initialized\n";
+ auto res = OnBreakDown.exchange(false);
+ EXPECT_TRUE(res);
+ for (size_t i = 0; i < 100; i++) {
+ if (CurrentRetries.load() == 0) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ EXPECT_EQ(CurrentRetries.load(), 0u);
+ }
+ auto retryCb = [this]() mutable {this->RetryDone();};
+ auto destroyCb = [this]() mutable {this->StateDestroyed();};
+ return std::make_unique<TYdbPqTestRetryState>(retryCb, destroyCb, Delay);
+ }
+
+ void RetryDone() const {
+ CurrentRetries.fetch_add(1);
+ auto expected = RetriesExpected.load();
+ if (expected > 0 && CurrentRetries.load() >= expected) {
+ std::lock_guard lock(Lock);
+ {
+ RetryPromise.SetValue();
+ }
+ RetriesExpected.store(0);
+ }
+ }
+ void StateDestroyed() const {
+ std::cerr << "====StateDestroyed\n";
+ auto expected = RepairExpected.exchange(false);
+ if (expected) {
+ std::lock_guard lock(Lock);
+ RepairPromise.SetValue();
+ }
+ }
+ void ExpectBreakDown() {
+ // Either TYdbPqTestRetryPolicy() or Initialize() should be called beforehand in order to set OnBreakDown=0
+ std::cerr << "====ExpectBreakDown\n";
+ for (std::size_t i = 0; i < 100; i++) {
+ if (!OnBreakDown.load()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ EXPECT_FALSE(OnBreakDown.load());
+ CurrentRetries.store(0);
+ OnBreakDown.store(true);
+ }
+ void ExpectFatalBreakDown() {
+ OnFatalBreakDown.store(true);
+ }
+
+ void WaitForRetries(std::uint64_t retryCount, NThreading::TPromise<void>& promise) {
+ RetriesExpected.store(retryCount);
+ std::lock_guard lock(Lock);
+ RetryPromise = promise;
+ }
+
+ void WaitForRetriesSync(std::uint64_t retryCount) {
+ NThreading::TPromise<void> retriesPromise = NThreading::NewPromise();
+ auto retriesFuture = retriesPromise.GetFuture();
+ WaitForRetries(retryCount, retriesPromise);
+ retriesFuture.Wait();
+ }
+
+ void WaitForRepair(NThreading::TPromise<void>& promise) {
+ RepairExpected.store(true);
+ std::lock_guard lock(Lock);
+ RepairPromise = promise;
+ }
+
+ void WaitForRepairSync() {
+ NThreading::TPromise<void> repairPromise = NThreading::NewPromise();
+ auto repairFuture = repairPromise.GetFuture();
+ WaitForRepair(repairPromise);
+ repairFuture.Wait();
+ }
+
+ void Initialize() {
+ Initialized_.store(true);
+ CurrentRetries.store(0);
+ }
+
+private:
+ TDuration Delay;
+ mutable std::atomic<std::uint64_t> CurrentRetries = 0;
+ mutable std::atomic<bool> Initialized_ = false;
+ mutable std::atomic<bool> OnBreakDown = false;
+ mutable std::atomic<bool> OnFatalBreakDown = false;
+ mutable NThreading::TPromise<void> RetryPromise;
+ mutable NThreading::TPromise<void> RepairPromise;
+ mutable std::atomic<std::uint64_t> RetriesExpected = 0;
+ mutable std::atomic<bool> RepairExpected = false;
+ mutable std::mutex Lock;
+};
+
+class LocalPartition : public TTopicTestFixture {
+protected:
+ TDriverConfig CreateConfig(const std::string& discoveryAddr) {
+ NYdb::TDriverConfig config = MakeDriverConfig();
+ config.SetEndpoint(discoveryAddr);
+ return config;
+ }
+
+ TWriteSessionSettings CreateWriteSessionSettings() {
+ return TWriteSessionSettings()
+ .Path(GetTopicPath())
+ .ProducerId("test-producer")
+ .PartitionId(0)
+ .DirectWriteToPartition(true);
+ }
+
+ TReadSessionSettings CreateReadSessionSettings() {
+ return TReadSessionSettings()
+ .ConsumerName("test-consumer")
+ .AppendTopics(GetTopicPath());
+ }
+
+ void WriteMessage(TTopicClient& client) {
+ std::cerr << "=== Write message" << std::endl;
+
+ auto writeSession = client.CreateSimpleBlockingWriteSession(CreateWriteSessionSettings());
+ EXPECT_TRUE(writeSession->Write("message"));
+ writeSession->Close();
+ }
+
+ void ReadMessage(TTopicClient& client, ui64 expectedCommitedOffset = 1) {
+ std::cerr << "=== Read message" << std::endl;
+
+ auto readSession = client.CreateReadSession(CreateReadSessionSettings());
+
+ std::optional<TReadSessionEvent::TEvent> event = readSession->GetEvent(true);
+ EXPECT_TRUE(event);
+ auto startPartitionSession = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&event.value());
+ EXPECT_TRUE(startPartitionSession) << DebugString(*event);
+
+ startPartitionSession->Confirm();
+
+ event = readSession->GetEvent(true);
+ EXPECT_TRUE(event) << DebugString(*event);
+ auto dataReceived = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event.value());
+ EXPECT_TRUE(dataReceived) << DebugString(*event);
+
+ dataReceived->Commit();
+
+ auto& messages = dataReceived->GetMessages();
+ EXPECT_EQ(messages.size(), 1u);
+ EXPECT_EQ(messages[0].GetData(), "message");
+
+ event = readSession->GetEvent(true);
+ EXPECT_TRUE(event) << DebugString(*event);
+ auto commitOffsetAck = std::get_if<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event.value());
+ EXPECT_TRUE(commitOffsetAck) << DebugString(*event);
+ EXPECT_EQ(commitOffsetAck->GetCommittedOffset(), expectedCommitedOffset);
+ }
+};
+
+class TMockDiscoveryService: public Ydb::Discovery::V1::DiscoveryService::Service {
+public:
+ TMockDiscoveryService() {
+ int discoveryPort = 0;
+
+ grpc::ServerBuilder builder;
+ builder.AddListeningPort("0.0.0.0:0", grpc::InsecureServerCredentials(), &discoveryPort);
+ builder.RegisterService(this);
+ Server = builder.BuildAndStart();
+
+ DiscoveryAddr = "0.0.0.0:" + std::to_string(discoveryPort);
+ std::cerr << "==== TMockDiscovery server started on port " << discoveryPort << std::endl;
+ }
+
+ void SetGoodEndpoints(TTopicTestFixture& fixture) {
+ std::lock_guard lock(Lock);
+ std::cerr << "==== TMockDiscovery set good endpoint nodes " << std::endl;
+
+ auto nodeIds = fixture.GetNodeIds();
+ SetEndpointsLocked(nodeIds[0], nodeIds.size(), fixture.GetPort());
+ }
+
+ // Call this method only after locking the Lock.
+ void SetEndpointsLocked(std::uint32_t firstNodeId, std::uint32_t nodeCount, std::uint16_t port) {
+ std::cerr << "==== TMockDiscovery add endpoints, firstNodeId " << firstNodeId << ", nodeCount " << nodeCount << ", port " << port << std::endl;
+
+ MockResults.clear_endpoints();
+ if (nodeCount > 0) {
+ Ydb::Discovery::EndpointInfo* endpoint = MockResults.add_endpoints();
+ endpoint->set_address(TStringBuilder() << "localhost");
+ endpoint->set_port(port);
+ endpoint->set_node_id(firstNodeId);
+ }
+ if (nodeCount > 1) {
+ Ydb::Discovery::EndpointInfo* endpoint = MockResults.add_endpoints();
+ endpoint->set_address(TStringBuilder() << "ip6-localhost"); // name should be different
+ endpoint->set_port(port);
+ endpoint->set_node_id(firstNodeId + 1);
+ }
+ if (nodeCount > 2) {
+ EXPECT_TRUE(false) << "Unsupported count of nodes";
+ }
+ }
+
+ void SetEndpoints(std::uint32_t firstNodeId, std::uint32_t nodeCount, std::uint16_t port) {
+ std::lock_guard lock(Lock);
+ SetEndpointsLocked(firstNodeId, nodeCount, port);
+ }
+
+ grpc::Status ListEndpoints(grpc::ServerContext* context, const Ydb::Discovery::ListEndpointsRequest* request, Ydb::Discovery::ListEndpointsResponse* response) override {
+ std::lock_guard lock(Lock);
+ EXPECT_TRUE(context);
+
+ if (Delay != std::chrono::milliseconds::zero()) {
+ std::cerr << "==== Delay " << Delay << " before ListEndpoints request" << std::endl;
+ auto start = std::chrono::steady_clock::now();
+ while (start + Delay < std::chrono::steady_clock::now()) {
+ if (context->IsCancelled()) {
+ return grpc::Status::CANCELLED;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ }
+
+ std::cerr << "==== ListEndpoints request: " << request->ShortDebugString() << std::endl;
+
+ auto* op = response->mutable_operation();
+ op->set_ready(true);
+ op->set_status(Ydb::StatusIds::SUCCESS);
+ op->mutable_result()->PackFrom(MockResults);
+
+ std::cerr << "==== ListEndpoints response: " << response->ShortDebugString() << std::endl;
+
+ return grpc::Status::OK;
+ }
+
+ std::string GetDiscoveryAddr() const {
+ return DiscoveryAddr;
+ }
+
+ void SetDelay(std::chrono::milliseconds delay) {
+ Delay = delay;
+ }
+
+private:
+ Ydb::Discovery::ListEndpointsResult MockResults;
+ std::string DiscoveryAddr;
+ std::unique_ptr<grpc::Server> Server;
+ std::mutex Lock;
+
+ std::chrono::milliseconds Delay = {};
+};
+
+TEST_F(LocalPartition, Basic) {
+ TMockDiscoveryService discovery;
+ discovery.SetGoodEndpoints(*this);
+ TDriver driver(CreateConfig(discovery.GetDiscoveryAddr()));
+ TTopicClient client(driver);
+
+ WriteMessage(client);
+ ReadMessage(client);
+}
+
+TEST_F(LocalPartition, DescribeBadPartition) {
+ TMockDiscoveryService discovery;
+
+ discovery.SetGoodEndpoints(*this);
+
+ auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
+
+ // Set non-existing partition
+ auto writeSettings = CreateWriteSessionSettings();
+ writeSettings.RetryPolicy(retryPolicy);
+ writeSettings.PartitionId(1);
+
+ retryPolicy->Initialize();
+ retryPolicy->ExpectBreakDown();
+
+ std::cerr << "=== Create write session\n";
+ TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr())));
+ auto writeSession = client.CreateWriteSession(writeSettings);
+
+ std::cerr << "=== Wait for retries\n";
+ retryPolicy->WaitForRetriesSync(3);
+
+ std::cerr << "=== Alter partition count\n";
+ TAlterTopicSettings alterSettings;
+ alterSettings.AlterPartitioningSettings(2, 2);
+ auto alterResult = client.AlterTopic(GetTopicPath(), alterSettings).GetValueSync();
+ ASSERT_EQ(alterResult.GetStatus(), NYdb::EStatus::SUCCESS) << alterResult.GetIssues().ToString();
+
+ std::cerr << "=== Wait for repair\n";
+ retryPolicy->WaitForRepairSync();
+
+ std::cerr << "=== Close write session\n";
+ writeSession->Close();
+}
+
+TEST_F(LocalPartition, DiscoveryServiceBadPort) {
+ TMockDiscoveryService discovery;
+ discovery.SetEndpoints(9999, 2, 0);
+
+ auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
+
+ auto writeSettings = CreateWriteSessionSettings();
+ writeSettings.RetryPolicy(retryPolicy);
+
+ retryPolicy->Initialize();
+ retryPolicy->ExpectBreakDown();
+
+ std::cerr << "=== Create write session\n";
+ TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr())));
+ auto writeSession = client.CreateWriteSession(writeSettings);
+
+ std::cerr << "=== Wait for retries\n";
+ retryPolicy->WaitForRetriesSync(3);
+
+ discovery.SetGoodEndpoints(*this);
+
+ std::cerr << "=== Wait for repair\n";
+ retryPolicy->WaitForRepairSync();
+
+ std::cerr << "=== Close write session\n";
+ writeSession->Close();
+}
+
+TEST_F(LocalPartition, DiscoveryServiceBadNodeId) {
+ TMockDiscoveryService discovery;
+ discovery.SetEndpoints(9999, GetNodeIds().size(), GetPort());
+
+ auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
+
+ auto writeSettings = CreateWriteSessionSettings();
+ writeSettings.RetryPolicy(retryPolicy);
+
+ retryPolicy->Initialize();
+ retryPolicy->ExpectBreakDown();
+
+ std::cerr << "=== Create write session\n";
+ TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr())));
+ auto writeSession = client.CreateWriteSession(writeSettings);
+
+ std::cerr << "=== Wait for retries\n";
+ retryPolicy->WaitForRetriesSync(3);
+
+ discovery.SetGoodEndpoints(*this);
+
+ std::cerr << "=== Wait for repair\n";
+ retryPolicy->WaitForRepairSync();
+
+ std::cerr << "=== Close write session\n";
+ writeSession->Close();
+}
+
+TEST_F(LocalPartition, DescribeHang) {
+ TMockDiscoveryService discovery;
+ discovery.SetGoodEndpoints(*this);
+
+ auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>(std::chrono::days(1));
+
+ auto writeSettings = CreateWriteSessionSettings();
+ writeSettings.RetryPolicy(retryPolicy);
+
+ retryPolicy->Initialize();
+ retryPolicy->ExpectBreakDown();
+
+ std::cerr << "=== Create write session\n";
+ TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr())));
+ auto writeSession = client.CreateWriteSession(writeSettings);
+
+ std::cerr << "=== Close write session\n";
+ writeSession->Close();
+}
+
+TEST_F(LocalPartition, DiscoveryHang) {
+ TMockDiscoveryService discovery;
+ discovery.SetGoodEndpoints(*this);
+ discovery.SetDelay(std::chrono::days(1));
+
+ std::cerr << "=== Create write session\n";
+ TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr())));
+ auto writeSession = client.CreateWriteSession(CreateWriteSessionSettings());
+
+ std::cerr << "=== Close write session\n";
+ writeSession->Close();
+}
+
+TEST_F(LocalPartition, WithoutPartition) {
+ // Direct write without partition: happy way.
+ TMockDiscoveryService discovery;
+ discovery.SetGoodEndpoints(*this);
+ auto driverConfig = CreateConfig(discovery.GetDiscoveryAddr());
+ auto* tracingBackend = new TTracingBackend();
+ std::vector<std::unique_ptr<TLogBackend>> underlyingBackends;
+ underlyingBackends.push_back(std::make_unique<TStreamLogBackend>(&Cerr));
+ underlyingBackends.push_back(std::unique_ptr<TLogBackend>(tracingBackend));
+ driverConfig.SetLog(std::make_unique<TCompositeLogBackend>(std::move(underlyingBackends)));
+ TDriver driver(driverConfig);
+ TTopicClient client(driver);
+ auto sessionSettings = TWriteSessionSettings()
+ .Path(GetTopicPath())
+ .ProducerId("test-message-group")
+ .MessageGroupId("test-message-group")
+ .DirectWriteToPartition(true);
+ auto writeSession = client.CreateSimpleBlockingWriteSession(sessionSettings);
+ ASSERT_TRUE(writeSession->Write("message"));
+ writeSession->Close();
+
+ auto node0Id = std::to_string(GetNodeIds()[0]);
+ TExpectedTrace expected{
+ "InitRequest !partition_id !partition_with_generation",
+ "InitResponse partition_id=0 session_id",
+ "DescribePartitionRequest partition_id=0",
+ std::format("DescribePartitionResponse partition_id=0 pl_generation=1 pl_node_id={}", node0Id),
+ std::format("PreferredPartitionLocation Generation=1 NodeId={}", node0Id),
+ "InitRequest !partition_id pwg_partition_id=0 pwg_generation=1",
+ "InitResponse partition_id=0 session_id",
+ };
+ auto const events = tracingBackend->GetEvents();
+ std::cerr << "==== Events count: " << events.size() << std::endl;
+ for (auto const& event : events) {
+ std::cerr << "==== Event: " << event.Event << std::endl;
+ for (auto const& [key, value] : event.KeyValues) {
+ std::cerr << "==== " << key << "=" << value << std::endl;
+ }
+ }
+ ASSERT_TRUE(expected.Matches(events));
+}
+
+TEST_F(LocalPartition, WithoutPartitionDeadNode) {
+ // This test emulates a situation, when InitResponse directs us to an inaccessible node.
+ TMockDiscoveryService discovery;
+ discovery.SetEndpoints(GetNodeIds()[0], 1, 0);
+ auto driverConfig = CreateConfig(discovery.GetDiscoveryAddr());
+ auto* tracingBackend = new TTracingBackend();
+ std::vector<std::unique_ptr<TLogBackend>> underlyingBackends;
+ underlyingBackends.push_back(std::make_unique<TStreamLogBackend>(&Cerr));
+ underlyingBackends.push_back(std::unique_ptr<TLogBackend>(tracingBackend));
+ driverConfig.SetLog(std::make_unique<TCompositeLogBackend>(std::move(underlyingBackends)));
+ TDriver driver(driverConfig);
+ TTopicClient client(driver);
+ auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
+ auto sessionSettings = TWriteSessionSettings()
+ .Path(GetTopicPath())
+ .MessageGroupId("test-message-group")
+ .DirectWriteToPartition(true)
+ .PartitionId(0)
+ .RetryPolicy(retryPolicy);
+ retryPolicy->Initialize();
+ retryPolicy->ExpectBreakDown();
+ auto writeSession = client.CreateSimpleBlockingWriteSession(sessionSettings);
+
+ retryPolicy->WaitForRetriesSync(1);
+ discovery.SetGoodEndpoints(*this);
+ retryPolicy->WaitForRepairSync();
+ ASSERT_TRUE(writeSession->Close());
+
+ auto node0Id = std::to_string(GetNodeIds()[0]);
+ TExpectedTrace expected{
+ "DescribePartitionRequest partition_id=0",
+ "Error status=TRANSPORT_UNAVAILABLE",
+ "DescribePartitionRequest partition_id=0",
+ std::format("DescribePartitionResponse partition_id=0 pl_generation=1 pl_node_id={}", node0Id),
+ std::format("PreferredPartitionLocation Generation=1 NodeId={}", node0Id),
+ "InitRequest !partition_id pwg_partition_id=0 pwg_generation=1",
+ "InitResponse partition_id=0",
+ };
+ auto const events = tracingBackend->GetEvents();
+ ASSERT_TRUE(expected.Matches(events));
+}
+
+}
diff --git a/ydb/public/sdk/cpp/tests/integration/topic/setup/fixture.cpp b/ydb/public/sdk/cpp/tests/integration/topic/setup/fixture.cpp
new file mode 100644
index 00000000000..bec648a7cfa
--- /dev/null
+++ b/ydb/public/sdk/cpp/tests/integration/topic/setup/fixture.cpp
@@ -0,0 +1,92 @@
+#include "fixture.h"
+
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/discovery/discovery.h>
+
+#include <util/system/execpath.h>
+
+namespace NYdb::inline Dev::NTopic::NTests {
+
+void TTopicTestFixture::SetUp() {
+ TTopicClient client(MakeDriver());
+
+ const testing::TestInfo* const testInfo = testing::UnitTest::GetInstance()->current_test_info();
+ std::filesystem::path execPath(std::string{GetExecPath()});
+
+ std::stringstream builder;
+ builder << std::getenv("YDB_TEST_ROOT") << "/" << execPath.filename().string() << "/" << testInfo->test_suite_name() << "_" << testInfo->name();
+ TopicPath_ = builder.str();
+
+ client.DropTopic(TopicPath_).GetValueSync();
+ CreateTopic(TopicPath_);
+}
+
+void TTopicTestFixture::TearDown() {
+ DropTopic(GetTopicPath());
+}
+
+void TTopicTestFixture::CreateTopic(const std::string& path, const std::string& consumer, size_t partitionCount, std::optional<size_t> maxPartitionCount) {
+ TTopicClient client(MakeDriver());
+
+ TCreateTopicSettings topics;
+ topics
+ .BeginConfigurePartitioningSettings()
+ .MinActivePartitions(partitionCount)
+ .MaxActivePartitions(maxPartitionCount.value_or(partitionCount));
+
+ if (maxPartitionCount.has_value() && maxPartitionCount.value() > partitionCount) {
+ topics
+ .BeginConfigurePartitioningSettings()
+ .BeginConfigureAutoPartitioningSettings()
+ .Strategy(EAutoPartitioningStrategy::ScaleUp);
+ }
+
+ TConsumerSettings<TCreateTopicSettings> consumers(topics, consumer);
+ topics.AppendConsumers(consumers);
+
+ auto status = client.CreateTopic(path, topics).GetValueSync();
+ Y_ENSURE(status.IsSuccess(), status);
+}
+
+std::string TTopicTestFixture::GetTopicPath() {
+ return TopicPath_;
+}
+
+void TTopicTestFixture::DropTopic(const std::string& path) {
+ TTopicClient client(MakeDriver());
+ auto status = client.DropTopic(path).GetValueSync();
+ Y_ENSURE(status.IsSuccess(), status);
+}
+
+TDriverConfig TTopicTestFixture::MakeDriverConfig() const {
+ return TDriverConfig()
+ .SetEndpoint(std::getenv("YDB_ENDPOINT"))
+ .SetDatabase(std::getenv("YDB_DATABASE"))
+ .SetLog(std::unique_ptr<TLogBackend>(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release()));
+}
+
+TDriver TTopicTestFixture::MakeDriver() const {
+ return TDriver(MakeDriverConfig());
+}
+
+std::uint16_t TTopicTestFixture::GetPort() const {
+ auto endpoint = std::getenv("YDB_ENDPOINT");
+ Y_ENSURE(endpoint, "YDB_ENDPOINT is not set");
+
+ auto portPos = std::string(endpoint).find(":");
+ return std::stoi(std::string(endpoint).substr(portPos + 1));
+}
+
+std::vector<std::uint32_t> TTopicTestFixture::GetNodeIds() const {
+ NDiscovery::TDiscoveryClient client(MakeDriver());
+ auto result = client.ListEndpoints().GetValueSync();
+ Y_ENSURE(result.IsSuccess(), static_cast<TStatus>(result));
+
+ std::vector<std::uint32_t> nodeIds;
+ for (const auto& endpoint : result.GetEndpointsInfo()) {
+ nodeIds.push_back(endpoint.NodeId);
+ }
+
+ return nodeIds;
+}
+
+}
diff --git a/ydb/public/sdk/cpp/tests/integration/topic/setup/fixture.h b/ydb/public/sdk/cpp/tests/integration/topic/setup/fixture.h
new file mode 100644
index 00000000000..71e59189dbd
--- /dev/null
+++ b/ydb/public/sdk/cpp/tests/integration/topic/setup/fixture.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
+
+#include <gtest/gtest.h>
+
+namespace NYdb::inline Dev::NTopic::NTests {
+
+class TTopicTestFixture : public ::testing::Test {
+public:
+ void SetUp() override;
+ void TearDown() override;
+
+ void CreateTopic(const std::string& path, const std::string& consumer = "test-consumer", std::size_t partitionCount = 1,
+ std::optional<std::size_t> maxPartitionCount = std::nullopt);
+
+ std::string GetTopicPath();
+
+ void DropTopic(const std::string& path);
+
+ TDriverConfig MakeDriverConfig() const;
+
+ TDriver MakeDriver() const;
+
+ std::uint16_t GetPort() const;
+ std::vector<std::uint32_t> GetNodeIds() const;
+private:
+ std::string TopicPath_;
+};
+
+}
diff --git a/ydb/public/sdk/cpp/tests/integration/topic/setup/ya.make b/ydb/public/sdk/cpp/tests/integration/topic/setup/ya.make
new file mode 100644
index 00000000000..86ee3a8bd70
--- /dev/null
+++ b/ydb/public/sdk/cpp/tests/integration/topic/setup/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ fixture.cpp
+)
+
+PEERDIR(
+ ydb/public/sdk/cpp/src/client/discovery
+ ydb/public/sdk/cpp/src/client/topic
+ library/cpp/testing/gtest
+)
+
+END()
diff --git a/ydb/public/sdk/cpp/tests/integration/topic/trace.cpp b/ydb/public/sdk/cpp/tests/integration/topic/trace.cpp
new file mode 100644
index 00000000000..47c259189c5
--- /dev/null
+++ b/ydb/public/sdk/cpp/tests/integration/topic/trace.cpp
@@ -0,0 +1,166 @@
+#include "utils/trace.h"
+
+#include <gtest/gtest.h>
+
+namespace NYdb::inline Dev::NTopic::NTests {
+
+TEST(Trace, SkipSpaces) {
+ ASSERT_EQ(SkipSpaces(""), "");
+ ASSERT_EQ(SkipSpaces(" "), "");
+ ASSERT_EQ(SkipSpaces(" a"), "a");
+ ASSERT_EQ(SkipSpaces(" a "), "a ");
+}
+
+TEST(Trace, NextToken) {
+ ASSERT_EQ(NextToken(""), "");
+ ASSERT_EQ(NextToken(" "), "");
+ ASSERT_EQ(NextToken("a"), "a");
+ ASSERT_EQ(NextToken(" a"), "a");
+ ASSERT_EQ(NextToken(" a "), "a");
+ std::string_view b("a=1");
+ ASSERT_EQ(NextToken(b, '='), "a");
+ ASSERT_EQ(b, "1");
+}
+
+TEST(Trace, TTraceEvent) {
+ ASSERT_THROW(TTraceEvent::FromString(""), TTraceException);
+ std::string const eventName("init");
+ {
+ std::string s(eventName);
+ auto ev = TTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_TRUE(ev.KeyValues.empty());
+ }
+ {
+ std::string s(eventName + " a");
+ auto ev = TTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_EQ(ev.KeyValues.size(), 1u);
+ ASSERT_TRUE(ev.KeyValues.at("a").empty());
+ }
+ {
+ std::string s(eventName + " a b");
+ auto ev = TTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_EQ(ev.KeyValues.size(), 2u);
+ ASSERT_TRUE(ev.KeyValues.at("a").empty());
+ ASSERT_TRUE(ev.KeyValues.at("b").empty());
+ }
+ {
+ std::string s(eventName + " =");
+ auto ev = TTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_EQ(ev.KeyValues.size(), 1u);
+ ASSERT_TRUE(ev.KeyValues.at("").empty());
+ }
+ {
+ std::string s(eventName + " a=1 b");
+ auto ev = TTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_EQ(ev.KeyValues.size(), 2u);
+ ASSERT_EQ(ev.KeyValues.at("a"), "1");
+ ASSERT_TRUE(ev.KeyValues.at("b").empty());
+ }
+ {
+ std::string s(eventName + " a b=2");
+ auto ev = TTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_EQ(ev.KeyValues.size(), 2u);
+ ASSERT_TRUE(ev.KeyValues.at("a").empty());
+ ASSERT_EQ(ev.KeyValues.at("b"), "2");
+ }
+ {
+ std::string s(eventName + " a=1 b=2");
+ auto ev = TTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_EQ(ev.KeyValues.size(), 2u);
+ ASSERT_EQ(ev.KeyValues.at("a"), "1");
+ ASSERT_EQ(ev.KeyValues.at("b"), "2");
+ }
+ {
+ TExpectedTraceEvent expected = {eventName, {{"a", {"1"}}}, {"d"}};
+ ASSERT_FALSE(expected.Matches({"", {}}));
+ ASSERT_FALSE(expected.Matches({eventName, {}}));
+ ASSERT_FALSE(expected.Matches({eventName, {{"a", ""}}}));
+ ASSERT_FALSE(expected.Matches({eventName, {{"a", "0"}}}));
+ ASSERT_FALSE(expected.Matches({eventName, {{"c", "1"}}}));
+ ASSERT_TRUE(expected.Matches({eventName, {{"a", "1"}}}));
+ ASSERT_TRUE(expected.Matches({eventName, {{"a", "1"}, {"b", "2"}}}));
+ ASSERT_FALSE(expected.Matches({eventName, {{"a", "1"}, {"d", "4"}}})); // The "d" should NOT appear in the event.
+ }
+}
+
+TEST(Trace, TExpectedTraceEvent) {
+ ASSERT_THROW(TExpectedTraceEvent::FromString(""), TTraceException);
+ std::string const eventName("init");
+ {
+ std::string s(eventName);
+ auto ev = TExpectedTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_TRUE(ev.KeyValues.empty());
+ }
+ {
+ std::string s(eventName + " a");
+ auto ev = TExpectedTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_EQ(ev.KeyValues.size(), 1u);
+ ASSERT_TRUE(ev.KeyValues.at("a").empty());
+ }
+ {
+ std::string s(eventName + " a b");
+ auto ev = TExpectedTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_EQ(ev.KeyValues.size(), 2u);
+ ASSERT_TRUE(ev.KeyValues.at("a").empty());
+ ASSERT_TRUE(ev.KeyValues.at("b").empty());
+ }
+ {
+ std::string s(eventName + " =");
+ auto ev = TExpectedTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_EQ(ev.KeyValues.size(), 1u);
+ ASSERT_TRUE(ev.KeyValues.at("").empty());
+ }
+ {
+ std::string s(eventName + " a=1 b");
+ auto ev = TExpectedTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_EQ(ev.KeyValues.size(), 2u);
+ ASSERT_EQ(ev.KeyValues.at("a"), "1");
+ ASSERT_TRUE(ev.KeyValues.at("b").empty());
+ }
+ {
+ std::string s(eventName + " a b=2");
+ auto ev = TExpectedTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_EQ(ev.KeyValues.size(), 2u);
+ ASSERT_TRUE(ev.KeyValues.at("a").empty());
+ ASSERT_EQ(ev.KeyValues.at("b"), "2");
+ }
+ {
+ std::string s(eventName + " a=1 b=2");
+ auto ev = TExpectedTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_EQ(ev.KeyValues.size(), 2u);
+ ASSERT_EQ(ev.KeyValues.at("a"), "1");
+ ASSERT_EQ(ev.KeyValues.at("b"), "2");
+ }
+ {
+ std::string s(eventName + " !a");
+ auto ev = TExpectedTraceEvent::FromString(s);
+ ASSERT_EQ(ev.Event, eventName);
+ ASSERT_TRUE(ev.KeyValues.empty());
+ ASSERT_EQ(ev.DeniedKeys.size(), 1u);
+ ASSERT_EQ(ev.DeniedKeys[0], "a");
+ }
+}
+
+TEST(Trace, TExpectedTrace) {
+ TExpectedTrace expected{"A", "B"};
+ std::vector<TTraceEvent> events{{"X", {}}, {"A", {}}, {"X", {}}, {"B", {}}, {"X", {}}};
+ ASSERT_TRUE(expected.Matches(events));
+ expected = {"A", "B", "C"};
+ ASSERT_FALSE(expected.Matches(events));
+}
+
+}
diff --git a/ydb/public/sdk/cpp/tests/integration/topic/utils/trace.cpp b/ydb/public/sdk/cpp/tests/integration/topic/utils/trace.cpp
new file mode 100644
index 00000000000..12f6a3fc214
--- /dev/null
+++ b/ydb/public/sdk/cpp/tests/integration/topic/utils/trace.cpp
@@ -0,0 +1,160 @@
+#include "trace.h"
+
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/library/string_utils/helpers/helpers.h>
+
+#include <mutex>
+
+namespace NYdb::inline Dev::NTopic::NTests {
+
+std::string_view SkipSpaces(std::string_view& b) {
+ while (b.starts_with(' ')) {
+ b.remove_prefix(1);
+ }
+ return b;
+}
+
+std::string_view SkipSpaces(std::string_view&& b) {
+ return SkipSpaces(b);
+}
+
+std::string_view NextToken(std::string_view& b, char delim) {
+ SkipSpaces(b);
+ return NYdb::NUtils::NextTok(b, delim);
+}
+
+std::string_view NextToken(std::string_view&& b, char delim) {
+ return NextToken(b, delim);
+}
+
+TTraceEvent TTraceEvent::FromString(const std::string& s) {
+ std::string_view b(s);
+ std::string_view event(NextToken(b));
+ if (event.empty()) {
+ throw TTraceException("Wrong tracing log (format), event token not found");
+ }
+ std::unordered_map<std::string, std::string> keyValues;
+ for (std::string_view pair, key; ; ) {
+ pair = NextToken(b);
+ if (pair.empty()) {
+ break;
+ }
+ key = NYdb::NUtils::NextTok(pair, '=');
+ keyValues.emplace(key, pair);
+ }
+ return {std::string(event), std::move(keyValues)};
+}
+
+bool TExpectedTraceEvent::Matches(const TTraceEvent& event) const {
+ if (Event != event.Event) {
+ return false;
+ }
+ for (auto const& k : DeniedKeys) {
+ auto it = event.KeyValues.find(k);
+ if (it != end(event.KeyValues)) {
+ return false;
+ }
+ }
+ for (auto const& [expectedKey, expectedValue] : KeyValues) {
+ auto it = event.KeyValues.find(expectedKey);
+ if (it == end(event.KeyValues) || !(expectedValue.empty() || expectedValue == it->second)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+TExpectedTraceEvent TExpectedTraceEvent::FromString(const std::string& s) {
+ std::string_view b(s);
+ std::string_view event(NextToken(b));
+ if (event.empty()) {
+ throw TTraceException("Wrong tracing log format, event token not found");
+ }
+ std::unordered_map<std::string, std::string> keyValues;
+ std::vector<std::string> missingKeys;
+ for (std::string_view pair, key; ; ) {
+ pair = NextToken(b);
+ if (pair.empty()) {
+ break;
+ }
+ key = NYdb::NUtils::NextTok(pair, '=');
+ if (key.starts_with('!')) {
+ key.remove_prefix(1);
+ missingKeys.emplace_back(key);
+ } else {
+ keyValues.emplace(key, pair);
+ }
+ }
+ return {std::string(event), std::move(keyValues), std::move(missingKeys)};
+}
+
+TExpectedTrace::TExpectedTrace(std::initializer_list<std::string> expected) {
+ Expected.reserve(expected.size());
+ for (auto const& e : expected) {
+ Expected.emplace_back(TExpectedTraceEvent::FromString(e));
+ }
+}
+
+bool TExpectedTrace::Matches(std::span<TTraceEvent const> events) const {
+ std::span<TExpectedTraceEvent const> expected(Expected);
+ while (!expected.empty() && expected.size() <= events.size()) {
+ if (expected[0].Matches(events[0])) {
+ expected = expected.subspan(1);
+ }
+ events = events.subspan(1);
+ }
+ return expected.empty();
+}
+
+std::string_view SkipPrefixLog(std::string_view& b) {
+ NextToken(b); // Skip the timestamp
+ NextToken(b); // Skip the log level
+ NextToken(b); // Skip database path
+ SkipSpaces(b);
+ return b;
+}
+
+bool CleanTraceEventBuf(std::string_view& b, std::string_view traceEventMarker) {
+ SkipPrefixLog(b);
+ if (!b.starts_with(traceEventMarker)) {
+ return false;
+ }
+ while (b.ends_with('\n')) {
+ b.remove_suffix(1);
+ }
+ b.remove_prefix(traceEventMarker.size());
+ return true;
+}
+
+void TTracingBackend::WriteData(const TLogRecord& rec) {
+ std::lock_guard lg(Lock);
+ if (rec.Priority != TLOG_RESOURCES) {
+ return;
+ }
+ if (std::string_view b(rec.Data, rec.Len); CleanTraceEventBuf(b, TraceEventMarker)) {
+ std::string s(b);
+ TTraceEvent e(EventParser(s));
+ if (EventPromise.Initialized() && !EventPromise.HasValue() && ExpectedEvent.Matches(e)) {
+ EventPromise.SetValue();
+ }
+ Log.emplace_back(std::move(s));
+ Events.emplace_back(std::move(e));
+ }
+}
+
+std::vector<std::string> TTracingBackend::GetLog() const {
+ std::lock_guard lg(Lock);
+ return Log;
+}
+
+std::vector<TTraceEvent> TTracingBackend::GetEvents() const {
+ std::lock_guard lg(Lock);
+ return Events;
+}
+
+NThreading::TFuture<void> TTracingBackend::WaitForEvent(const std::string& eventName) {
+ EventPromise = NThreading::NewPromise();
+ ExpectedEvent = {eventName, {}, {}};
+ return EventPromise.GetFuture();
+}
+
+}
diff --git a/ydb/public/sdk/cpp/tests/integration/topic/utils/trace.h b/ydb/public/sdk/cpp/tests/integration/topic/utils/trace.h
new file mode 100644
index 00000000000..c500756a32d
--- /dev/null
+++ b/ydb/public/sdk/cpp/tests/integration/topic/utils/trace.h
@@ -0,0 +1,120 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/src/client/topic/common/trace_lazy.h>
+
+#include <library/cpp/logger/backend.h>
+#include <library/cpp/logger/record.h>
+#include <library/cpp/threading/future/core/future.h>
+
+#include <span>
+
+namespace NYdb::inline Dev::NTopic::NTests {
+
+class TTraceException : public std::exception {
+public:
+ TTraceException(const std::string& message)
+ : Message(message)
+ {}
+
+ const char* what() const noexcept override {
+ return Message.c_str();
+ }
+
+private:
+ std::string Message;
+};
+
+std::string_view SkipSpaces(std::string_view& b);
+std::string_view SkipSpaces(std::string_view&& b);
+
+std::string_view NextToken(std::string_view& b, char delim = ' ');
+std::string_view NextToken(std::string_view&& b, char delim = ' ');
+
+struct TTraceEvent {
+ std::string Event;
+ std::unordered_map<std::string, std::string> KeyValues;
+
+ // Expects "event [key[=value]]*" kind of string. No spaces around the = sign. Values are optional.
+ static TTraceEvent FromString(const std::string& s);
+};
+
+struct TExpectedTraceEvent {
+ std::string Event;
+ std::unordered_map<std::string, std::string> KeyValues; // These key-value pairs should be in the event as is.
+ std::vector<std::string> DeniedKeys; // These keys should NOT appear in the event.
+
+ bool Matches(TTraceEvent const& event) const;
+
+ // Expects "event [[!]key[=value]]*" kind of string. No spaces around the = sign. Values are optional.
+ // The bang symbol right before a key denies the key in events.
+ static TExpectedTraceEvent FromString(const std::string& s);
+};
+
+struct TExpectedTrace {
+ std::vector<TExpectedTraceEvent> Expected;
+
+ TExpectedTrace(std::initializer_list<TExpectedTraceEvent> expected) : Expected(expected) {}
+ TExpectedTrace(std::initializer_list<std::string> expected);
+
+ // Check if the Expected events are a subsequence of the events.
+ bool Matches(std::span<TTraceEvent const> events) const;
+};
+
+// The log formatter is configured in TDriverConfig::SetDatabase using the GetPrefixLogFormatter function.
+// SkipPrefixLog skips the prefix that currently looks like "2024-02-13T07:51:07.979754Z :INFO: [/Root]".
+// It'd be better if TTracingBackend received strings before formatter does, but I don't know how to do it.
+std::string_view SkipPrefixLog(std::string_view& b);
+
+// If possible, transforms the buffer to the form of "event [key[=value]]*".
+bool CleanTraceEventBuf(std::string_view& b, std::string_view traceEventMarker);
+
+// Log backend for tracing events. Expects "... <traceEventMarker> <event-string>" kind of strings.
+// The "<event-string>" substring is parsed by the Parser function that returns a TTraceEvent object.
+// To get the trace, call GetEvents method.
+class TTracingBackend : public TLogBackend {
+public:
+ TTracingBackend(const std::string& traceEventMarker = TRACE_EVENT_MARKER, std::function<TTraceEvent(const std::string&)> parser = TTraceEvent::FromString)
+ : TraceEventMarker(traceEventMarker)
+ , EventParser(parser) {}
+
+ // Only logs strings on TRACE log level that start with the TraceEventMarker (after we strip the log prefix).
+ void WriteData(const TLogRecord& rec) override;
+ void ReopenLog() override {}
+ ELogPriority FiltrationLevel() const override { return TLOG_RESOURCES; }
+public:
+ std::vector<std::string> GetLog() const;
+ std::vector<TTraceEvent> GetEvents() const;
+
+ // Returns a feature that is fulfilled when an event with a particular name gets logged.
+ NThreading::TFuture<void> WaitForEvent(const std::string& eventName);
+private:
+ mutable TAdaptiveLock Lock;
+ std::string TraceEventMarker; // Log strings that start with this marker are parsed and stored.
+ std::function<TTraceEvent(const std::string&)> EventParser;
+ std::vector<std::string> Log; // Stores clean strings from log records that contained TraceEventMarker.
+ std::vector<TTraceEvent> Events; // Stores parsed trace events.
+ NThreading::TPromise<void> EventPromise;
+ TExpectedTraceEvent ExpectedEvent;
+};
+
+class TCompositeLogBackend : public TLogBackend {
+public:
+ TCompositeLogBackend(std::vector<std::unique_ptr<TLogBackend>>&& underlyingBackends)
+ : UnderlyingBackends(std::move(underlyingBackends))
+ {
+ }
+
+ void WriteData(const TLogRecord& rec) override {
+ for (auto& b: UnderlyingBackends) {
+ b->WriteData(rec);
+ }
+ }
+
+ void ReopenLog() override {
+ }
+
+private:
+ std::vector<std::unique_ptr<TLogBackend>> UnderlyingBackends;
+};
+
+}
diff --git a/ydb/public/sdk/cpp/tests/integration/topic/utils/ya.make b/ydb/public/sdk/cpp/tests/integration/topic/utils/ya.make
new file mode 100644
index 00000000000..1e73608eb18
--- /dev/null
+++ b/ydb/public/sdk/cpp/tests/integration/topic/utils/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+SRCS(
+ trace.cpp
+)
+
+PEERDIR(
+ library/cpp/logger
+ library/cpp/threading/future
+ ydb/public/sdk/cpp/src/client/topic
+ ydb/public/sdk/cpp/src/library/string_utils/helpers
+)
+
+END()
diff --git a/ydb/public/sdk/cpp/tests/integration/topic/with_direct_read/ya.make b/ydb/public/sdk/cpp/tests/integration/topic/with_direct_read/ya.make
new file mode 100644
index 00000000000..34367f57b60
--- /dev/null
+++ b/ydb/public/sdk/cpp/tests/integration/topic/with_direct_read/ya.make
@@ -0,0 +1,35 @@
+GTEST(topic_direct_read_it)
+
+INCLUDE(${ARCADIA_ROOT}/ydb/public/sdk/cpp/tests/integration/tests_common.inc)
+INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
+
+REQUIREMENTS(ram:32)
+
+FORK_SUBTESTS()
+
+IF (SANITIZER_TYPE OR WITH_VALGRIND)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ELSE()
+ SIZE(MEDIUM)
+ENDIF()
+
+PEERDIR(
+ ydb/public/sdk/cpp/src/client/persqueue_public
+ ydb/public/sdk/cpp/src/client/topic
+ ydb/public/sdk/cpp/tests/integration/topic/setup
+ ydb/public/sdk/cpp/tests/integration/topic/utils
+)
+
+YQL_LAST_ABI_VERSION()
+
+ENV(PQ_EXPERIMENTAL_DIRECT_READ="1")
+
+SRCS(
+ ../basic_usage.cpp
+ ../describe_topic.cpp
+ ../direct_read.cpp
+ ../local_partition.cpp
+)
+
+END()
diff --git a/ydb/public/sdk/cpp/tests/integration/topic/ya.make b/ydb/public/sdk/cpp/tests/integration/topic/ya.make
index d10d13b6169..bff22c7c923 100644
--- a/ydb/public/sdk/cpp/tests/integration/topic/ya.make
+++ b/ydb/public/sdk/cpp/tests/integration/topic/ya.make
@@ -1,4 +1,6 @@
-GTEST()
+GTEST(topic_it)
+
+INCLUDE(${ARCADIA_ROOT}/ydb/public/sdk/cpp/tests/integration/tests_common.inc)
INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
REQUIREMENTS(ram:32)
@@ -12,17 +14,24 @@ ELSE()
SIZE(MEDIUM)
ENDIF()
-FORK_SUBTESTS()
-
PEERDIR(
ydb/public/sdk/cpp/src/client/persqueue_public
ydb/public/sdk/cpp/src/client/topic
+ ydb/public/sdk/cpp/tests/integration/topic/setup
+ ydb/public/sdk/cpp/tests/integration/topic/utils
)
YQL_LAST_ABI_VERSION()
SRCS(
basic_usage.cpp
+ describe_topic.cpp
+ local_partition.cpp
+ trace.cpp
)
END()
+
+RECURSE_FOR_TESTS(
+ with_direct_read
+)