aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqyryq <qyryq@ydb.tech>2025-05-28 12:12:27 +0300
committerGitHub <noreply@github.com>2025-05-28 12:12:27 +0300
commit3c6c01111a028a661c25d7ad10b9c81c755f334b (patch)
tree873b0a8d5a2368c913c4c087a5bf3fc3496304aa
parent3c803ccabbe7ffcacff2db6b7183cff15203cb10 (diff)
downloadydb-3c6c01111a028a661c25d7ad10b9c81c755f334b.tar.gz
Topic SDK direct read (#18263)
Co-authored-by: Bulat <bylatgr@gmail.com>
-rw-r--r--ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp1
-rw-r--r--ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h1
-rw-r--r--ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/read_events.h6
-rw-r--r--ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/read_session.h3
-rw-r--r--ydb/public/sdk/cpp/src/client/persqueue_public/impl/ya.make3
-rw-r--r--ydb/public/sdk/cpp/src/client/persqueue_public/ut/read_session_ut.cpp5
-rw-r--r--ydb/public/sdk/cpp/src/client/persqueue_public/ut/ut_utils/ut_utils.h21
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/impl/direct_reader.cpp1044
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/impl/direct_reader.h265
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/impl/read_session.cpp9
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/impl/read_session_impl.h99
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/impl/read_session_impl.ipp455
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp6
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/impl/topic_impl.cpp7
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/impl/topic_impl.h6
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/impl/ya.make2
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ut/basic_usage_ut.cpp17
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ut/direct_read_ut.cpp2224
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ut/with_direct_read_ut/ya.make46
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ut/ya.make4
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ya.make2
-rw-r--r--ydb/public/sdk/cpp/tests/integration/topic/basic_usage.cpp40
22 files changed, 4157 insertions, 109 deletions
diff --git a/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp
index cd80af0b1ec..c6e6b36622f 100644
--- a/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp
+++ b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp
@@ -249,7 +249,6 @@ Y_UNIT_TEST_SUITE(CommitOffset) {
auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1,
r1.StartPartitionSessionEvents.back().GetPartitionSession()->GetReadSessionId());
UNIT_ASSERT(!result.IsSuccess());
-
UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0));
UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1));
UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h
index 09611b1e07c..27176afa020 100644
--- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h
+++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h
@@ -128,6 +128,7 @@ private:
class TPartitionLocation {
public:
TPartitionLocation(const Ydb::Topic::PartitionLocation& partitionLocation);
+ TPartitionLocation(std::int32_t nodeId, std::int64_t generation);
int32_t GetNodeId() const;
int64_t GetGeneration() const;
diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/read_events.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/read_events.h
index b619abeb5dc..824ff08b81f 100644
--- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/read_events.h
+++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/read_events.h
@@ -1,10 +1,12 @@
#pragma once
#include "codecs.h"
+#include "control_plane.h"
#include "events_common.h"
#include <util/datetime/base.h>
+
namespace NYdb::inline Dev::NTopic {
//! Partition session.
@@ -42,10 +44,14 @@ public:
}
protected:
+
uint64_t PartitionSessionId;
std::string TopicPath;
std::string ReadSessionId;
uint64_t PartitionId;
+ std::optional<TPartitionLocation> Location;
+ /*TDirectReadId*/ std::int64_t NextDirectReadId = 1;
+ std::optional</*TDirectReadId*/ std::int64_t> LastDirectReadId;
};
template<>
diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/read_session.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/read_session.h
index 81d2a26e97b..e7131ffd734 100644
--- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/read_session.h
+++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/read_session.h
@@ -193,6 +193,9 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> {
//! AutoPartitioningSupport.
FLUENT_SETTING_DEFAULT(bool, AutoPartitioningSupport, false);
+ // TODO(qyryq) Uncomment when direct read is ready.
+ // FLUENT_SETTING_DEFAULT(bool, DirectRead, false);
+
//! Log.
FLUENT_SETTING_OPTIONAL(TLog, Log);
};
diff --git a/ydb/public/sdk/cpp/src/client/persqueue_public/impl/ya.make b/ydb/public/sdk/cpp/src/client/persqueue_public/impl/ya.make
index ccf8d436279..75c739588bb 100644
--- a/ydb/public/sdk/cpp/src/client/persqueue_public/impl/ya.make
+++ b/ydb/public/sdk/cpp/src/client/persqueue_public/impl/ya.make
@@ -30,7 +30,8 @@ PEERDIR(
ydb/public/sdk/cpp/src/client/topic/codecs
ydb/public/sdk/cpp/src/client/topic/common
ydb/public/sdk/cpp/src/client/topic/impl
-
+ ydb/public/sdk/cpp/src/client/scheme
+
)
END()
diff --git a/ydb/public/sdk/cpp/src/client/persqueue_public/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/src/client/persqueue_public/ut/read_session_ut.cpp
index 44e29911f41..1ed01b4c252 100644
--- a/ydb/public/sdk/cpp/src/client/persqueue_public/ut/read_session_ut.cpp
+++ b/ydb/public/sdk/cpp/src/client/persqueue_public/ut/read_session_ut.cpp
@@ -513,7 +513,6 @@ public:
std::shared_ptr<TCallbackContext<TSingleClusterReadSessionImpl>> CbContext;
std::shared_ptr<TThreadPool> ThreadPool;
::IExecutor::TPtr DefaultExecutor;
- std::shared_ptr<std::unordered_map<ECodec, THolder<NTopic::ICodec>>> ProvidedCodecs = std::make_shared<std::unordered_map<ECodec, THolder<NTopic::ICodec>>>();
};
class TReorderingExecutor : public ::IExecutor {
@@ -588,10 +587,6 @@ TReadSessionImplTestSetup::TReadSessionImplTestSetup() {
.Counters(MakeIntrusive<NYdb::NPersQueue::TReaderCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>()));
Log.SetFormatter(GetPrefixLogFormatter(""));
-
- (*ProvidedCodecs)[ECodec::GZIP] = MakeHolder<NTopic::TGzipCodec>();
- (*ProvidedCodecs)[ECodec::LZOP] = MakeHolder<NTopic::TUnsupportedCodec>();
- (*ProvidedCodecs)[ECodec::ZSTD] = MakeHolder<NTopic::TZstdCodec>();
}
TReadSessionImplTestSetup::~TReadSessionImplTestSetup() noexcept(false) {
diff --git a/ydb/public/sdk/cpp/src/client/persqueue_public/ut/ut_utils/ut_utils.h b/ydb/public/sdk/cpp/src/client/persqueue_public/ut/ut_utils/ut_utils.h
index aece8b1de6c..a6b48ae645a 100644
--- a/ydb/public/sdk/cpp/src/client/persqueue_public/ut/ut_utils/ut_utils.h
+++ b/ydb/public/sdk/cpp/src/client/persqueue_public/ut/ut_utils/ut_utils.h
@@ -14,12 +14,22 @@ using NYdb::NTopic::IAsyncExecutor;
namespace NYdb::NPersQueue::NTests {
+struct TPersQueueYdbSdkTestSetupSettings {
+ TString TestCaseName;
+ bool Start = true;
+ TVector<NKikimrServices::EServiceKikimr> LogServices = ::NPersQueue::TTestServer::LOGGED_SERVICES;
+ NActors::NLog::EPriority LogPriority = NActors::NLog::PRI_DEBUG;
+ ui32 NodeCount = NKikimr::NPersQueueTests::PQ_DEFAULT_NODE_COUNT;
+ size_t TopicPartitionsCount = 1;
+};
+
class TPersQueueYdbSdkTestSetup : public ::NPersQueue::SDKTestSetup {
THolder<NYdb::TDriver> Driver;
THolder<NYdb::NPersQueue::TPersQueueClient> PersQueueClient;
TAdaptiveLock Lock;
public:
+ // TODO(qyryq) Delete this ctor in favor of TPersQueueYdbSdkTestSetupSettings.
TPersQueueYdbSdkTestSetup(const TString& testCaseName, bool start = true,
const TVector<NKikimrServices::EServiceKikimr>& logServices = ::NPersQueue::TTestServer::LOGGED_SERVICES,
NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG,
@@ -29,6 +39,17 @@ public:
{
}
+ TPersQueueYdbSdkTestSetup(TPersQueueYdbSdkTestSetupSettings settings)
+ : SDKTestSetup(
+ settings.TestCaseName,
+ settings.Start,
+ settings.LogServices,
+ settings.LogPriority,
+ settings.NodeCount,
+ settings.TopicPartitionsCount)
+ {
+ }
+
~TPersQueueYdbSdkTestSetup() {
if (PersQueueClient) {
PersQueueClient = nullptr;
diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/direct_reader.cpp b/ydb/public/sdk/cpp/src/client/topic/impl/direct_reader.cpp
new file mode 100644
index 00000000000..bb5d079d106
--- /dev/null
+++ b/ydb/public/sdk/cpp/src/client/topic/impl/direct_reader.cpp
@@ -0,0 +1,1044 @@
+#include "direct_reader.h"
+#include "read_session_impl.ipp"
+
+#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
+
+
+namespace NYdb::NTopic {
+
+TDirectReadClientMessage TDirectReadPartitionSession::MakeStartRequest() const {
+ TDirectReadClientMessage req;
+ auto& start = *req.mutable_start_direct_read_partition_session_request();
+ start.set_partition_session_id(PartitionSessionId);
+ start.set_last_direct_read_id(NextDirectReadId - 1);
+ start.set_generation(Location.GetGeneration());
+ return req;
+}
+
+[[nodiscard]] bool TDirectReadPartitionSession::TransitionTo(EState next) {
+ /*
+ On lost connection
+ +---------------------<----------+
+ | | |
+ | On start | |
+ | +----------------+ | |
+ v | v | |
+ ->IDLE<---DELAYED--->STARTING--->WORKING
+ ^ | |
+ Retry | | |
+ +----------<----------+
+ | on StopDRPS
+ |
+ | Retry policy denied another retry
+ v
+ Destroy read session
+ DELAYED->IDLE if callback is called when there's no connection established.
+ */
+
+ if (State == next) {
+ return true;
+ }
+
+ switch (next) {
+ case EState::IDLE: {
+ switch (State) {
+ case EState::DELAYED:
+ case EState::STARTING:
+ case EState::WORKING:
+ State = EState::IDLE;
+ break;
+ default:
+ return false;
+ }
+
+ break;
+ }
+ case EState::DELAYED: {
+ switch (State) {
+ case EState::STARTING:
+ case EState::WORKING:
+ State = EState::DELAYED;
+ break;
+ default:
+ return false;
+ }
+
+ break;
+ }
+ case EState::STARTING: {
+ switch (State) {
+ case EState::IDLE:
+ case EState::DELAYED:
+ State = EState::STARTING;
+ break;
+ default:
+ return false;
+ }
+
+ break;
+ }
+ case EState::WORKING: {
+ if (State != EState::STARTING) {
+ return false;
+ }
+
+ State = EState::WORKING;
+ RetryState = nullptr;
+
+ break;
+ }
+ }
+
+ Y_ABORT_UNLESS(State == next);
+ return true;
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// TDirectReadSessionControlCallbacks
+
+TDirectReadSessionControlCallbacks::TDirectReadSessionControlCallbacks(TSingleClusterReadSessionContextPtr contextPtr)
+ : SingleClusterReadSessionContextPtr(contextPtr)
+ {}
+
+void TDirectReadSessionControlCallbacks::OnDirectReadDone(
+ std::shared_ptr<TLockFreeQueue<Ydb::Topic::StreamDirectReadMessage::DirectReadResponse>> responses
+ // Ydb::Topic::StreamDirectReadMessage::DirectReadResponse&& response,
+ // TDeferredActions<false>& deferred
+) {
+ if (auto s = SingleClusterReadSessionContextPtr->LockShared()) {
+ s->OnDirectReadDone(responses);
+ }
+}
+
+void TDirectReadSessionControlCallbacks::AbortSession(TSessionClosedEvent&& closeEvent) {
+ if (auto s = SingleClusterReadSessionContextPtr->LockShared()) {
+ s->AbortSession(std::move(closeEvent));
+ }
+}
+
+void TDirectReadSessionControlCallbacks::ScheduleCallback(TDuration delay, std::function<void()> callback) {
+ if (auto s = SingleClusterReadSessionContextPtr->LockShared()) {
+ s->ScheduleCallback(
+ delay,
+ [callback = std::move(callback)](bool ok) {
+ if (ok) {
+ callback();
+ }
+ }
+ );
+ }
+}
+
+void TDirectReadSessionControlCallbacks::ScheduleCallback(TDuration delay, std::function<void()> callback, TDeferredActions<false>& deferred) {
+ deferred.DeferScheduleCallback(
+ delay,
+ [callback = std::move(callback)](bool ok) {
+ if (ok) {
+ callback();
+ }
+ },
+ SingleClusterReadSessionContextPtr
+ );
+}
+
+void TDirectReadSessionControlCallbacks::StopPartitionSession(TPartitionSessionId partitionSessionId) {
+ if (auto s = SingleClusterReadSessionContextPtr->LockShared()) {
+ s->StopPartitionSession(partitionSessionId);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// TDirectReadSessionManager
+
+TDirectReadSessionManager::TDirectReadSessionManager(
+ TReadSessionId serverSessionId,
+ const NYdb::NTopic::TReadSessionSettings settings,
+ IDirectReadSessionControlCallbacks::TPtr controlCallbacks,
+ NYdbGrpc::IQueueClientContextPtr clientContext,
+ IDirectReadProcessorFactoryPtr processorFactory,
+ TLog log
+)
+ : ReadSessionSettings(settings)
+ , ServerSessionId(serverSessionId)
+ , ClientContext(clientContext)
+ , ProcessorFactory(processorFactory)
+ , ControlCallbacks(controlCallbacks)
+ , Log(log)
+ {}
+
+TDirectReadSessionManager::~TDirectReadSessionManager() {
+ Close();
+}
+
+TStringBuilder TDirectReadSessionManager::GetLogPrefix() const {
+ return TStringBuilder() << static_cast<const void*>(this) << " TDirectReadSessionManager ServerSessionId=" << ServerSessionId << " ";
+}
+
+TDirectReadSessionContextPtr TDirectReadSessionManager::CreateDirectReadSession(TNodeId nodeId) {
+ return MakeWithCallbackContext<TDirectReadSession>(
+ nodeId,
+ ServerSessionId,
+ ReadSessionSettings,
+ ControlCallbacks,
+ ClientContext->CreateContext(),
+ ProcessorFactory,
+ Log);
+}
+
+void TDirectReadSessionManager::Close() {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Close");
+
+ // TODO(qyryq) Cancel contexts, anything else?
+
+ for (auto& [_, nodeSession] : NodeSessions) {
+ if (auto s = nodeSession->LockShared()) {
+ s->Close();
+ }
+ nodeSession->Cancel();
+ }
+}
+
+void TDirectReadSessionManager::StartPartitionSession(TDirectReadPartitionSession&& partitionSession) {
+ auto nodeId = partitionSession.Location.GetNodeId();
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "StartPartitionSession " << partitionSession.PartitionSessionId << " nodeId=" << nodeId);
+ TDirectReadSessionContextPtr& session = NodeSessions[nodeId];
+ if (!session) {
+ session = CreateDirectReadSession(nodeId);
+ }
+ if (auto s = session->LockShared()) {
+ s->Start();
+ s->AddPartitionSession(std::move(partitionSession));
+ }
+ Locations.emplace(partitionSession.PartitionSessionId, partitionSession.Location);
+}
+
+// Delete a partition session from a node (TDirectReadSession), and if there are no more
+// partition sessions on the node, drop connection to it.
+void TDirectReadSessionManager::DeletePartitionSession(TPartitionSessionId partitionSessionId, TNodeSessionsMap::iterator it) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DeletePartitionSession " << partitionSessionId);
+
+ TDirectReadSessionContextPtr directReadSessionContextPtr;
+ if (auto session = it->second->LockShared()) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DeletePartitionSession " << partitionSessionId << " LockShared");
+ session->DeletePartitionSession(partitionSessionId);
+ Locations.erase(partitionSessionId);
+ if (session->Closed()) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DeletePartitionSession " << partitionSessionId << " erase");
+ directReadSessionContextPtr = it->second;
+ NodeSessions.erase(it);
+ }
+ } else {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DeletePartitionSession " << partitionSessionId << " not found in NodeSessions");
+ }
+ if (directReadSessionContextPtr) {
+ directReadSessionContextPtr->Cancel();
+ }
+}
+
+void TDirectReadSessionManager::UpdatePartitionSession(TPartitionSessionId partitionSessionId, TPartitionId partitionId, TPartitionLocation newLocation) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "UpdatePartitionSession " << partitionSessionId
+ << ", partitionId=" << partitionId
+ << ", newLocation={" << newLocation.GetNodeId() << ", " << newLocation.GetGeneration() << "}");
+ auto locIt = Locations.find(partitionSessionId);
+ if (locIt == Locations.end()) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "UpdatePartitionSession " << partitionSessionId << " not found in Locations");
+ return;
+ }
+ auto oldNodeId = locIt->second.GetNodeId();
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "UpdatePartitionSession 01 oldNodeId=" << oldNodeId << " oldGeneration=" << locIt->second.GetGeneration() << "");
+
+ auto sessionIt = NodeSessions.find(oldNodeId);
+ Y_ABORT_UNLESS(sessionIt != NodeSessions.end());
+
+ TDirectReadId next = 1;
+ std::optional<TDirectReadId> last;
+
+ if (auto session = sessionIt->second->LockShared()) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "UpdatePartitionSession 02");
+ auto ids = session->GetDirectReadIds(partitionSessionId);
+ next = ids.NextDirectReadId;
+ last = ids.LastDirectReadId;
+ } else {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "UpdatePartitionSession " << partitionSessionId << ": could not LockShared");
+ return;
+ }
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "UpdatePartitionSession 03");
+
+ // If oldLoc == newLoc and sessionIt->Empty() after deleting the partition session,
+ // we have to reconnect back to the same node as before. Maybe it's worth to add a special case here.
+ DeletePartitionSession(partitionSessionId, sessionIt);
+
+ // TODO(qyryq) std::move an old RetryState?
+ StartPartitionSession({
+ .PartitionSessionId = partitionSessionId,
+ .PartitionId = partitionId,
+ .Location = newLocation,
+ .NextDirectReadId = next,
+ .LastDirectReadId = last,
+ });
+}
+
+TDirectReadSessionContextPtr TDirectReadSessionManager::ErasePartitionSession(TPartitionSessionId partitionSessionId) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "ErasePartitionSession " << partitionSessionId);
+
+ auto locIt = Locations.find(partitionSessionId);
+ Y_ABORT_UNLESS(locIt != Locations.end());
+ auto nodeId = locIt->second.GetNodeId();
+
+ auto sessionIt = NodeSessions.find(nodeId);
+ Y_ABORT_UNLESS(sessionIt != NodeSessions.end());
+ TDirectReadSessionContextPtr directReadSessionContextPtr = sessionIt->second;
+
+ // Still need to Cancel the TCallbackContext<TDirectReadSession>.
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "ErasePartitionSession " << partitionSessionId << " erase");
+ NodeSessions.erase(sessionIt);
+ Locations.erase(partitionSessionId);
+
+ return directReadSessionContextPtr;
+}
+
+void TDirectReadSessionManager::StopPartitionSession(TPartitionSessionId partitionSessionId) {
+ auto locIt = Locations.find(partitionSessionId);
+ if (locIt == Locations.end()) {
+ // This is possible when we get StartPartitionSessionRequest, then StopPartitionSessionRequest,
+ // without user calling TStartPartitionSessionEvent::Confirm.
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "StopPartitionSession " << partitionSessionId << " not found in Locations");
+ return;
+ }
+ auto nodeId = locIt->second.GetNodeId();
+
+ auto sessionIt = NodeSessions.find(nodeId);
+ if (sessionIt == NodeSessions.end()) {
+ // Same as above.
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "StopPartitionSession " << partitionSessionId << " not found in NodeSessions");
+ return;
+ }
+
+ DeletePartitionSession(partitionSessionId, sessionIt);
+}
+
+bool TDirectReadSessionManager::StopPartitionSessionGracefully(TPartitionSessionId partitionSessionId, TDirectReadId lastDirectReadId) {
+ auto locIt = Locations.find(partitionSessionId);
+ Y_ABORT_UNLESS(locIt != Locations.end());
+
+ auto nodeSessionIt = NodeSessions.find(locIt->second.GetNodeId());
+ Y_ABORT_UNLESS(nodeSessionIt != NodeSessions.end());
+
+ if (auto nodeSession = nodeSessionIt->second->LockShared()) {
+ nodeSession->SetLastDirectReadId(partitionSessionId, lastDirectReadId);
+
+ // Delete the partition session, if we've already received the lastDirectReadId.
+ nodeSession->DeletePartitionSessionIfNeeded(partitionSessionId);
+
+ return true;
+ }
+ return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// TDirectReadSession
+
+TDirectReadSession::TDirectReadSession(
+ TNodeId nodeId,
+ TReadSessionId serverSessionId,
+ const NYdb::NTopic::TReadSessionSettings settings,
+ IDirectReadSessionControlCallbacks::TPtr controlCallbacks,
+ NYdbGrpc::IQueueClientContextPtr clientContext,
+ IDirectReadProcessorFactoryPtr processorFactory,
+ TLog log
+)
+ : ClientContext(clientContext)
+ , ReadSessionSettings(settings)
+ , ServerSessionId(serverSessionId)
+ , ProcessorFactory(processorFactory)
+ , NodeId(nodeId)
+ , IncomingMessagesForControlSession(std::make_shared<TLockFreeQueue<Ydb::Topic::StreamDirectReadMessage::DirectReadResponse>>())
+ , ControlCallbacks(controlCallbacks)
+ , State(EState::CREATED)
+ , Log(log)
+ {
+ }
+
+
+void TDirectReadSession::Start() {
+ with_lock (Lock) {
+ if (State != EState::CREATED) {
+ return;
+ }
+ }
+ Reconnect(TPlainStatus());
+}
+
+void TDirectReadSession::Close() {
+ with_lock (Lock) {
+ CloseImpl();
+ }
+}
+
+void TDirectReadSession::CloseImpl() {
+ if (State >= EState::CLOSING) {
+ return;
+ }
+ State = EState::CLOSED;
+
+ ::NYdb::NTopic::Cancel(ConnectContext);
+ ::NYdb::NTopic::Cancel(ConnectTimeoutContext);
+ ::NYdb::NTopic::Cancel(ConnectDelayContext);
+ if (Processor) {
+ Processor->Cancel();
+ }
+
+ // TODO(qyryq) Do we need to wait for something here?
+ // TODO(qyryq) Do we need a separate CLOSING state?
+}
+
+bool TDirectReadSession::Empty() const {
+ with_lock (Lock) {
+ return PartitionSessions.empty();
+ }
+}
+
+bool TDirectReadSession::Closed() const {
+ with_lock (Lock) {
+ return State >= EState::CLOSED;
+ }
+}
+
+void TDirectReadSession::AddPartitionSession(TDirectReadPartitionSession&& session) {
+ TDeferredActions<false> deferred;
+ with_lock (Lock) {
+ Y_ABORT_UNLESS(State < EState::CLOSING);
+
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "AddPartitionSession " << session.PartitionSessionId);
+ auto [it, inserted] = PartitionSessions.emplace(session.PartitionSessionId, std::move(session));
+ // TODO(qyryq) Abort? Ignore new? Replace old? Anything else?
+ Y_ABORT_UNLESS(inserted);
+
+ SendStartRequestImpl(it->second);
+ }
+}
+
+void TDirectReadSession::SetLastDirectReadId(TPartitionSessionId partitionSessionId, TDirectReadId lastDirectReadId) {
+ with_lock (Lock) {
+ auto it = PartitionSessions.find(partitionSessionId);
+ Y_ABORT_UNLESS(it != PartitionSessions.end());
+
+ if (it->second.LastDirectReadId < lastDirectReadId) {
+ it->second.LastDirectReadId = lastDirectReadId;
+ } else {
+ DeletePartitionSessionImpl(partitionSessionId);
+ }
+ }
+}
+
+TDirectReadSession::TDirectReadIds TDirectReadSession::GetDirectReadIds(TPartitionSessionId id) const {
+ std::lock_guard guard(Lock);
+ auto it = PartitionSessions.find(id);
+
+ Y_ABORT_UNLESS(it != PartitionSessions.end());
+ return {
+ .NextDirectReadId = it->second.NextDirectReadId,
+ .LastDirectReadId = it->second.LastDirectReadId,
+ };
+}
+
+void TDirectReadSession::DeletePartitionSessionIfNeeded(TPartitionSessionId partitionSessionId) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DeletePartitionSessionIfNeeded 1 partitionSessionId=" << partitionSessionId);
+ with_lock (Lock) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DeletePartitionSessionIfNeeded 2 partitionSessionId=" << partitionSessionId);
+
+ auto partitionSessionIt = PartitionSessions.find(partitionSessionId);
+ if (partitionSessionIt == PartitionSessions.end()) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DeletePartitionSessionIfNeeded " << partitionSessionId << " not found");
+ return;
+ }
+
+ auto& partitionSession = partitionSessionIt->second;
+
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DeletePartitionSessionIfNeeded 3 partitionSessionId=" << partitionSessionId << " partitionSession.LastDirectReadId=" << partitionSession.LastDirectReadId << " partitionSession.NextDirectReadId=" << partitionSession.NextDirectReadId);
+
+ if (partitionSession.LastDirectReadId && partitionSession.NextDirectReadId >= partitionSession.LastDirectReadId) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DeletePartitionSessionIfNeeded 4 partitionSessionId=" << partitionSessionId);
+ PartitionSessions.erase(partitionSessionIt);
+
+ if (PartitionSessions.empty()) {
+ CloseImpl();
+ }
+ }
+ }
+}
+
+void TDirectReadSession::DeletePartitionSession(TPartitionSessionId partitionSessionId) {
+ with_lock (Lock) {
+ auto it = PartitionSessions.find(partitionSessionId);
+ if (it == PartitionSessions.end()) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DeletePartitionSession " << partitionSessionId << " not found");
+ return;
+ }
+
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DeletePartitionSession " << partitionSessionId);
+ PartitionSessions.erase(it);
+
+ if (PartitionSessions.empty()) {
+ CloseImpl();
+ } else {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DeletePartitionSession " << partitionSessionId << " PartitionSessions is not empty");
+ }
+ }
+}
+
+void TDirectReadSession::DeletePartitionSessionImpl(TPartitionSessionId partitionSessionId) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ auto it = PartitionSessions.find(partitionSessionId);
+ if (it == PartitionSessions.end()) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DeletePartitionSessionImpl " << partitionSessionId << " not found");
+ return;
+ }
+
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DeletePartitionSessionImpl " << partitionSessionId);
+ PartitionSessions.erase(it);
+
+ if (PartitionSessions.empty()) {
+ CloseImpl();
+ }
+}
+
+void TDirectReadSession::AbortImpl(TPlainStatus&& status) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Abort");
+ if (State < EState::CLOSING) {
+ State = EState::CLOSED;
+ ControlCallbacks->AbortSession(std::move(status));
+ }
+}
+
+void TDirectReadSession::OnReadDone(NYdbGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration, TDeferredActions<false>& deferred) {
+ TPlainStatus errorStatus;
+ if (!grpcStatus.Ok()) {
+ errorStatus = TPlainStatus(std::move(grpcStatus));
+ }
+
+ std::optional<TPartitionSessionId> partitionSessionId;
+
+ with_lock (Lock) {
+ if (State >= EState::CLOSING) {
+ return;
+ }
+
+ if (connectionGeneration != ConnectionGeneration) {
+ // TODO(qyryq) Test it.
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "OnReadDone got message for connectionGeneration=" << connectionGeneration
+ << ", expected=" << ConnectionGeneration);
+ return;
+ }
+
+ if (!IsErrorMessage(*ServerMessage)) {
+ if (ServerMessage->server_message_case() != TDirectReadServerMessage::kDirectReadResponse) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "XXXXX subsession got message = " << ServerMessage->ShortDebugString());
+ } else {
+ const auto& data = ServerMessage->direct_read_response().partition_data();
+ const auto partitionSessionId = ServerMessage->direct_read_response().partition_session_id();
+ auto partitionSessionIt = PartitionSessions.find(partitionSessionId);
+ if (partitionSessionIt == PartitionSessions.end()) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "XXXXX subsession got message = DirectReadResponse partitionSessionId=" << partitionSessionId << " not found");
+ }
+ if (data.batches_size() == 0) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "XXXXX subsession got message = DirectReadResponse EMPTY");
+ } else {
+ const auto& firstBatch = data.batches(0);
+ const auto firstOffset = firstBatch.message_data(0).offset();
+ const auto& lastBatch = data.batches(data.batches_size() - 1);
+ const auto lastOffset = lastBatch.message_data(lastBatch.message_data_size() - 1).offset();
+ auto partitionId = partitionSessionIt == PartitionSessions.end() ? -1 : partitionSessionIt->second.PartitionId;
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "XXXXX subsession got message = DirectReadResponse"
+ << " partitionSessionId = " << partitionSessionId
+ << " partitionId = " << partitionId
+ << " directReadId = " << ServerMessage->direct_read_response().direct_read_id()
+ << " firstOffset = " << firstOffset
+ << " lastOffset = " << lastOffset);
+ }
+ }
+ }
+
+ if (errorStatus.Ok()) {
+ if (IsErrorMessage(*ServerMessage)) {
+ errorStatus = MakeErrorFromProto(*ServerMessage);
+ } else {
+ switch (ServerMessage->server_message_case()) {
+ case TDirectReadServerMessage::kInitResponse:
+ OnReadDoneImpl(std::move(*ServerMessage->mutable_init_response()), deferred);
+ break;
+ case TDirectReadServerMessage::kStartDirectReadPartitionSessionResponse:
+ OnReadDoneImpl(std::move(*ServerMessage->mutable_start_direct_read_partition_session_response()), deferred);
+ break;
+ case TDirectReadServerMessage::kStopDirectReadPartitionSession:
+ OnReadDoneImpl(std::move(*ServerMessage->mutable_stop_direct_read_partition_session()), deferred);
+ break;
+ case TDirectReadServerMessage::kDirectReadResponse:
+ partitionSessionId = ServerMessage->mutable_direct_read_response()->partition_session_id();
+ OnReadDoneImpl(std::move(*ServerMessage->mutable_direct_read_response()), deferred);
+ break;
+ case TDirectReadServerMessage::kUpdateTokenResponse:
+ OnReadDoneImpl(std::move(*ServerMessage->mutable_update_token_response()), deferred);
+ break;
+ case TDirectReadServerMessage::SERVER_MESSAGE_NOT_SET:
+ errorStatus = TPlainStatus::Internal("Server message is not set");
+ break;
+ default:
+ errorStatus = TPlainStatus::Internal("Unexpected response from server");
+ break;
+ }
+ }
+
+ if (errorStatus.Ok()) {
+ ReadFromProcessorImpl(deferred); // Read next.
+ }
+ }
+ }
+
+ if (partitionSessionId.has_value()) {
+ deferred.DeferCallback(
+ [
+ callbacks = ControlCallbacks, messages = IncomingMessagesForControlSession,
+ cbContext = SelfContext, partitionSessionId = partitionSessionId.value()
+ ]() {
+ callbacks->OnDirectReadDone(messages);
+ }
+ );
+ }
+
+ if (!errorStatus.Ok()) {
+ ReadSessionSettings.Counters_->Errors->Inc();
+
+ if (!Reconnect(errorStatus)) {
+ with_lock (Lock) {
+ AbortImpl(std::move(errorStatus));
+ }
+ }
+ }
+}
+
+void TDirectReadSession::SendStartRequestImpl(TPartitionSessionId id, bool delayedCall) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ auto it = PartitionSessions.find(id);
+
+ if (it == PartitionSessions.end()) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "SendStartRequestImpl partition session not found, id=" << id);
+ return;
+ }
+
+ SendStartRequestImpl(it->second, delayedCall);
+}
+
+void TDirectReadSession::SendStartRequestImpl(TDirectReadPartitionSession& partitionSession, bool delayedCall) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "SendStartRequestImpl partitionSession.State=" << int(partitionSession.State)
+ << " delayedCall=" << delayedCall);
+
+ bool isImmediateCall = partitionSession.State == TDirectReadPartitionSession::EState::IDLE && !delayedCall;
+ bool isDelayedCall = partitionSession.State == TDirectReadPartitionSession::EState::DELAYED && delayedCall;
+
+ if (!isImmediateCall && !isDelayedCall) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "SendStartRequestImpl bail out 0, not an immediate nor a delayed call");
+ return;
+ }
+
+ if (State < EState::WORKING) {
+ if (isDelayedCall) {
+ // It's time to send a delayed Start-request, but there is no working connection at the moment.
+ // Reset the partition session state, so the request is sent as soon as the connection is reestablished.
+ bool transitioned = partitionSession.TransitionTo(TDirectReadPartitionSession::EState::IDLE);
+ Y_ABORT_UNLESS(transitioned);
+ } // Otherwise, the session is already IDLE.
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "SendStartRequestImpl bail out 1");
+ return;
+ }
+
+ if (State > EState::WORKING) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "SendStartRequestImpl: the session is not usable anymore");
+ return;
+ }
+
+ Y_ABORT_UNLESS(State == EState::WORKING);
+
+ bool transitioned = partitionSession.TransitionTo(TDirectReadPartitionSession::EState::STARTING);
+ Y_ABORT_UNLESS(transitioned);
+ auto startRequest = partitionSession.MakeStartRequest();
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "SendStartRequestImpl send request " << startRequest.ShortDebugString());
+ WriteToProcessorImpl(std::move(startRequest));
+}
+
+void TDirectReadSession::DelayStartRequestImpl(TDirectReadPartitionSession& partitionSession, TPlainStatus&& status, TDeferredActions<false>& deferred) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ Y_ABORT_UNLESS(partitionSession.State == TDirectReadPartitionSession::EState::STARTING ||
+ partitionSession.State == TDirectReadPartitionSession::EState::WORKING);
+
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "DelayStartRequestImpl");
+
+ if (!partitionSession.RetryState) {
+ partitionSession.RetryState = ReadSessionSettings.RetryPolicy_->CreateRetryState();
+ }
+
+ TMaybe<TDuration> delay = partitionSession.RetryState->GetNextRetryDelay(status.Status);
+ if (!delay.Defined()) {
+ AbortImpl(std::move(status));
+ return;
+ }
+
+ bool transitioned = partitionSession.TransitionTo(TDirectReadPartitionSession::EState::DELAYED);
+ Y_ABORT_UNLESS(transitioned);
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Send StartDirectReadPartitionSession request in " << delay);
+
+ ControlCallbacks->ScheduleCallback(
+ *delay,
+ [context = this->SelfContext, id = partitionSession.PartitionSessionId]() {
+ if (auto s = context->LockShared()) {
+ with_lock (s->Lock) {
+ s->SendStartRequestImpl(id, /* delayedCall = */ true);
+ }
+ }
+ },
+ deferred
+ );
+}
+
+void TDirectReadSession::OnReadDoneImpl(Ydb::Topic::StreamDirectReadMessage::InitResponse&& response, TDeferredActions<false>&) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ Y_ABORT_UNLESS(State == EState::INITIALIZING);
+ State = EState::WORKING;
+
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Got InitResponse " << response.ShortDebugString());
+
+ RetryState = nullptr;
+
+ // Successful init. Send StartDirectReadPartitionSession requests.
+ for (auto& [_, partitionSession] : PartitionSessions) {
+ SendStartRequestImpl(partitionSession);
+ }
+}
+
+void TDirectReadSession::OnReadDoneImpl(Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionResponse&& response, TDeferredActions<false>&) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Got StartDirectReadPartitionSessionResponse " << response.ShortDebugString());
+
+ auto partitionSessionId = response.partition_session_id();
+
+ auto it = PartitionSessions.find(partitionSessionId);
+ if (it->second.Location.GetGeneration() != response.generation()) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Got StartDirectReadPartitionSessionResponse for wrong generation "
+ << "(expected " << it->second.Location.GetGeneration()
+ << ", got " << response.generation() << ") partition_session_id=" << partitionSessionId);
+ return;
+ }
+
+ if (it == PartitionSessions.end()) {
+ // We could get a StopPartitionSessionRequest from server before processing this response.
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Got StartDirectReadPartitionSessionResponse for unknown partition session " << partitionSessionId);
+ return;
+ }
+
+ auto& partitionSession = it->second;
+
+ auto transitioned = partitionSession.TransitionTo(TDirectReadPartitionSession::EState::WORKING);
+ Y_ABORT_UNLESS(transitioned);
+}
+
+void TDirectReadSession::OnReadDoneImpl(Ydb::Topic::StreamDirectReadMessage::StopDirectReadPartitionSession&& response, TDeferredActions<false>&) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ // We ignore the message and simply log it. Then wait for an UpdatePartitionSession event.
+
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Got StopDirectReadPartitionSession " << response.ShortDebugString());
+}
+
+void TDirectReadSession::OnReadDoneImpl(Ydb::Topic::StreamDirectReadMessage::DirectReadResponse&& response, TDeferredActions<false>&) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ auto partitionSessionId = response.partition_session_id();
+ Y_ABORT_UNLESS(partitionSessionId == response.partition_data().partition_session_id());
+
+ auto it = PartitionSessions.find(partitionSessionId);
+
+ if (it == PartitionSessions.end()) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Got a DirectReadResponse for a partition session that we don't know about partition_session_id=" << partitionSessionId << ". Ignoring.");
+ return;
+ }
+
+ auto& partitionSession = it->second;
+
+ // LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Waiting for NextDirectReadId=" << partitionSession.NextDirectReadId << ". Got DirectReadResponse " << response.ShortDebugString());
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Waiting for NextDirectReadId=" << partitionSession.NextDirectReadId << ". Got DirectReadResponse " << response.direct_read_id());
+
+ auto directReadId = response.direct_read_id();
+
+ if (directReadId < partitionSession.NextDirectReadId) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Got a DirectReadResponse with direct_read_id=" << directReadId
+ << ", but we are waiting for direct_read_id=" << partitionSession.NextDirectReadId);
+ return;
+ }
+
+ Y_ABORT_UNLESS(directReadId == partitionSession.NextDirectReadId);
+
+ ++partitionSession.NextDirectReadId;
+
+ IncomingMessagesForControlSession->Enqueue(std::move(response));
+
+ // ControlCallbacks->OnDirectReadDone(std::move(response), deferred);
+
+ // If here we get a DirectReadResponse(direct_read_id) and after that the control session receives
+ // a StopPartitionSession command with the same direct_read_id, we need to stop it from the control session.
+}
+
+void TDirectReadSession::OnReadDoneImpl(Ydb::Topic::UpdateTokenResponse&& response, TDeferredActions<false>&) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Got UpdateTokenResponse " << response.ShortDebugString());
+}
+
+void TDirectReadSession::WriteToProcessorImpl(TDirectReadClientMessage&& req) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ if (Processor) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "XXXXX subsession send message = " << req.ShortDebugString());
+ Processor->Write(std::move(req));
+ }
+}
+
+void TDirectReadSession::ReadFromProcessorImpl(TDeferredActions<false>& deferred) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ if (State >= EState::CLOSING) {
+ return;
+ }
+
+ if (Processor) {
+ ServerMessage->Clear();
+
+ Y_ABORT_UNLESS(this->SelfContext);
+
+ auto callback = [cbContext = this->SelfContext,
+ // Capture message & processor not to read in freed memory.
+ serverMessage = ServerMessage,
+ connectionGeneration = ConnectionGeneration,
+ processor = Processor](NYdbGrpc::TGrpcStatus&& grpcStatus) {
+ bool cancelContext = false;
+ TDeferredActions<false> deferred;
+ if (auto s = cbContext->LockShared()) {
+ s->OnReadDone(std::move(grpcStatus), connectionGeneration, deferred);
+ if (s->State == EState::CLOSED) {
+ cancelContext = true;
+ }
+ }
+ if (cancelContext) {
+ cbContext->Cancel();
+ }
+ };
+
+ deferred.DeferReadFromProcessor(Processor, ServerMessage.get(), std::move(callback));
+ }
+}
+
+TStringBuilder TDirectReadSession::GetLogPrefix() const {
+ return TStringBuilder() << static_cast<const void*>(this) << " TDirectReadSession ServerSessionId=" << ServerSessionId << " NodeId=" << NodeId << " ";
+}
+
+void TDirectReadSession::InitImpl(TDeferredActions<false>& deferred) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ Y_ABORT_UNLESS(State == EState::CONNECTED);
+ State = EState::INITIALIZING;
+
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session");
+
+ TDirectReadClientMessage req;
+ auto& init = *req.mutable_init_request();
+ init.set_session_id(ServerSessionId);
+ init.set_consumer(ReadSessionSettings.ConsumerName_);
+
+ for (const TTopicReadSettings& topic : ReadSessionSettings.Topics_) {
+ auto* topicSettings = init.add_topics_read_settings();
+ topicSettings->set_path(topic.Path_);
+ }
+
+ WriteToProcessorImpl(std::move(req));
+ ReadFromProcessorImpl(deferred);
+}
+
+void TDirectReadSession::OnConnectTimeout(
+ const NYdbGrpc::IQueueClientContextPtr& connectTimeoutContext
+) {
+ Y_UNUSED(connectTimeoutContext);
+}
+
+void TDirectReadSession::OnConnect(
+ TPlainStatus&& status,
+ IDirectReadProcessor::TPtr&& connection,
+ const NYdbGrpc::IQueueClientContextPtr& connectContext
+) {
+ TDeferredActions<false> deferred;
+ with_lock (Lock) {
+ if (ConnectContext != connectContext) {
+ return;
+ }
+
+ ::NYdb::NTopic::Cancel(ConnectTimeoutContext);
+ ConnectContext = nullptr;
+ ConnectTimeoutContext = nullptr;
+ ConnectDelayContext = nullptr;
+
+ if (State >= EState::CLOSING) {
+ return;
+ }
+
+ if (status.Ok()) {
+ State = EState::CONNECTED;
+ Processor = std::move(connection);
+ ConnectionAttemptsDone = 0;
+ InitImpl(deferred);
+ return;
+ }
+ }
+
+ if (!status.Ok()) {
+ ReadSessionSettings.Counters_->Errors->Inc();
+ if (!Reconnect(status)) {
+ with_lock (Lock) {
+ AbortImpl(TPlainStatus(
+ status.Status,
+ MakeIssueWithSubIssues(
+ TStringBuilder() << "Failed to establish connection to server \"" << status.Endpoint << "\". Attempts done: " << ConnectionAttemptsDone,
+ status.Issues)));
+ }
+ }
+ }
+}
+
+bool TDirectReadSession::Reconnect(const TPlainStatus& status) {
+ // TODO(qyryq) Are concurrent calls possible here?
+
+ TDuration delay = TDuration::Zero();
+
+ // Previous operations contexts.
+ NYdbGrpc::IQueueClientContextPtr prevConnectContext;
+ NYdbGrpc::IQueueClientContextPtr prevConnectTimeoutContext;
+ NYdbGrpc::IQueueClientContextPtr prevConnectDelayContext;
+
+ // Callbacks
+ std::function<void(TPlainStatus&&, IDirectReadProcessor::TPtr&&)> connectCallback;
+ std::function<void(bool)> connectTimeoutCallback;
+
+ if (!status.Ok()) {
+ LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << "Got error. Status: " << status.Status
+ << ". Description: " << IssuesSingleLineString(status.Issues));
+ }
+
+ NYdbGrpc::IQueueClientContextPtr connectContext = nullptr;
+ NYdbGrpc::IQueueClientContextPtr connectTimeoutContext = nullptr;
+ NYdbGrpc::IQueueClientContextPtr connectDelayContext = nullptr;
+
+ with_lock (Lock) {
+ if (State >= EState::CLOSING) {
+ return false;
+ }
+
+ connectContext = ClientContext->CreateContext();
+ connectTimeoutContext = ClientContext->CreateContext();
+ if (!connectContext || !connectTimeoutContext) {
+ return false;
+ }
+
+ State = EState::CONNECTING;
+ for (auto& [_, partitionSession] : PartitionSessions) {
+ if (partitionSession.State != TDirectReadPartitionSession::EState::DELAYED) {
+ bool transitioned = partitionSession.TransitionTo(TDirectReadPartitionSession::EState::IDLE);
+ Y_ABORT_UNLESS(transitioned);
+ }
+ }
+
+ if (Processor) {
+ Processor->Cancel();
+ }
+
+ Processor = nullptr;
+ // TODO(qyryq) WaitingReadResponse = false;
+ ServerMessage = std::make_shared<TDirectReadServerMessage>();
+ ++ConnectionGeneration;
+
+ if (!status.Ok()) {
+ if (!RetryState) {
+ RetryState = ReadSessionSettings.RetryPolicy_->CreateRetryState();
+ }
+ if (status.Status == EStatus::BAD_REQUEST) {
+ LOG_LAZY(Log, TLOG_EMERG, GetLogPrefix() << "Got BAD_REQUEST, replace it with OVERLOADED");
+ }
+ TMaybe<TDuration> nextDelay = RetryState->GetNextRetryDelay(status.Status == EStatus::BAD_REQUEST ? EStatus::OVERLOADED : status.Status);
+ if (!nextDelay) {
+ return false;
+ }
+ delay = *nextDelay;
+ connectDelayContext = ClientContext->CreateContext();
+ if (!connectDelayContext) {
+ return false;
+ }
+ }
+
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Reconnecting direct read session to node " << NodeId << " in " << delay);
+
+ ++ConnectionAttemptsDone;
+
+ // Set new context
+ prevConnectContext = std::exchange(ConnectContext, connectContext);
+ prevConnectTimeoutContext = std::exchange(ConnectTimeoutContext, connectTimeoutContext);
+ prevConnectDelayContext = std::exchange(ConnectDelayContext, connectDelayContext);
+
+ Y_ASSERT(ConnectContext);
+ Y_ASSERT(ConnectTimeoutContext);
+ Y_ASSERT((delay == TDuration::Zero()) == !ConnectDelayContext);
+ Y_ABORT_UNLESS(this->SelfContext);
+
+ connectCallback =
+ [cbContext = this->SelfContext, connectContext]
+ (TPlainStatus&& st, IDirectReadProcessor::TPtr&& connection) {
+ if (auto self = cbContext->LockShared()) {
+ self->OnConnect(std::move(st), std::move(connection), connectContext);
+ }
+ };
+
+ connectTimeoutCallback =
+ [cbContext = this->SelfContext, connectTimeoutContext](bool ok) {
+ if (ok) {
+ if (auto self = cbContext->LockShared()) {
+ self->OnConnectTimeout(connectTimeoutContext);
+ }
+ }
+ };
+ }
+
+ // Cancel previous operations.
+ ::NYdb::NTopic::Cancel(prevConnectContext);
+ ::NYdb::NTopic::Cancel(prevConnectTimeoutContext);
+ ::NYdb::NTopic::Cancel(prevConnectDelayContext);
+
+ Y_ASSERT(connectContext);
+ Y_ASSERT(connectTimeoutContext);
+ Y_ASSERT((delay == TDuration::Zero()) == !connectDelayContext);
+ ProcessorFactory->CreateProcessor(
+ std::move(connectCallback),
+ TRpcRequestSettings::Make(ReadSessionSettings, TEndpointKey(NodeId)),
+ std::move(connectContext),
+ TDuration::Seconds(30) /* connect timeout */, // TODO: make connect timeout setting.
+ std::move(connectTimeoutContext),
+ std::move(connectTimeoutCallback),
+ delay,
+ std::move(connectDelayContext));
+ return true;
+}
+
+}
diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/direct_reader.h b/ydb/public/sdk/cpp/src/client/topic/impl/direct_reader.h
new file mode 100644
index 00000000000..3bb18cf33da
--- /dev/null
+++ b/ydb/public/sdk/cpp/src/client/topic/impl/direct_reader.h
@@ -0,0 +1,265 @@
+#pragma once
+
+#include "common.h"
+
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/read_session.h>
+#include <ydb/public/sdk/cpp/src/client/topic/common/callback_context.h>
+
+#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
+
+#include <util/thread/lfqueue.h>
+
+
+namespace NYdb::NTopic {
+
+template <bool UseMigrationProtocol>
+class TDeferredActions;
+
+template <bool UseMigrationProtocol>
+class TSingleClusterReadSessionImpl;
+
+using TSingleClusterReadSessionContextPtr = std::shared_ptr<TCallbackContext<TSingleClusterReadSessionImpl<false>>>;
+
+using TNodeId = std::int32_t;
+using TGeneration = std::int64_t;
+using TPartitionId = std::int64_t;
+using TPartitionSessionId = std::uint64_t;
+using TReadSessionId = std::string;
+using TDirectReadId = std::int64_t;
+
+using TDirectReadServerMessage = Ydb::Topic::StreamDirectReadMessage::FromServer;
+using TDirectReadClientMessage = Ydb::Topic::StreamDirectReadMessage::FromClient;
+using IDirectReadProcessorFactory = ISessionConnectionProcessorFactory<TDirectReadClientMessage, TDirectReadServerMessage>;
+using IDirectReadProcessorFactoryPtr = std::shared_ptr<IDirectReadProcessorFactory>;
+using IDirectReadProcessor = IDirectReadProcessorFactory::IProcessor;
+
+class TDirectReadSession;
+using TDirectReadSessionContextPtr = std::shared_ptr<TCallbackContext<TDirectReadSession>>;
+
+struct IDirectReadSessionControlCallbacks {
+ using TPtr = std::shared_ptr<IDirectReadSessionControlCallbacks>;
+
+ virtual ~IDirectReadSessionControlCallbacks() {}
+ // virtual void OnDirectReadDone(Ydb::Topic::StreamDirectReadMessage::DirectReadResponse&&, TDeferredActions<false>&) {}
+ virtual void OnDirectReadDone(std::shared_ptr<TLockFreeQueue<Ydb::Topic::StreamDirectReadMessage::DirectReadResponse>>) {}
+ virtual void AbortSession(TSessionClosedEvent&&) {}
+ virtual void ScheduleCallback(TDuration, std::function<void()>) {}
+ virtual void ScheduleCallback(TDuration, std::function<void()>, TDeferredActions<false>&) {}
+
+ virtual void StopPartitionSession(TPartitionSessionId) {}
+};
+
+class TDirectReadSessionControlCallbacks : public IDirectReadSessionControlCallbacks {
+public:
+
+ TDirectReadSessionControlCallbacks(TSingleClusterReadSessionContextPtr contextPtr);
+ // void OnDirectReadDone(Ydb::Topic::StreamDirectReadMessage::DirectReadResponse&& response, TDeferredActions<false>&) override;
+ void OnDirectReadDone(std::shared_ptr<TLockFreeQueue<Ydb::Topic::StreamDirectReadMessage::DirectReadResponse>>) override;
+ void AbortSession(TSessionClosedEvent&& closeEvent) override;
+ void ScheduleCallback(TDuration delay, std::function<void()> callback) override;
+ void ScheduleCallback(TDuration delay, std::function<void()> callback, TDeferredActions<false>&) override;
+ void StopPartitionSession(TPartitionSessionId) override;
+
+private:
+
+ TSingleClusterReadSessionContextPtr SingleClusterReadSessionContextPtr;
+};
+
+class TDirectReadPartitionSession {
+public:
+ enum class EState {
+ IDLE, // The partition session has just been created. RetryState is empty.
+ DELAYED, // Got an error, SendStartRequestImpl will be called later
+ STARTING, // Sent StartDirectReadPartitionSessionRequest, waiting for response
+ WORKING // Got StartDirectReadPartitionSessionResponse
+
+ // See all possible transitions in TDirectReadPartitionSession::TransitionTo.
+ };
+
+ TPartitionSessionId PartitionSessionId;
+ TPartitionId PartitionId;
+ TPartitionLocation Location;
+ EState State = EState::IDLE;
+ IRetryPolicy::IRetryState::TPtr RetryState = {};
+
+ // The ID of the direct-read batch we want to read next.
+ TDirectReadId NextDirectReadId = 1;
+
+ // If the control session sends StopPartitionSessionRequest(graceful=true, last_direct_read_id),
+ // we need to remember the Id, read up to it, and then kill the partition session (and its direct session if it becomes empty).
+ std::optional<TDirectReadId> LastDirectReadId = std::nullopt;
+
+ TDirectReadClientMessage MakeStartRequest() const;
+ bool TransitionTo(EState);
+};
+
+namespace NTests { class TDirectReadSessionImplTestSetup; }
+
+// One TDirectReadSession instance comprises multiple TDirectReadPartitionSessions.
+// It wraps a gRPC connection to a particular node, where the partition sessions live.
+class TDirectReadSession : public TEnableSelfContext<TDirectReadSession> {
+ friend class NTests::TDirectReadSessionImplTestSetup;
+public:
+ using TSelf = TDirectReadSession;
+ using TPtr = std::shared_ptr<TSelf>;
+
+ TDirectReadSession(
+ TNodeId,
+ TReadSessionId,
+ const NYdb::NTopic::TReadSessionSettings,
+ IDirectReadSessionControlCallbacks::TPtr,
+ NYdbGrpc::IQueueClientContextPtr,
+ const IDirectReadProcessorFactoryPtr,
+ TLog
+ );
+
+ void Start();
+ void Close();
+ bool Empty() const;
+ bool Closed() const;
+
+ struct TDirectReadIds {
+ TDirectReadId NextDirectReadId;
+ std::optional<TDirectReadId> LastDirectReadId;
+ };
+ TDirectReadIds GetDirectReadIds(TPartitionSessionId) const;
+
+ void AddPartitionSession(TDirectReadPartitionSession&&);
+ void UpdatePartitionSessionGeneration(TPartitionSessionId, TPartitionLocation);
+ void SetLastDirectReadId(TPartitionSessionId, TDirectReadId);
+ void DeletePartitionSession(TPartitionSessionId);
+ void DeletePartitionSessionIfNeeded(TPartitionSessionId);
+
+private:
+
+ bool Reconnect(
+ const TPlainStatus& status
+ // TGeneration generation
+ );
+
+ void InitImpl(TDeferredActions<false>&);
+ void CloseImpl();
+
+ void WriteToProcessorImpl(TDirectReadClientMessage&& req);
+ void ReadFromProcessorImpl(TDeferredActions<false>&);
+ void OnReadDone(NYdbGrpc::TGrpcStatus&&, size_t connectionGeneration, TDeferredActions<false>&);
+
+ void OnReadDoneImpl(Ydb::Topic::StreamDirectReadMessage::InitResponse&&, TDeferredActions<false>&);
+ void OnReadDoneImpl(Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionResponse&&, TDeferredActions<false>&);
+ void OnReadDoneImpl(Ydb::Topic::StreamDirectReadMessage::DirectReadResponse&&, TDeferredActions<false>&);
+ void OnReadDoneImpl(Ydb::Topic::StreamDirectReadMessage::StopDirectReadPartitionSession&&, TDeferredActions<false>&);
+ void OnReadDoneImpl(Ydb::Topic::UpdateTokenResponse&&, TDeferredActions<false>&);
+
+ void OnConnect(
+ TPlainStatus&& st,
+ IDirectReadProcessor::TPtr&& processor,
+ const NYdbGrpc::IQueueClientContextPtr& connectContext
+ );
+
+ void OnConnectTimeout(
+ const NYdbGrpc::IQueueClientContextPtr& connectTimeoutContext
+ );
+
+ // delayedCall may be true only if the method is called from a scheduled callback.
+ void SendStartRequestImpl(TPartitionSessionId, bool delayedCall = false);
+ void SendStartRequestImpl(TDirectReadPartitionSession&, bool delayedCall = false);
+ void DelayStartRequestImpl(TDirectReadPartitionSession&, TPlainStatus&&, TDeferredActions<false>&);
+
+ void DeletePartitionSessionImpl(TPartitionSessionId);
+
+ void AbortImpl(TPlainStatus&&);
+
+ TStringBuilder GetLogPrefix() const;
+
+private:
+
+ enum class EState {
+ CREATED,
+ CONNECTING,
+ CONNECTED,
+ INITIALIZING,
+
+ WORKING,
+
+ CLOSING,
+ CLOSED
+ };
+
+private:
+ mutable TAdaptiveLock Lock;
+
+ NYdbGrpc::IQueueClientContextPtr ClientContext;
+ NYdbGrpc::IQueueClientContextPtr ConnectContext;
+ NYdbGrpc::IQueueClientContextPtr ConnectTimeoutContext;
+ NYdbGrpc::IQueueClientContextPtr ConnectDelayContext;
+ size_t ConnectionGeneration = 0;
+
+ const NYdb::NTopic::TReadSessionSettings ReadSessionSettings;
+ const TReadSessionId ServerSessionId;
+ const IDirectReadProcessorFactoryPtr ProcessorFactory;
+ const TNodeId NodeId;
+
+ std::shared_ptr<TLockFreeQueue<Ydb::Topic::StreamDirectReadMessage::DirectReadResponse>> IncomingMessagesForControlSession;
+
+ IDirectReadSessionControlCallbacks::TPtr ControlCallbacks;
+ IDirectReadProcessor::TPtr Processor;
+ std::shared_ptr<TDirectReadServerMessage> ServerMessage;
+ std::unordered_map<TPartitionSessionId, TDirectReadPartitionSession> PartitionSessions;
+ IRetryPolicy::IRetryState::TPtr RetryState = {};
+ size_t ConnectionAttemptsDone = 0;
+ EState State;
+
+ TLog Log;
+};
+
+
+// TDirectReadSessionManager is NOT thread-safe. Its methods must be used under a lock.
+class TDirectReadSessionManager {
+public:
+ using TSelf = TDirectReadSessionManager;
+ using TPtr = std::shared_ptr<TSelf>;
+
+ TDirectReadSessionManager(
+ TReadSessionId,
+ const NYdb::NTopic::TReadSessionSettings,
+ IDirectReadSessionControlCallbacks::TPtr,
+ NYdbGrpc::IQueueClientContextPtr,
+ IDirectReadProcessorFactoryPtr,
+ TLog
+ );
+
+ ~TDirectReadSessionManager();
+
+ void StartPartitionSession(TDirectReadPartitionSession&&);
+ void UpdatePartitionSession(TPartitionSessionId, TPartitionId, TPartitionLocation);
+ TDirectReadSessionContextPtr ErasePartitionSession(TPartitionSessionId);
+ void StopPartitionSession(TPartitionSessionId);
+
+ // Update LastDirectReadId in the partition session object.
+ // It will be used later to decide if we need to stop the partition session.
+ bool StopPartitionSessionGracefully(TPartitionSessionId, TDirectReadId lastDirectReadId);
+
+ void Close();
+
+private:
+
+ using TNodeSessionsMap = TMap<TNodeId, TDirectReadSessionContextPtr>;
+
+ TDirectReadSessionContextPtr CreateDirectReadSession(TNodeId);
+ void DeletePartitionSession(TPartitionSessionId id, TNodeSessionsMap::iterator it);
+
+ TStringBuilder GetLogPrefix() const;
+
+private:
+ const NYdb::NTopic::TReadSessionSettings ReadSessionSettings;
+ const TReadSessionId ServerSessionId;
+ const NYdbGrpc::IQueueClientContextPtr ClientContext;
+ const IDirectReadProcessorFactoryPtr ProcessorFactory;
+
+ IDirectReadSessionControlCallbacks::TPtr ControlCallbacks;
+ TNodeSessionsMap NodeSessions;
+ TMap<TPartitionSessionId, TPartitionLocation> Locations;
+ TLog Log;
+};
+
+}
diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/read_session.cpp b/ydb/public/sdk/cpp/src/client/topic/impl/read_session.cpp
index 2ad4e46ff82..3f84d60ac6d 100644
--- a/ydb/public/sdk/cpp/src/client/topic/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/src/client/topic/impl/read_session.cpp
@@ -88,13 +88,16 @@ void TReadSession::CreateClusterSessionsImpl(TDeferredActions<false>& deferred)
Settings,
DbDriverState->Database,
SessionId,
- "",
+ "", // clusterName parameter is used by ydb_persqueue_public only
Log,
Client->CreateReadSessionConnectionProcessorFactory(),
EventsQueue,
context,
- 1,
- 1
+ 1, 1, // partitionStreamIdStart, partitionStreamIdStep parameters are used by ydb_persqueue_public only
+ [this](TDuration delay, std::function<void(bool)> cb, NYdbGrpc::IQueueClientContextPtr) {
+ Connections->ScheduleCallback(delay, cb);
+ },
+ Client->CreateDirectReadSessionConnectionProcessorFactory()
);
deferred.DeferStartSession(CbContext);
diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/read_session_impl.h b/ydb/public/sdk/cpp/src/client/topic/impl/read_session_impl.h
index ca34c1bd271..3df3701ffc9 100644
--- a/ydb/public/sdk/cpp/src/client/topic/impl/read_session_impl.h
+++ b/ydb/public/sdk/cpp/src/client/topic/impl/read_session_impl.h
@@ -8,6 +8,7 @@
#include "counters_logger.h"
#include "offsets_collector.h"
#include "transaction.h"
+#include "direct_reader.h"
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/read_session.h>
#include <ydb/public/sdk/cpp/src/client/persqueue_public/include/read_session.h>
@@ -144,6 +145,11 @@ public:
DoActions();
}
+ // TODO(qyryq) Extract a separate TDeferredDirectReadActions class?
+ void DeferReadFromProcessor(const typename IDirectReadProcessor::TPtr& processor, TDirectReadServerMessage* dst, typename IDirectReadProcessor::TReadCallback callback);
+ void DeferScheduleCallback(TDuration delay, std::function<void(bool)> callback, TSingleClusterReadSessionContextPtr);
+ void DeferCallback(std::function<void()> callback);
+
void DeferReadFromProcessor(const typename IProcessor<UseMigrationProtocol>::TPtr& processor, TServerMessage<UseMigrationProtocol>* dst, typename IProcessor<UseMigrationProtocol>::TReadCallback callback);
void DeferStartExecutorTask(const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, typename IAExecutor<UseMigrationProtocol>::TFunction&& task);
void DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent);
@@ -159,6 +165,9 @@ private:
void DoActions();
void Read();
+ void DirectRead();
+ void DirectReadScheduleCallback();
+ void DirectReadCallback();
void StartExecutorTasks();
void AbortSession();
void Reconnect();
@@ -171,6 +180,26 @@ private:
TServerMessage<UseMigrationProtocol>* ReadDst = nullptr;
typename IProcessor<UseMigrationProtocol>::TReadCallback ReadCallback;
+ // Direct read.
+ struct TDirectReadDeferredActions {
+ struct TRead {
+ IDirectReadProcessor::TPtr Processor;
+ TDirectReadServerMessage* ServerMessage = nullptr;
+ IDirectReadProcessor::TReadCallback ReadCallback;
+ };
+
+ std::optional<TRead> Read;
+
+ struct TScheduledCallback {
+ std::function<void(bool)> Callback;
+ TDuration Delay;
+ TSingleClusterReadSessionContextPtr ContextPtr;
+ };
+
+ std::optional<TScheduledCallback> ScheduledCallback;
+ std::optional<std::function<void()>> Callback;
+ } DirectReadActions;
+
// Executor tasks.
std::vector<std::pair<typename IAExecutor<UseMigrationProtocol>::TPtr, typename IAExecutor<UseMigrationProtocol>::TFunction>> ExecutorsTasks;
@@ -609,8 +638,9 @@ public:
i64 partitionId,
i64 assignId,
i64 readOffset,
+ std::optional<TPartitionLocation> location,
TCallbackContextPtr<UseMigrationProtocol> cbContext)
- : Key{topicPath, "", static_cast<ui64>(partitionId)}
+ : Key{.Topic = topicPath, .Cluster = "", .Partition = static_cast<ui64>(partitionId)}
, AssignId(static_cast<ui64>(assignId))
, FirstNotReadOffset(static_cast<ui64>(readOffset))
, CbContext(std::move(cbContext))
@@ -620,6 +650,7 @@ public:
TAPartitionStream<false>::TopicPath = std::move(topicPath);
TAPartitionStream<false>::ReadSessionId = std::move(readSessionId);
TAPartitionStream<false>::PartitionId = static_cast<ui64>(partitionId);
+ TAPartitionStream<false>::Location = location;
MaxCommittedOffset = static_cast<ui64>(readOffset);
}
@@ -633,6 +664,31 @@ public:
FirstNotReadOffset = offset;
}
+ template <bool V = UseMigrationProtocol, class = std::enable_if_t<!V>>
+ std::optional<TPartitionLocation> GetLocation() const {
+ return TAPartitionStream<false>::Location;
+ }
+
+ template <bool V = UseMigrationProtocol, class = std::enable_if_t<!V>>
+ std::optional<NTopic::TDirectReadId> GetLastDirectReadId() const {
+ return TAPartitionStream<false>::LastDirectReadId;
+ }
+
+ template <bool V = UseMigrationProtocol, class = std::enable_if_t<!V>>
+ NTopic::TDirectReadId GetNextDirectReadId() const {
+ return TAPartitionStream<false>::NextDirectReadId;
+ }
+
+ template <bool V = UseMigrationProtocol, class = std::enable_if_t<!V>>
+ void SetNextDirectReadId(const i64 id) {
+ TAPartitionStream<false>::NextDirectReadId = id;
+ }
+
+ template <bool V = UseMigrationProtocol, class = std::enable_if_t<!V>>
+ void SetLastDirectReadId(const i64 id) {
+ TAPartitionStream<false>::LastDirectReadId = id;
+ }
+
void Commit(ui64 startOffset, ui64 endOffset) /*override*/;
void RequestStatus() override;
@@ -759,6 +815,10 @@ public:
std::vector<typename TADataReceivedEvent<UseMigrationProtocol>::TCompressedMessage>& compressedMessages,
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator);
+ void SetLocation(TPartitionLocation location) {
+ TAPartitionStream<false>::Location = location;
+ }
+
std::mutex& GetLock() {
return Lock;
}
@@ -1063,10 +1123,13 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
public:
using TSelf = TSingleClusterReadSessionImpl<UseMigrationProtocol>;
using TPtr = std::shared_ptr<TSelf>;
- using IProcessor = typename IReadSessionConnectionProcessorFactory<UseMigrationProtocol>::IProcessor;
-
+ using IProcessorFactory = IReadSessionConnectionProcessorFactory<UseMigrationProtocol>;
+ using IProcessorFactoryPtr = std::shared_ptr<IProcessorFactory>;
+ using IProcessor = typename IProcessorFactory::IProcessor;
+ using TScheduleCallbackFunc = std::function<void(TDuration, std::function<void(bool)>, NYdbGrpc::IQueueClientContextPtr)>;
friend class TPartitionStreamImpl<UseMigrationProtocol>;
+ friend class TDirectReadSessionControlCallbacks;
TSingleClusterReadSessionImpl(
const TAReadSessionSettings<UseMigrationProtocol>& settings,
@@ -1078,7 +1141,9 @@ public:
std::shared_ptr<TReadSessionEventsQueue<UseMigrationProtocol>> eventsQueue,
NYdbGrpc::IQueueClientContextPtr clientContext,
ui64 partitionStreamIdStart,
- ui64 partitionStreamIdStep
+ ui64 partitionStreamIdStep,
+ TScheduleCallbackFunc scheduleCallbackFunc = {},
+ IDirectReadProcessorFactoryPtr directReadProcessorFactory = {}
)
: Settings(settings)
, Database(database)
@@ -1093,6 +1158,8 @@ public:
, CookieMapping()
, ReadSizeBudget(GetCompressedDataSizeLimit())
, ReadSizeServerDelta(0)
+ , ScheduleCallbackFunc(scheduleCallbackFunc)
+ , DirectReadProcessorFactory(std::move(directReadProcessorFactory))
{
}
@@ -1148,6 +1215,8 @@ public:
void DumpStatisticsToLog(TLogElement& log);
void UpdateMemoryUsageStatistics();
+ void ScheduleCallback(TDuration timeout, std::function<void(bool)> callback);
+
TStringBuilder GetLogPrefix() const;
const TLog& GetLog() const {
@@ -1200,6 +1269,16 @@ private:
void WriteToProcessorImpl(TClientMessage<UseMigrationProtocol>&& req); // Assumes that we're under lock.
void OnReadDone(NYdbGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration);
+ // Direct Read
+ bool IsDirectRead();
+
+ // TODO(qyryq) Is it possible to revert back to the approach without TLockFreeQueue?
+ // void OnDirectReadDone(Ydb::Topic::StreamDirectReadMessage::DirectReadResponse&&, TDeferredActions<false>&);
+ void OnDirectReadDone(std::shared_ptr<TLockFreeQueue<Ydb::Topic::StreamDirectReadMessage::DirectReadResponse>>); //, TDeferredActions<false>&);
+
+ void StopPartitionSession(TPartitionSessionId);
+ void StopPartitionSessionImpl(TIntrusivePtr<TPartitionStreamImpl<false>>, bool graceful, TDeferredActions<false>&);
+
// Assumes that we're under lock.
template<typename TMessage>
inline void OnReadDoneImpl(TMessage&& msg, TDeferredActions<UseMigrationProtocol>& deferred);
@@ -1339,7 +1418,9 @@ private:
TLog Log;
ui64 NextPartitionStreamId;
ui64 PartitionStreamIdStep;
- std::shared_ptr<IReadSessionConnectionProcessorFactory<UseMigrationProtocol>> ConnectionFactory;
+
+ IProcessorFactoryPtr ConnectionFactory;
+
std::shared_ptr<TReadSessionEventsQueue<UseMigrationProtocol>> EventsQueue;
NYdbGrpc::IQueueClientContextPtr ClientContext; // Common client context.
NYdbGrpc::IQueueClientContextPtr ConnectContext;
@@ -1360,7 +1441,8 @@ private:
bool WaitingReadResponse = false;
std::shared_ptr<TServerMessage<UseMigrationProtocol>> ServerMessage; // Server message to write server response to.
std::unordered_map<ui64, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>> PartitionStreams; // assignId -> Partition stream.
- TPartitionCookieMapping CookieMapping;
+ std::optional<TDirectReadSessionManager> DirectReadSessionManager; // Only for ydb_topic
+ TPartitionCookieMapping CookieMapping; // Only for ydb_persqueue
std::deque<TDecompressionQueueItem> DecompressionQueue;
bool DataReadingSuspended = false;
@@ -1380,6 +1462,11 @@ private:
std::unordered_map<ui32, std::vector<TParentInfo>> HierarchyData;
std::unordered_set<ui64> ReadingFinishedData;
+ // Currently needed only for scheduling callbacks in direct read sessions
+ // to retry sending StartDirectReadPartitionSession requests after temporary errors.
+ TScheduleCallbackFunc ScheduleCallbackFunc;
+ IDirectReadProcessorFactoryPtr DirectReadProcessorFactory;
+
TTransactionMap Txs;
};
diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/read_session_impl.ipp b/ydb/public/sdk/cpp/src/client/topic/impl/read_session_impl.ipp
index 04734efa059..4ba274479f2 100644
--- a/ydb/public/sdk/cpp/src/client/topic/impl/read_session_impl.ipp
+++ b/ydb/public/sdk/cpp/src/client/topic/impl/read_session_impl.ipp
@@ -4,6 +4,10 @@
#include "read_session_impl.h"
#undef INCLUDE_READ_SESSION_IMPL_H
+#include "direct_reader.h"
+
+// #include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h>
+
#include <ydb/public/sdk/cpp/src/client/topic/common/log_lazy.h>
#define INCLUDE_YDB_INTERNAL_H
@@ -27,6 +31,7 @@
namespace NYdb::inline Dev::NTopic {
static const bool RangesMode = !std::string{std::getenv("PQ_OFFSET_RANGES_MODE") ? std::getenv("PQ_OFFSET_RANGES_MODE") : ""}.empty();
+static const bool ExperimentalDirectRead = !std::string{std::getenv("PQ_EXPERIMENTAL_DIRECT_READ") ? std::getenv("PQ_EXPERIMENTAL_DIRECT_READ") : ""}.empty();
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -334,6 +339,13 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain
++ConnectionAttemptsDone;
+ if constexpr (!UseMigrationProtocol) {
+ if (DirectReadSessionManager) {
+ DirectReadSessionManager->Close();
+ DirectReadSessionManager.reset();
+ }
+ }
+
// Set new context
prevConnectContext = std::exchange(ConnectContext, connectContext);
prevConnectTimeoutContext = std::exchange(ConnectTimeoutContext, connectTimeoutContext);
@@ -499,14 +511,23 @@ inline void TSingleClusterReadSessionImpl<true>::InitImpl(TDeferredActions<true>
}
template<>
+inline bool TSingleClusterReadSessionImpl<false>::IsDirectRead() {
+ // TODO(qyryq) Replace this return with the next one when direct read is ready for production.
+ return ExperimentalDirectRead;
+ // return Settings.DirectRead_;
+}
+
+template<>
inline void TSingleClusterReadSessionImpl<false>::InitImpl(TDeferredActions<false>& deferred) {
Y_ABORT_UNLESS(Lock.IsLocked());
LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session");
+
TClientMessage<false> req;
auto& init = *req.mutable_init_request();
init.set_consumer(TStringType{Settings.ConsumerName_});
init.set_auto_partitioning_support(Settings.AutoPartitioningSupport_);
+ init.set_direct_read(IsDirectRead());
for (const TTopicReadSettings& topic : Settings.Topics_) {
auto* topicSettings = init.add_topics_read_settings();
@@ -593,9 +614,8 @@ std::string GetCluster(const TPartitionStreamImpl<UseMigrationProtocol>* partiti
template<bool UseMigrationProtocol>
bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::IsActualPartitionStreamImpl(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) {
Y_ABORT_UNLESS(Lock.IsLocked());
- auto actualPartitionStreamIt = PartitionStreams.find(partitionStream->GetAssignId());
- return actualPartitionStreamIt != PartitionStreams.end()
- && GetPartitionStreamId(actualPartitionStreamIt->second.Get()) == GetPartitionStreamId(partitionStream);
+ auto it = PartitionStreams.find(partitionStream->GetAssignId());
+ return it != PartitionStreams.end() && GetPartitionStreamId(it->second.Get()) == GetPartitionStreamId(partitionStream);
}
template<bool UseMigrationProtocol>
@@ -636,18 +656,33 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream
if (commitOffset) {
startRead.set_commit_offset(*commitOffset);
}
+ WriteToProcessorImpl(std::move(req));
} else {
+ auto partitionSessionId = partitionStream->GetAssignId();
auto& startRead = *req.mutable_start_partition_session_response();
- startRead.set_partition_session_id(partitionStream->GetAssignId());
+ startRead.set_partition_session_id(partitionSessionId);
if (readOffset) {
startRead.set_read_offset(*readOffset);
}
if (commitOffset) {
startRead.set_commit_offset(*commitOffset);
}
- }
- WriteToProcessorImpl(std::move(req));
+ WriteToProcessorImpl(std::move(req));
+
+ if (IsDirectRead()) {
+ Y_ABORT_UNLESS(DirectReadSessionManager);
+
+ auto location = partitionStream->GetLocation();
+ Y_ABORT_UNLESS(location);
+
+ DirectReadSessionManager->StartPartitionSession({
+ .PartitionSessionId = static_cast<TPartitionSessionId>(partitionSessionId),
+ .PartitionId = static_cast<TPartitionId>(partitionStream->GetPartitionId()),
+ .Location = *location,
+ });
+ }
+ }
}
template<bool UseMigrationProtocol>
@@ -705,6 +740,9 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream
} else {
auto& released = *req.mutable_stop_partition_session_response();
released.set_partition_session_id(partitionStream->GetAssignId());
+
+ // TODO(qyryq) Client must pass graceful value unchanged from the StopPartitionSessionRequest.
+ released.set_graceful(true);
}
WriteToProcessorImpl(std::move(req));
@@ -877,6 +915,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnReadDone(NYdbGrpc::T
if (connectionGeneration != ConnectionGeneration) {
return; // Message from previous connection. Ignore.
}
+
if (errorStatus.Ok()) {
if (IsErrorMessage(*ServerMessage)) {
errorStatus = MakeErrorFromProto(*ServerMessage);
@@ -1216,26 +1255,9 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
template <>
template <>
inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
- Ydb::Topic::StreamReadMessage::InitResponse&& msg,
- TDeferredActions<false>& deferred) {
-
- Y_ABORT_UNLESS(Lock.IsLocked());
- Y_UNUSED(deferred);
-
- RetryState = nullptr;
- ReadSessionId = msg.session_id();
-
- LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id());
-
- // Successful init. Do nothing.
- ContinueReadingDataImpl();
-}
-
-template <>
-template <>
-inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
Ydb::Topic::StreamReadMessage::ReadResponse&& msg,
- TDeferredActions<false>& deferred) {
+ TDeferredActions<false>& deferred
+) {
Y_ABORT_UNLESS(Lock.IsLocked());
if (Closing || Aborting) {
@@ -1252,12 +1274,21 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
for (TPartitionData<false>& partitionData : *msg.mutable_partition_data()) {
auto partitionStreamIt = PartitionStreams.find(partitionData.partition_session_id());
if (partitionStreamIt == PartitionStreams.end()) {
- ++*Settings.Counters_->Errors;
- BreakConnectionAndReconnectImpl(EStatus::INTERNAL_ERROR,
- TStringBuilder() << "Got unexpected partition stream data message. "
- << "PartitionSessionId: " << partitionData.partition_session_id(),
- deferred);
- return;
+ if (IsDirectRead()) {
+ // If we have a timeline like the next one, ignore the partition stream data message:
+ // 1. A subsession receives DirectReadResponse.
+ // 2. The control session receives StopPartitionSessionRequest and deletes the partition session from PartitionStreams.
+ // 3. The subsession calls OnDirectReadDone which calls this method.
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Got unexpected partition stream data message. PartitionSessionId: " << partitionData.partition_session_id());
+ continue;
+ } else {
+ ++*Settings.Counters_->Errors;
+ BreakConnectionAndReconnectImpl(EStatus::INTERNAL_ERROR,
+ TStringBuilder() << "Got unexpected partition stream data message. "
+ << "PartitionSessionId: " << partitionData.partition_session_id(),
+ deferred);
+ return;
+ }
}
const TIntrusivePtr<TPartitionStreamImpl<false>>& partitionStream = partitionStreamIt->second;
@@ -1316,42 +1347,200 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
}
template <>
+inline void TSingleClusterReadSessionImpl<false>::StopPartitionSessionImpl(
+ TIntrusivePtr<TPartitionStreamImpl<false>> partitionStream, bool graceful, TDeferredActions<false>& deferred
+) {
+ auto partitionSessionId = partitionStream->GetAssignId();
+
+ if (IsDirectRead()) {
+ Y_ABORT_UNLESS(DirectReadSessionManager);
+ DirectReadSessionManager->StopPartitionSession(partitionSessionId);
+ }
+
+ bool pushRes = true;
+
+ if (graceful) {
+ auto committedOffset = partitionStream->GetMaxCommittedOffset();
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "XXXXX PushEvent 1422 TStopPartitionSessionEvent");
+ pushRes = EventsQueue->PushEvent(
+ partitionStream,
+ // TODO(qyryq) Is it safe to use GetMaxCommittedOffset here instead of StopPartitionSessionRequest.commmitted_offset?
+ TReadSessionEvent::TStopPartitionSessionEvent(std::move(partitionStream), committedOffset),
+ deferred);
+ } else {
+ // partitionStream->ConfirmDestroy();
+ TClientMessage<false> req;
+ auto& released = *req.mutable_stop_partition_session_response();
+ released.set_partition_session_id(partitionStream->GetAssignId());
+ WriteToProcessorImpl(std::move(req));
+ PartitionStreams.erase(partitionSessionId);
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "XXXXX PushEvent 1435 TPartitionSessionClosedEvent");
+ pushRes = EventsQueue->PushEvent(
+ partitionStream,
+ TReadSessionEvent::TPartitionSessionClosedEvent(partitionStream, TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost),
+ deferred);
+ }
+
+ if (!pushRes) {
+ AbortImpl();
+ }
+}
+
+template <>
+inline void TSingleClusterReadSessionImpl<false>::OnDirectReadDone(
+ std::shared_ptr<TLockFreeQueue<Ydb::Topic::StreamDirectReadMessage::DirectReadResponse>> responses
+ // Ydb::Topic::StreamDirectReadMessage::DirectReadResponse&& response,
+ // TDeferredActions<false>& deferred
+) {
+ TDeferredActions<false> deferred;
+ with_lock (Lock) {
+ Ydb::Topic::StreamDirectReadMessage::DirectReadResponse response;
+ if (!responses->Dequeue(&response)) {
+ return;
+ }
+
+ {
+ // Send DirectReadAck.
+ TClientMessage<false> req;
+ auto& ack = *req.mutable_direct_read_ack();
+ ack.set_direct_read_id(response.direct_read_id());
+ ack.set_partition_session_id(response.partition_session_id());
+ WriteToProcessorImpl(std::move(req));
+ }
+
+ auto partitionStreamIt = PartitionStreams.find(response.partition_session_id());
+ if (partitionStreamIt == PartitionStreams.end()) {
+ LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Got DirectReadResponse for unknown partition session id: " << response.partition_session_id() << ".");
+ return;
+ }
+
+ auto& partitionStream = partitionStreamIt->second;
+ partitionStream->SetNextDirectReadId(response.direct_read_id() + 1);
+
+ auto stopPartitionSession = [&](){
+ // After we get a StopPartitionSessionRequest(graceful=true), LastDirectReadId is defined.
+ // In this case we're waiting for the DirectReadResponse(direct_read_id=LastDirectReadId) and then close the subsession.
+
+ auto lastId = partitionStream->GetLastDirectReadId();
+ if (lastId && lastId <= response.direct_read_id() + 1) {
+ this->StopPartitionSessionImpl(partitionStream, true, deferred);
+ }
+ };
+
+ if (!response.has_partition_data() || response.partition_data().batches_size() == 0) {
+ // Sometimes the server might send an empty DirectReadResponse with a non-zero bytes_size, that we should take into account.
+ stopPartitionSession();
+ ReadSizeBudget += response.bytes_size();
+ ReadSizeServerDelta -= response.bytes_size();
+ WaitingReadResponse = false;
+ ContinueReadingDataImpl();
+ return;
+ }
+
+ Ydb::Topic::StreamReadMessage::ReadResponse r;
+ r.set_bytes_size(response.bytes_size());
+ auto* data = r.add_partition_data();
+ data->Swap(response.mutable_partition_data());
+ OnReadDoneImpl(std::move(r), deferred);
+ stopPartitionSession();
+ }
+}
+
+template <>
+inline void TSingleClusterReadSessionImpl<false>::ScheduleCallback(TDuration timeout, std::function<void(bool)> callback) {
+ // TODO(qyryq) Pass context ptr?
+ ScheduleCallbackFunc(timeout, callback, nullptr);
+}
+
+template <>
+inline void TSingleClusterReadSessionImpl<false>::StopPartitionSession(TPartitionSessionId partitionSessionId) {
+ TDeferredActions<false> deferred;
+ with_lock (Lock) {
+ auto partitionStreamIt = PartitionStreams.find(partitionSessionId);
+ if (partitionStreamIt == PartitionStreams.end()) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Wanted to stop partition stream id=" << partitionSessionId
+ << ", but no such id was found");
+ return;
+ }
+ StopPartitionSessionImpl(partitionStreamIt->second, /* graceful= */ true, deferred);
+ }
+}
+
+template <>
+template <>
+inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
+ Ydb::Topic::StreamReadMessage::InitResponse&& msg,
+ TDeferredActions<false>&
+) {
+ Y_ABORT_UNLESS(Lock.IsLocked());
+
+ RetryState = nullptr;
+ ReadSessionId = msg.session_id();
+
+ LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Got InitResponse. ReadSessionId: " << ReadSessionId);
+
+ if (IsDirectRead()) {
+ Y_ABORT_UNLESS(!DirectReadSessionManager);
+ DirectReadSessionManager.emplace(
+ ReadSessionId,
+ Settings,
+ std::make_shared<TDirectReadSessionControlCallbacks>(this->SelfContext),
+ ClientContext->CreateContext(),
+ DirectReadProcessorFactory,
+ Log
+ );
+ }
+
+ ContinueReadingDataImpl();
+}
+
+template <>
template <>
inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
Ydb::Topic::StreamReadMessage::StartPartitionSessionRequest&& msg,
- TDeferredActions<false>& deferred) {
+ TDeferredActions<false>& deferred
+) {
Y_ABORT_UNLESS(Lock.IsLocked());
- auto partitionStream = MakeIntrusive<TPartitionStreamImpl<false>>(
- NextPartitionStreamId,
- msg.partition_session().path(),
- ReadSessionId,
- msg.partition_session().partition_id(),
- msg.partition_session().partition_session_id(),
- msg.committed_offset(),
- SelfContext);
- NextPartitionStreamId += PartitionStreamIdStep;
+ // For DirectRead the message MUST have partition location.
+ Y_ABORT_UNLESS(!IsDirectRead() || msg.has_partition_location());
+
+ auto partitionSessionId = msg.partition_session().partition_session_id();
// Renew partition stream.
- TIntrusivePtr<TPartitionStreamImpl<false>>& currentPartitionStream = PartitionStreams[partitionStream->GetAssignId()];
- if (currentPartitionStream) {
+ TIntrusivePtr<TPartitionStreamImpl<false>>& partitionStream = PartitionStreams[partitionSessionId];
+ if (partitionStream) {
bool pushRes = EventsQueue->PushEvent(
- currentPartitionStream,
- TReadSessionEvent::TPartitionSessionClosedEvent(
- currentPartitionStream, TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost),
+ partitionStream,
+ TReadSessionEvent::TPartitionSessionClosedEvent(
+ partitionStream, TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost),
deferred);
+
if (!pushRes) {
AbortImpl();
return;
}
}
- currentPartitionStream = partitionStream;
+
+ partitionStream = MakeIntrusive<TPartitionStreamImpl<false>>(
+ NextPartitionStreamId,
+ msg.partition_session().path(),
+ ReadSessionId,
+ msg.partition_session().partition_id(),
+ partitionSessionId,
+ msg.committed_offset(),
+ msg.has_partition_location() ? std::optional<TPartitionLocation>(msg.partition_location()) : std::nullopt,
+ SelfContext);
+
+ NextPartitionStreamId += PartitionStreamIdStep;
// Send event to user.
- bool pushRes = EventsQueue->PushEvent(partitionStream,
- TReadSessionEvent::TStartPartitionSessionEvent(
- partitionStream, msg.committed_offset(), msg.partition_offsets().end()),
- deferred);
+ bool pushRes = EventsQueue->PushEvent(
+ partitionStream,
+ TReadSessionEvent::TStartPartitionSessionEvent(
+ partitionStream, msg.committed_offset(), msg.partition_offsets().end()),
+ deferred);
+
if (!pushRes) {
AbortImpl();
return;
@@ -1362,53 +1551,80 @@ template <>
template <>
inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
Ydb::Topic::StreamReadMessage::UpdatePartitionSession&& msg,
- TDeferredActions<false>& deferred) {
+ TDeferredActions<false>&
+) {
Y_ABORT_UNLESS(Lock.IsLocked());
- Y_UNUSED(deferred);
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "UpdatePartitionSession " << msg.DebugString());
- auto partitionStreamIt = PartitionStreams.find(msg.partition_session_id());
- if (partitionStreamIt == PartitionStreams.end()) {
+ auto partitionSessionId = msg.partition_session_id();
+ auto it = PartitionStreams.find(partitionSessionId);
+ if (it == PartitionStreams.end()) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Wanted to update partition_session_id: " << partitionSessionId
+ << ", but no such id was found");
return;
}
- //TODO: update generation/nodeid info
+
+ Y_ABORT_UNLESS(it->second->GetAssignId() == static_cast<unsigned long>(partitionSessionId));
+
+ // TODO(qyryq) Do we need to store generation/nodeid info in TSingleClusterReadSessionImpl?
+ if (IsDirectRead()) {
+ Y_ABORT_UNLESS(DirectReadSessionManager);
+ it->second->SetLocation(msg.partition_location());
+ DirectReadSessionManager->UpdatePartitionSession(
+ partitionSessionId,
+ static_cast<TPartitionId>(it->second->GetPartitionId()),
+ msg.partition_location()
+ );
+ }
}
template <>
template <>
inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
Ydb::Topic::StreamReadMessage::StopPartitionSessionRequest&& msg,
- TDeferredActions<false>& deferred) {
+ TDeferredActions<false>& deferred
+) {
Y_ABORT_UNLESS(Lock.IsLocked());
- auto partitionStreamIt = PartitionStreams.find(msg.partition_session_id());
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "StopPartitionSessionRequest " << msg.DebugString());
+
+ auto partitionSessionId = msg.partition_session_id();
+
+ auto partitionStreamIt = PartitionStreams.find(partitionSessionId);
if (partitionStreamIt == PartitionStreams.end()) {
+ LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Server wants us to stop partition session id=" << partitionSessionId
+ << ", but it's not found");
return;
}
+
TIntrusivePtr<TPartitionStreamImpl<false>> partitionStream = partitionStreamIt->second;
- bool pushRes = true;
- if (!msg.graceful()) {
- PartitionStreams.erase(msg.partition_session_id());
- pushRes = EventsQueue->PushEvent(partitionStream,
- TReadSessionEvent::TPartitionSessionClosedEvent(
- partitionStream, TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost),
- deferred);
- } else {
- pushRes = EventsQueue->PushEvent(
- partitionStream,
- TReadSessionEvent::TStopPartitionSessionEvent(std::move(partitionStream), msg.committed_offset()),
- deferred);
- }
- if (!pushRes) {
- AbortImpl();
+
+ if (IsDirectRead() && msg.graceful()) {
+ // Keep reading DirectReadResponses until we get the one with direct_read_id == last_direct_read_id.
+ // Only then we send the TStopPartitionSessionEvent to the user.
+
+ partitionStream->SetLastDirectReadId(msg.last_direct_read_id());
+
+ if (msg.last_direct_read_id() <= partitionStream->GetNextDirectReadId()) {
+ // There are two cases when we need to call StopPartitionSessionImpl:
+ // 1. We have received the last DirectReadResponse.
+ // 2. We have received the StopPartitionSessionRequest(graceful=true) after we received a corresponding DirectReadResponse.
+ // This is the second case.
+ StopPartitionSessionImpl(partitionStreamIt->second, true, deferred);
+ }
+
return;
}
+
+ StopPartitionSessionImpl(partitionStreamIt->second, msg.graceful(), deferred);
}
template <>
template <>
inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
Ydb::Topic::StreamReadMessage::EndPartitionSession&& msg,
- TDeferredActions<false>& deferred) {
+ TDeferredActions<false>& deferred
+) {
Y_ABORT_UNLESS(Lock.IsLocked());
auto partitionStreamIt = PartitionStreams.find(msg.partition_session_id());
@@ -1431,6 +1647,11 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
partitionStream->GetPartitionSessionId());
}
+ if (IsDirectRead()) {
+ Y_ABORT_UNLESS(DirectReadSessionManager);
+ DirectReadSessionManager->StopPartitionSession(msg.partition_session_id());
+ }
+
bool pushRes = EventsQueue->PushEvent(
partitionStream,
TReadSessionEvent::TEndPartitionSessionEvent(std::move(partitionStream), std::move(adjacentPartitionIds), std::move(childPartitionIds)),
@@ -1652,6 +1873,13 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::AbortImpl() {
if (Processor) {
Processor->Cancel();
}
+
+ if constexpr (!UseMigrationProtocol) {
+ if (DirectReadSessionManager) {
+ DirectReadSessionManager->Close();
+ DirectReadSessionManager.reset();
+ }
+ }
}
}
@@ -2796,13 +3024,6 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
}
i64 minOffset = Max<i64>();
i64 maxOffset = 0;
- const i64 partition_id = [parent](){
- if constexpr (UseMigrationProtocol) {
- return parent->ServerMessage.partition();
- } else {
- return parent->ServerMessage.partition_session_id();
- }
- }();
i64 dataProcessed = 0;
size_t messagesProcessed = 0;
for (const TMessageRange& messages : Messages) {
@@ -2848,11 +3069,20 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
}
}
}
+
if (auto session = parent->CbContext->LockShared()) {
+ const i64 partition_id = [parent](){
+ if constexpr (UseMigrationProtocol) {
+ return parent->ServerMessage.partition();
+ } else {
+ return parent->ServerMessage.partition_session_id();
+ }
+ }();
LOG_LAZY(session->GetLog(), TLOG_DEBUG, TStringBuilder() << "Decompression task done. Partition/PartitionSessionId: "
<< partition_id << " (" << minOffset << "-"
<< maxOffset << ")");
}
+
Y_ASSERT(dataProcessed == SourceDataSize);
parent->OnDataDecompressed(SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed);
@@ -2895,6 +3125,36 @@ void TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>::OnUserRetrievedE
// TDeferredActions
template<bool UseMigrationProtocol>
+void TDeferredActions<UseMigrationProtocol>::DeferReadFromProcessor(
+ const IDirectReadProcessor::TPtr& processor,
+ TDirectReadServerMessage* dst,
+ IDirectReadProcessor::TReadCallback callback
+) {
+ Y_ASSERT(!DirectReadActions.Read);
+ DirectReadActions.Read = {
+ .Processor = processor,
+ .ServerMessage = dst,
+ .ReadCallback = std::move(callback),
+ };
+}
+
+template<bool UseMigrationProtocol>
+void TDeferredActions<UseMigrationProtocol>::DeferScheduleCallback(TDuration delay, std::function<void(bool)> callback, TSingleClusterReadSessionContextPtr contextPtr) {
+ Y_ASSERT(!DirectReadActions.ScheduledCallback);
+ DirectReadActions.ScheduledCallback = {
+ .Callback = std::move(callback),
+ .Delay = delay,
+ .ContextPtr = contextPtr,
+ };
+}
+
+template<bool UseMigrationProtocol>
+void TDeferredActions<UseMigrationProtocol>::DeferCallback(std::function<void()> callback) {
+ Y_ASSERT(!DirectReadActions.Callback);
+ DirectReadActions.Callback = std::move(callback);
+}
+
+template<bool UseMigrationProtocol>
void TDeferredActions<UseMigrationProtocol>::DeferReadFromProcessor(const typename IProcessor<UseMigrationProtocol>::TPtr& processor,
TServerMessage<UseMigrationProtocol>* dst,
typename IProcessor<UseMigrationProtocol>::TReadCallback callback)
@@ -2960,6 +3220,9 @@ void TDeferredActions<UseMigrationProtocol>::DeferDestroyDecompressionInfos(std:
template<bool UseMigrationProtocol>
void TDeferredActions<UseMigrationProtocol>::DoActions() {
Read();
+ DirectRead();
+ DirectReadScheduleCallback();
+ DirectReadCallback();
StartExecutorTasks();
AbortSession();
Reconnect();
@@ -2986,6 +3249,36 @@ void TDeferredActions<UseMigrationProtocol>::Read() {
}
template<bool UseMigrationProtocol>
+void TDeferredActions<UseMigrationProtocol>::DirectRead() {
+ auto& read = DirectReadActions.Read;
+ if (read) {
+ Y_ASSERT(read->Processor);
+ Y_ASSERT(read->ReadCallback);
+ read->Processor->Read(read->ServerMessage, std::move(read->ReadCallback));
+ }
+}
+
+template<bool UseMigrationProtocol>
+void TDeferredActions<UseMigrationProtocol>::DirectReadScheduleCallback() {
+ auto& scheduled = DirectReadActions.ScheduledCallback;
+ if (scheduled) {
+ Y_ASSERT(scheduled->Callback);
+ Y_ASSERT(scheduled->ContextPtr);
+ if (auto s = scheduled->ContextPtr->LockShared()) {
+ s->ScheduleCallback(scheduled->Delay, scheduled->Callback);
+ }
+ }
+}
+
+template<bool UseMigrationProtocol>
+void TDeferredActions<UseMigrationProtocol>::DirectReadCallback() {
+ auto& callback = DirectReadActions.Callback;
+ if (callback) {
+ (*callback)();
+ }
+}
+
+template<bool UseMigrationProtocol>
void TDeferredActions<UseMigrationProtocol>::StartExecutorTasks() {
for (auto&& [executor, task] : ExecutorsTasks) {
executor->Post(std::move(task));
diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp b/ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp
index 7471c12276f..17097025f08 100644
--- a/ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp
+++ b/ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp
@@ -408,6 +408,12 @@ TPartitionLocation::TPartitionLocation(const Ydb::Topic::PartitionLocation& part
{
}
+TPartitionLocation::TPartitionLocation(std::int32_t nodeId, std::int64_t generation)
+ : NodeId_(nodeId)
+ , Generation_(generation)
+{
+}
+
int32_t TPartitionLocation::GetNodeId() const {
return NodeId_;
}
diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/topic_impl.cpp b/ydb/public/sdk/cpp/src/client/topic/impl/topic_impl.cpp
index a58223414f2..684131630cb 100644
--- a/ydb/public/sdk/cpp/src/client/topic/impl/topic_impl.cpp
+++ b/ydb/public/sdk/cpp/src/client/topic/impl/topic_impl.cpp
@@ -75,4 +75,11 @@ std::shared_ptr<TTopicClient::TImpl::IWriteSessionConnectionProcessorFactory> TT
return CreateConnectionProcessorFactory<TService, TRequest, TResponse>(&TService::Stub::AsyncStreamWrite, Connections_, DbDriverState_);
}
+std::shared_ptr<TTopicClient::TImpl::IDirectReadSessionConnectionProcessorFactory> TTopicClient::TImpl::CreateDirectReadSessionConnectionProcessorFactory() {
+ using TService = Ydb::Topic::V1::TopicService;
+ using TRequest = Ydb::Topic::StreamDirectReadMessage::FromClient;
+ using TResponse = Ydb::Topic::StreamDirectReadMessage::FromServer;
+ return CreateConnectionProcessorFactory<TService, TRequest, TResponse>(&TService::Stub::AsyncStreamDirectRead, Connections_, DbDriverState_);
+}
+
}
diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/topic_impl.h b/ydb/public/sdk/cpp/src/client/topic/impl/topic_impl.h
index 5c2d927ea2f..44d7774264a 100644
--- a/ydb/public/sdk/cpp/src/client/topic/impl/topic_impl.h
+++ b/ydb/public/sdk/cpp/src/client/topic/impl/topic_impl.h
@@ -343,6 +343,12 @@ public:
std::shared_ptr<IWriteSessionConnectionProcessorFactory> CreateWriteSessionConnectionProcessorFactory();
+ using IDirectReadSessionConnectionProcessorFactory =
+ ISessionConnectionProcessorFactory<Ydb::Topic::StreamDirectReadMessage::FromClient,
+ Ydb::Topic::StreamDirectReadMessage::FromServer>;
+
+ std::shared_ptr<IDirectReadSessionConnectionProcessorFactory> CreateDirectReadSessionConnectionProcessorFactory();
+
NYdbGrpc::IQueueClientContextPtr CreateContext() {
return Connections_->CreateContext();
}
diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/ya.make b/ydb/public/sdk/cpp/src/client/topic/impl/ya.make
index 01dcf5631ff..29f2bf1f28f 100644
--- a/ydb/public/sdk/cpp/src/client/topic/impl/ya.make
+++ b/ydb/public/sdk/cpp/src/client/topic/impl/ya.make
@@ -5,6 +5,8 @@ SRCS(
common.cpp
counters_logger.h
deferred_commit.cpp
+ direct_reader.h
+ direct_reader.cpp
event_handlers.cpp
offsets_collector.cpp
proto_accessor.cpp
diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/basic_usage_ut.cpp
index 283fafee492..94ebf6a6775 100644
--- a/ydb/public/sdk/cpp/src/client/topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/src/client/topic/ut/basic_usage_ut.cpp
@@ -20,6 +20,10 @@
#include <future>
+
+static const bool EnableDirectRead = !std::string{std::getenv("PQ_EXPERIMENTAL_DIRECT_READ") ? std::getenv("PQ_EXPERIMENTAL_DIRECT_READ") : ""}.empty();
+
+
namespace NYdb::NTopic::NTests {
void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSessionSettings writeSettings, const std::string& message, ui32 count, TTopicSdkTestSetup& setup, TIntrusivePtr<TManagedExecutor> decompressor) {
@@ -96,6 +100,10 @@ void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSess
Y_UNIT_TEST_SUITE(BasicUsage) {
Y_UNIT_TEST(ReadWithoutConsumerWithRestarts) {
+ if (EnableDirectRead) {
+ // TODO(qyryq) Enable the test when LOGBROKER-9364 is done.
+ return;
+ }
TTopicSdkTestSetup setup(TEST_CASE_NAME);
auto compressor = new TSyncExecutor();
auto decompressor = CreateThreadPoolManagedExecutor(1);
@@ -107,7 +115,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
.WithoutConsumer()
.MaxMemoryUsageBytes(1_MB)
.DecompressionExecutor(decompressor)
- .AppendTopics(topic);
+ .AppendTopics(topic)
+ // .DirectRead(EnableDirectRead)
+ ;
TWriteSessionSettings writeSettings;
writeSettings
@@ -133,7 +143,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
.ConsumerName(TEST_CONSUMER)
.MaxMemoryUsageBytes(1_MB)
.DecompressionExecutor(decompressor)
- .AppendTopics(TEST_TOPIC);
+ .AppendTopics(TEST_TOPIC)
+ // .DirectRead(EnableDirectRead)
+ ;
TWriteSessionSettings writeSettings;
writeSettings
@@ -186,6 +198,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
UNIT_ASSERT_VALUES_EQUAL(stats->GetEndOffset(), count);
}
+
} // Y_UNIT_TEST_SUITE(BasicUsage)
} // namespace
diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/direct_read_ut.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/direct_read_ut.cpp
new file mode 100644
index 00000000000..f5154b24ab4
--- /dev/null
+++ b/ydb/public/sdk/cpp/src/client/topic/ut/direct_read_ut.cpp
@@ -0,0 +1,2224 @@
+#include "ut_utils/topic_sdk_test_setup.h"
+#include <ydb/public/sdk/cpp/src/client/persqueue_public/ut/ut_utils/ut_utils.h>
+
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
+
+#include <ydb/public/sdk/cpp/src/client/persqueue_public/persqueue.h>
+
+#include <ydb/public/sdk/cpp/src/client/topic/impl/common.h>
+#include <ydb/public/sdk/cpp/src/client/topic/common/executor_impl.h>
+#include <ydb/public/sdk/cpp/src/client/persqueue_public/impl/write_session.h>
+#include <ydb/public/sdk/cpp/src/client/topic/impl/write_session.h>
+
+#include <library/cpp/retry/retry_policy.h>
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/testing/unittest/tests_data.h>
+#include <library/cpp/threading/future/future.h>
+#include <library/cpp/threading/future/async.h>
+
+#include <library/cpp/testing/gmock_in_unittest/gmock.h>
+#include <algorithm>
+#include <future>
+
+using namespace ::testing; // Google mock.
+
+
+#define UNIT_ASSERT_EVENT_TYPE(event, type) \
+ UNIT_ASSERT_C( \
+ std::holds_alternative<type>(event), \
+ "Real event got: " << DebugString(event)) \
+ /**/
+
+#define UNIT_ASSERT_NOT_EVENT_TYPE(event, type) \
+ UNIT_ASSERT_C( \
+ !std::holds_alternative<type>(event), \
+ "Real event got: " << DebugString(event)) \
+ /**/
+
+
+namespace NYdb::NTopic::NTests {
+
+namespace {
+ const char* SERVER_SESSION_ID = "server-session-id-1";
+}
+
+
+template <class TRequest, class TResponse>
+struct TMockProcessorFactory : public ISessionConnectionProcessorFactory<TRequest, TResponse> {
+ using IFactory = ISessionConnectionProcessorFactory<TRequest, TResponse>;
+
+ virtual ~TMockProcessorFactory() {
+ Wait();
+ }
+
+ void CreateProcessor( // ISessionConnectionProcessorFactory method.
+ typename IFactory::TConnectedCallback callback,
+ const TRpcRequestSettings& requestSettings,
+ NYdbGrpc::IQueueClientContextPtr connectContext,
+ TDuration connectTimeout,
+ NYdbGrpc::IQueueClientContextPtr connectTimeoutContext,
+ typename IFactory::TConnectTimeoutCallback connectTimeoutCallback,
+ TDuration connectDelay,
+ NYdbGrpc::IQueueClientContextPtr connectDelayOperationContext) override
+ {
+ UNIT_ASSERT_C(!ConnectedCallback, "Only one connect at a time is expected");
+ UNIT_ASSERT_C(!ConnectTimeoutCallback, "Only one connect at a time is expected");
+ ConnectedCallback = callback;
+ ConnectTimeoutCallback = connectTimeoutCallback;
+
+ Y_UNUSED(requestSettings);
+ // TODO Check requestSettings.PreferredEndpoint.GetNodeId()?
+ UNIT_ASSERT(connectContext);
+ UNIT_ASSERT(connectTimeout);
+ UNIT_ASSERT(connectTimeoutContext);
+ UNIT_ASSERT(connectTimeoutCallback);
+ UNIT_ASSERT(!connectDelay || connectDelayOperationContext);
+
+ OnCreateProcessor(++CreateCallsCount);
+ }
+
+ // Handler is called in CreateProcessor() method after parameter validation.
+ MOCK_METHOD(void, OnCreateProcessor, (size_t callNumber)); // 1-based
+
+ // Actions to use in OnCreateProcessor handler:
+ void CreateProcessor(typename IFactory::IProcessor::TPtr processor) { // Success.
+ UNIT_ASSERT(ConnectedCallback);
+ auto cb = std::move(ConnectedCallback);
+ ConnectedCallback = nullptr;
+ ConnectTimeoutCallback = nullptr;
+ with_lock (Lock) {
+ CallbackFutures.push(std::async(std::launch::async, std::move(cb), TPlainStatus(), processor));
+ }
+ }
+
+ void FailCreation(EStatus status = EStatus::INTERNAL_ERROR, const TString& message = {}) { // Fail.
+ UNIT_ASSERT(ConnectedCallback);
+ auto cb = std::move(ConnectedCallback);
+ ConnectedCallback = nullptr;
+ ConnectTimeoutCallback = nullptr;
+ with_lock (Lock) {
+ CallbackFutures.push(std::async(std::launch::async, std::move(cb), TPlainStatus(status, message), nullptr));
+ }
+ }
+
+ void Timeout() { // Timeout.
+ UNIT_ASSERT(ConnectTimeoutCallback);
+ auto cb = std::move(ConnectTimeoutCallback);
+ ConnectedCallback = nullptr;
+ ConnectTimeoutCallback = nullptr;
+ with_lock (Lock) {
+ CallbackFutures.push(std::async(std::launch::async, std::move(cb), true));
+ }
+ }
+
+ void CreateAndThenTimeout(typename IFactory::IProcessor::TPtr processor) {
+ UNIT_ASSERT(ConnectedCallback);
+ UNIT_ASSERT(ConnectTimeoutCallback);
+ auto cb2 = [cbt = std::move(ConnectTimeoutCallback), cb = std::move(ConnectedCallback), processor]() mutable {
+ cb(TPlainStatus(), std::move(processor));
+ cbt(true);
+ };
+ ConnectedCallback = nullptr;
+ ConnectTimeoutCallback = nullptr;
+ with_lock (Lock) {
+ CallbackFutures.push(std::async(std::launch::async, std::move(cb2)));
+ }
+ }
+
+ void FailAndThenTimeout(EStatus status = EStatus::INTERNAL_ERROR, const TString& message = {}) {
+ UNIT_ASSERT(ConnectedCallback);
+ UNIT_ASSERT(ConnectTimeoutCallback);
+ auto cb2 = [cbt = std::move(ConnectTimeoutCallback), cb = std::move(ConnectedCallback), status, message]() mutable {
+ cb(TPlainStatus(status, message), nullptr);
+ cbt(true);
+ };
+ ConnectedCallback = nullptr;
+ ConnectTimeoutCallback = nullptr;
+ with_lock (Lock) {
+ CallbackFutures.push(std::async(std::launch::async, std::move(cb2)));
+ }
+ }
+
+ void TimeoutAndThenCreate(typename IFactory::IProcessor::TPtr processor) {
+ UNIT_ASSERT(ConnectedCallback);
+ UNIT_ASSERT(ConnectTimeoutCallback);
+ auto cb2 = [cbt = std::move(ConnectTimeoutCallback), cb = std::move(ConnectedCallback), processor]() mutable {
+ cbt(true);
+ cb(TPlainStatus(), std::move(processor));
+ };
+ ConnectedCallback = nullptr;
+ ConnectTimeoutCallback = nullptr;
+ with_lock (Lock) {
+ CallbackFutures.push(std::async(std::launch::async, std::move(cb2)));
+ }
+ }
+
+ void Wait() {
+ std::queue<std::future<void>> futuresQueue;
+ with_lock (Lock) {
+ CallbackFutures.swap(futuresQueue);
+ }
+ while (!futuresQueue.empty()) {
+ futuresQueue.front().wait();
+ futuresQueue.pop();
+ }
+ }
+
+ void Validate() {
+ UNIT_ASSERT(CallbackFutures.empty());
+ ConnectedCallback = nullptr;
+ ConnectTimeoutCallback = nullptr;
+ }
+
+ std::atomic<size_t> CreateCallsCount = 0;
+
+private:
+ TAdaptiveLock Lock;
+ typename IFactory::TConnectedCallback ConnectedCallback;
+ typename IFactory::TConnectTimeoutCallback ConnectTimeoutCallback;
+ std::queue<std::future<void>> CallbackFutures;
+};
+
+
+struct TStartPartitionSessionRequest {
+ TPartitionId PartitionId;
+ TPartitionSessionId PartitionSessionId;
+ TNodeId NodeId;
+ TGeneration Generation;
+};
+
+struct TStopPartitionSessionRequest {
+ TPartitionSessionId PartitionSessionId;
+ bool Graceful;
+ i64 CommittedOffset;
+ TDirectReadId LastDirectReadId;
+};
+
+
+struct TMockReadSessionProcessor : public TMockProcessorFactory<Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer>::IProcessor {
+ // Request to read.
+ struct TClientReadInfo {
+ TReadCallback Callback;
+ Ydb::Topic::StreamReadMessage::FromServer* Dst;
+
+ operator bool() const {
+ return Dst != nullptr;
+ }
+ };
+
+ // Response from server.
+ struct TServerReadInfo {
+ NYdbGrpc::TGrpcStatus Status;
+ Ydb::Topic::StreamReadMessage::FromServer Response;
+
+ TServerReadInfo& Failure(grpc::StatusCode status = grpc::StatusCode::UNAVAILABLE, const TString& message = {}, bool internal = false) {
+ Status.GRpcStatusCode = status;
+ Status.InternalError = internal;
+ Status.Msg = message;
+ return *this;
+ }
+
+ TServerReadInfo& InitResponse(const TString& sessionId) {
+ Response.mutable_init_response()->set_session_id(sessionId);
+ return *this;
+ }
+
+ TServerReadInfo& StartPartitionSessionRequest(TStartPartitionSessionRequest request) {
+ auto* req = Response.mutable_start_partition_session_request();
+
+ auto* session = req->mutable_partition_session();
+ session->set_partition_session_id(request.PartitionSessionId);
+ session->set_partition_id(request.PartitionId);
+
+ auto* location = req->mutable_partition_location();
+ location->set_node_id(request.NodeId);
+ location->set_generation(request.Generation);
+
+ return *this;
+ }
+
+ TServerReadInfo& StopPartitionSession(TStopPartitionSessionRequest request) {
+ auto* req = Response.mutable_stop_partition_session_request();
+ req->set_partition_session_id(request.PartitionSessionId);
+ req->set_graceful(request.Graceful);
+ req->set_committed_offset(request.CommittedOffset);
+ req->set_last_direct_read_id(request.LastDirectReadId);
+ return *this;
+ }
+
+ };
+
+ ~TMockReadSessionProcessor() {
+ Wait();
+ }
+
+ void Cancel() override {
+ }
+
+ void ReadInitialMetadata(std::unordered_multimap<std::string, std::string>* metadata, TReadCallback callback) override {
+ Y_UNUSED(metadata);
+ Y_UNUSED(callback);
+ UNIT_ASSERT_C(false, "This method is not expected to be called");
+ }
+
+ void Finish(TReadCallback callback) override {
+ Y_UNUSED(callback);
+ UNIT_ASSERT_C(false, "This method is not expected to be called");
+ }
+
+ void AddFinishedCallback(TReadCallback callback) override {
+ Y_UNUSED(callback);
+ UNIT_ASSERT_C(false, "This method is not expected to be called");
+ }
+
+ void Read(Ydb::Topic::StreamReadMessage::FromServer* response, TReadCallback callback) override {
+ with_lock (Lock) {
+ UNIT_ASSERT(!ActiveRead);
+ ActiveRead.Callback = std::move(callback);
+ ActiveRead.Dst = response;
+ if (!ReadResponses.empty()) {
+ StartProcessReadImpl();
+ }
+ }
+ }
+
+ void StartProcessReadImpl() {
+ CallbackFutures.push(std::async(std::launch::async, &TMockReadSessionProcessor::ProcessRead, this));
+ }
+
+ void Write(Ydb::Topic::StreamReadMessage::FromClient&& request, TWriteCallback callback) override {
+ UNIT_ASSERT(!callback); // Read session doesn't set callbacks.
+ using FromClient = Ydb::Topic::StreamReadMessage_FromClient;
+
+ switch (request.client_message_case()) {
+ case FromClient::kInitRequest:
+ OnInitRequest(request.init_request());
+ break;
+ case FromClient::kReadRequest:
+ OnReadRequest(request.read_request());
+ break;
+ case FromClient::kCommitOffsetRequest:
+ OnCommitOffsetRequest(request.commit_offset_request());
+ break;
+ case FromClient::kDirectReadAck:
+ OnDirectReadAck(request.direct_read_ack());
+ break;
+ case FromClient::kStartPartitionSessionResponse:
+ OnStartPartitionSessionResponse(request.start_partition_session_response());
+ break;
+ case FromClient::kStopPartitionSessionResponse:
+ OnStopPartitionSessionResponse(request.stop_partition_session_response());
+ break;
+ case FromClient::CLIENT_MESSAGE_NOT_SET:
+ UNIT_ASSERT_C(false, "Invalid request");
+ break;
+ default:
+ Y_UNREACHABLE();
+ }
+ }
+ MOCK_METHOD(void, OnInitRequest, (const Ydb::Topic::StreamReadMessage::InitRequest&), ());
+ MOCK_METHOD(void, OnReadRequest, (const Ydb::Topic::StreamReadMessage::ReadRequest&), ());
+ MOCK_METHOD(void, OnDirectReadAck, (const Ydb::Topic::StreamReadMessage::DirectReadAck&), ());
+ MOCK_METHOD(void, OnCommitOffsetRequest, (const Ydb::Topic::StreamReadMessage::CommitOffsetRequest&), ());
+ MOCK_METHOD(void, OnStartPartitionSessionResponse, (const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse&), ());
+ MOCK_METHOD(void, OnStopPartitionSessionResponse, (const Ydb::Topic::StreamReadMessage::StopPartitionSessionResponse&), ());
+
+ void Wait() {
+ std::queue<std::future<void>> callbackFutures;
+ with_lock (Lock) {
+ CallbackFutures.swap(callbackFutures);
+ }
+
+ while (!callbackFutures.empty()) {
+ callbackFutures.front().wait();
+ callbackFutures.pop();
+ }
+ }
+
+ void Validate() {
+ with_lock (Lock) {
+ UNIT_ASSERT(ReadResponses.empty());
+ UNIT_ASSERT(CallbackFutures.empty());
+
+ ActiveRead = TClientReadInfo{};
+ }
+ }
+
+ void ProcessRead() {
+ NYdbGrpc::TGrpcStatus status;
+ TReadCallback callback;
+ with_lock (Lock) {
+ if (ActiveRead) {
+ *ActiveRead.Dst = ReadResponses.front().Response;
+ ActiveRead.Dst = nullptr;
+ status = std::move(ReadResponses.front().Status);
+ ReadResponses.pop();
+ callback = std::move(ActiveRead.Callback);
+ }
+ }
+ if (callback) {
+ callback(std::move(status));
+ }
+ }
+
+ void AddServerResponse(TServerReadInfo result) {
+ NYdbGrpc::TGrpcStatus status;
+ TReadCallback callback;
+ with_lock (Lock) {
+ ReadResponses.emplace(std::move(result));
+ if (ActiveRead) {
+ *ActiveRead.Dst = ReadResponses.front().Response;
+ ActiveRead.Dst = nullptr;
+ status = std::move(ReadResponses.front().Status);
+ ReadResponses.pop();
+ callback = std::move(ActiveRead.Callback);
+ }
+ }
+ if (callback) {
+ callback(std::move(status));
+ }
+ }
+
+ TAdaptiveLock Lock;
+ TClientReadInfo ActiveRead;
+ std::queue<TServerReadInfo> ReadResponses;
+ std::queue<std::future<void>> CallbackFutures;
+};
+
+struct TMockDirectReadSessionProcessor : public TMockProcessorFactory<TDirectReadClientMessage, TDirectReadServerMessage>::IProcessor {
+ // Request to read.
+ struct TClientReadInfo {
+ TReadCallback Callback;
+ TDirectReadServerMessage* Dst;
+
+ operator bool() const {
+ return Dst != nullptr;
+ }
+ };
+
+ // Response from server.
+ struct TServerReadInfo {
+ NYdbGrpc::TGrpcStatus Status;
+ TDirectReadServerMessage Response;
+
+ TServerReadInfo& Failure(grpc::StatusCode status = grpc::StatusCode::UNAVAILABLE, const TString& message = {}, bool internal = false) {
+ Status.GRpcStatusCode = status;
+ Status.InternalError = internal;
+ Status.Msg = message;
+ return *this;
+ }
+
+ TServerReadInfo& InitResponse() {
+ Response.mutable_init_response();
+ return *this;
+ }
+
+ TServerReadInfo& StartDirectReadPartitionSessionResponse(TPartitionSessionId partitionSessionId) {
+ auto* resp = Response.mutable_start_direct_read_partition_session_response();
+ resp->set_partition_session_id(partitionSessionId);
+ return *this;
+ }
+
+ TServerReadInfo& StopDirectReadPartitionSession(Ydb::StatusIds::StatusCode status, TPartitionSessionId partitionSessionId) {
+ auto* req = Response.mutable_stop_direct_read_partition_session();
+ req->set_status(status);
+ req->set_partition_session_id(partitionSessionId);
+ return *this;
+ }
+
+ // Data helpers.
+ TServerReadInfo& PartitionData(TPartitionSessionId partitionSessionId, TDirectReadId directReadId, ui64 bytesSize = 0) {
+ auto* response = Response.mutable_direct_read_response();
+ response->set_partition_session_id(partitionSessionId);
+ response->set_direct_read_id(directReadId);
+ response->set_bytes_size(bytesSize);
+ response->mutable_partition_data()->set_partition_session_id(partitionSessionId);
+ return *this;
+ }
+
+ TServerReadInfo& Batch(
+ const TString& producerId,
+ Ydb::Topic::Codec codec,
+ TInstant writeTimestamp = TInstant::MilliSeconds(42),
+ const std::vector<std::pair<TString, TString>>& writeSessionMeta = {}
+ ) {
+ auto* batch = Response.mutable_direct_read_response()->mutable_partition_data()->add_batches();
+ batch->set_producer_id(producerId);
+ batch->set_codec(codec);
+ *batch->mutable_written_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(writeTimestamp.MilliSeconds());
+ auto* meta = batch->mutable_write_session_meta();
+ for (auto&& [k, v] : writeSessionMeta) {
+ (*meta)[k] = v;
+ }
+ return *this;
+ }
+
+ TServerReadInfo& Message(
+ ui64 offset,
+ const TString& data,
+ ui64 seqNo = 1,
+ TInstant createdAt = TInstant::MilliSeconds(42),
+ i64 uncompressedSize = 135,
+ const TString& messageGroupId = "",
+ const std::vector<std::pair<TString, TString>>& meta = {}
+ ) {
+ const int lastBatch = Response.direct_read_response().partition_data().batches_size();
+ UNIT_ASSERT(lastBatch > 0);
+ auto* batch = Response.mutable_direct_read_response()->mutable_partition_data()->mutable_batches(lastBatch - 1);
+ auto* req = batch->add_message_data();
+ req->set_offset(offset);
+ req->set_seq_no(seqNo);
+ *req->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(createdAt.MilliSeconds());
+ req->set_data(data);
+ req->set_message_group_id(messageGroupId);
+ req->set_uncompressed_size(uncompressedSize);
+ for (auto&& [k, v] : meta) {
+ auto* pair = req->add_metadata_items();
+ pair->set_key(k);
+ pair->set_value(v);
+ }
+ return *this;
+ }
+ };
+
+ virtual ~TMockDirectReadSessionProcessor() {
+ Wait();
+ }
+
+ void Cancel() override {
+ }
+
+ void ReadInitialMetadata(std::unordered_multimap<std::string, std::string>* metadata, TReadCallback callback) override {
+ Y_UNUSED(metadata);
+ Y_UNUSED(callback);
+ UNIT_ASSERT_C(false, "This method is not expected to be called");
+ }
+
+ void Finish(TReadCallback callback) override {
+ Y_UNUSED(callback);
+ UNIT_ASSERT_C(false, "This method is not expected to be called");
+ }
+
+ void AddFinishedCallback(TReadCallback callback) override {
+ Y_UNUSED(callback);
+ UNIT_ASSERT_C(false, "This method is not expected to be called");
+ }
+
+ void Read(TDirectReadServerMessage* response, TReadCallback callback) override {
+ NYdbGrpc::TGrpcStatus status;
+ TReadCallback cb;
+ with_lock (Lock) {
+ Cerr << (TStringBuilder() << "XXXXX Read 1 " << response->DebugString() << "\n");
+ UNIT_ASSERT(!ActiveRead);
+ ActiveRead.Callback = std::move(callback);
+ ActiveRead.Dst = response;
+ if (!ReadResponses.empty()) {
+ Cerr << (TStringBuilder() << "XXXXX Read 2 " << response->DebugString() << "\n");
+ *ActiveRead.Dst = ReadResponses.front().Response;
+ ActiveRead.Dst = nullptr;
+ status = std::move(ReadResponses.front().Status);
+ ReadResponses.pop();
+ cb = std::move(ActiveRead.Callback);
+ }
+ }
+ if (cb) {
+ Cerr << (TStringBuilder() << "XXXXX Read 3 " << response->DebugString() << "\n");
+ cb(std::move(status));
+ }
+ }
+
+ void StartProcessReadImpl() {
+ CallbackFutures.push(std::async(std::launch::async, &TMockDirectReadSessionProcessor::ProcessRead, this));
+ }
+
+ void Write(TDirectReadClientMessage&& request, TWriteCallback callback) override {
+ UNIT_ASSERT(!callback); // Read session doesn't set callbacks.
+ switch (request.client_message_case()) {
+ case TDirectReadClientMessage::kInitRequest:
+ OnInitRequest(request.init_request());
+ break;
+ case TDirectReadClientMessage::kStartDirectReadPartitionSessionRequest:
+ OnStartDirectReadPartitionSessionRequest(request.start_direct_read_partition_session_request());
+ break;
+ case TDirectReadClientMessage::kUpdateTokenRequest:
+ OnUpdateTokenRequest(request.update_token_request());
+ break;
+ case TDirectReadClientMessage::CLIENT_MESSAGE_NOT_SET:
+ UNIT_ASSERT_C(false, "Invalid request");
+ break;
+ }
+ }
+
+ MOCK_METHOD(void, OnInitRequest, (const Ydb::Topic::StreamDirectReadMessage::InitRequest&), ());
+ MOCK_METHOD(void, OnStartDirectReadPartitionSessionRequest, (const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest&), ());
+ MOCK_METHOD(void, OnUpdateTokenRequest, (const Ydb::Topic::UpdateTokenRequest&), ());
+
+ void Wait() {
+ std::queue<std::future<void>> callbackFutures;
+ with_lock (Lock) {
+ CallbackFutures.swap(callbackFutures);
+ }
+
+ while (!callbackFutures.empty()) {
+ callbackFutures.front().wait();
+ callbackFutures.pop();
+ }
+ }
+
+ void Validate() {
+ Cerr << "XXXXX Validate\n";
+ with_lock (Lock) {
+ UNIT_ASSERT(ReadResponses.empty());
+ UNIT_ASSERT(CallbackFutures.empty());
+
+ ActiveRead = TClientReadInfo{};
+ }
+ }
+
+ void ProcessRead() {
+ Cerr << "XXXXX ProcessRead\n";
+ NYdbGrpc::TGrpcStatus status;
+ TReadCallback callback;
+ // GotActiveRead.GetFuture().Wait();
+ with_lock (Lock) {
+ *ActiveRead.Dst = ReadResponses.front().Response;
+ ActiveRead.Dst = nullptr;
+ status = std::move(ReadResponses.front().Status);
+ ReadResponses.pop();
+ callback = std::move(ActiveRead.Callback);
+ }
+ callback(std::move(status));
+ }
+
+ void AddServerResponse(TServerReadInfo result) {
+ NYdbGrpc::TGrpcStatus status;
+ TReadCallback callback;
+ with_lock (Lock) {
+ Cerr << (TStringBuilder() << "XXXXX AddServerResponse 1 " << result.Response.DebugString() << "\n");
+ ReadResponses.emplace(std::move(result));
+ if (ActiveRead) {
+ Cerr << (TStringBuilder() << "XXXXX AddServerResponse 2\n");
+ *ActiveRead.Dst = ReadResponses.front().Response;
+ ActiveRead.Dst = nullptr;
+ status = std::move(ReadResponses.front().Status);
+ ReadResponses.pop();
+ callback = std::move(ActiveRead.Callback);
+ }
+ }
+ if (callback) {
+ Cerr << (TStringBuilder() << "XXXXX AddServerResponse 3\n");
+ callback(std::move(status));
+ }
+ }
+
+ TAdaptiveLock Lock;
+ // NThreading::TPromise<void> GotActiveRead = NThreading::NewPromise();
+ TClientReadInfo ActiveRead;
+ std::queue<TServerReadInfo> ReadResponses;
+ std::queue<std::future<void>> CallbackFutures;
+};
+
+class TMockRetryPolicy : public IRetryPolicy {
+public:
+ MOCK_METHOD(IRetryPolicy::IRetryState::TPtr, CreateRetryState, (), (const, override));
+ TMaybe<TDuration> Delay;
+};
+
+class TMockRetryState : public IRetryPolicy::IRetryState {
+public:
+ TMockRetryState(std::shared_ptr<TMockRetryPolicy> policy)
+ : Policy(policy) {}
+
+ TMaybe<TDuration> GetNextRetryDelay(EStatus) {
+ return Policy->Delay;
+ }
+private:
+ std::shared_ptr<TMockRetryPolicy> Policy;
+};
+
+// Class for testing read session impl with mocks.
+class TDirectReadSessionImplTestSetup {
+public:
+ // Types
+ using IDirectReadSessionConnectionProcessorFactory = ISessionConnectionProcessorFactory<TDirectReadClientMessage, TDirectReadServerMessage>;
+ using TMockDirectReadProcessorFactory = TMockProcessorFactory<TDirectReadClientMessage, TDirectReadServerMessage>;
+ using TMockReadProcessorFactory = TMockProcessorFactory<Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer>;
+
+ struct TFakeContext : public NYdbGrpc::IQueueClientContext {
+ IQueueClientContextPtr CreateContext() override {
+ return std::make_shared<TFakeContext>();
+ }
+
+ grpc::CompletionQueue* CompletionQueue() override {
+ UNIT_ASSERT_C(false, "This method is not expected to be called");
+ return nullptr;
+ }
+
+ bool IsCancelled() const override {
+ UNIT_ASSERT_C(false, "This method is not expected to be called");
+ return false;
+ }
+
+ bool Cancel() override {
+ return false;
+ }
+
+ void SubscribeCancel(std::function<void()>) override {
+ UNIT_ASSERT_C(false, "This method is not expected to be called");
+ }
+ };
+
+ // Methods
+ TDirectReadSessionImplTestSetup();
+ ~TDirectReadSessionImplTestSetup() noexcept(false); // Performs extra validation and UNIT_ASSERTs
+
+ TSingleClusterReadSessionImpl<false>* GetControlSession();
+ TDirectReadSession* GetDirectReadSession(IDirectReadSessionControlCallbacks::TPtr);
+ void WaitForWorkingDirectReadSession();
+
+ std::shared_ptr<TReadSessionEventsQueue<false>> GetEventsQueue();
+ IExecutor::TPtr GetDefaultExecutor();
+
+ void SuccessfulInit(bool flag = true);
+
+ void AddControlResponse(TMockReadSessionProcessor::TServerReadInfo&);
+ void AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo&);
+
+ // Assertions.
+ void AssertNoEvents();
+
+public:
+ // Members
+ TReadSessionSettings ReadSessionSettings;
+ TLog Log = CreateLogBackend("cerr");
+ std::shared_ptr<TReadSessionEventsQueue<false>> EventsQueue;
+ std::shared_ptr<TFakeContext> FakeContext = std::make_shared<TFakeContext>();
+ std::shared_ptr<TMockRetryPolicy> MockRetryPolicy = std::make_shared<TMockRetryPolicy>();
+ std::shared_ptr<TMockReadProcessorFactory> MockReadProcessorFactory = std::make_shared<TMockReadProcessorFactory>();
+ std::shared_ptr<TMockDirectReadProcessorFactory> MockDirectReadProcessorFactory = std::make_shared<TMockDirectReadProcessorFactory>();
+ TIntrusivePtr<TMockReadSessionProcessor> MockReadProcessor = MakeIntrusive<TMockReadSessionProcessor>();
+ TIntrusivePtr<TMockDirectReadSessionProcessor> MockDirectReadProcessor = MakeIntrusive<TMockDirectReadSessionProcessor>();
+
+ TSingleClusterReadSessionImpl<false>::TPtr SingleClusterReadSession;
+ TSingleClusterReadSessionContextPtr SingleClusterReadSessionContextPtr;
+
+ TDirectReadSessionManager::TPtr DirectReadSessionManagerPtr;
+ TDirectReadSession::TPtr DirectReadSessionPtr;
+ TDirectReadSessionContextPtr DirectReadSessionContextPtr;
+
+ std::shared_ptr<TThreadPool> ThreadPool;
+ IExecutor::TPtr DefaultExecutor;
+};
+
+TDirectReadSessionImplTestSetup::TDirectReadSessionImplTestSetup() {
+ ReadSessionSettings
+ // .DirectRead(true)
+ .AppendTopics({"TestTopic"})
+ .ConsumerName("TestConsumer")
+ .RetryPolicy(NYdb::NTopic::IRetryPolicy::GetFixedIntervalPolicy(TDuration::MilliSeconds(10)))
+ .Counters(MakeIntrusive<NYdb::NTopic::TReaderCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>()));
+
+ Log.SetFormatter(GetPrefixLogFormatter(""));
+}
+
+TDirectReadSessionImplTestSetup::~TDirectReadSessionImplTestSetup() noexcept(false) {
+ if (!std::uncaught_exceptions()) { // Exiting from test successfully. Check additional expectations.
+ MockReadProcessorFactory->Wait();
+ MockReadProcessor->Wait();
+
+ MockReadProcessorFactory->Validate();
+ MockReadProcessor->Validate();
+
+ MockDirectReadProcessorFactory->Wait();
+ MockDirectReadProcessor->Wait();
+
+ MockDirectReadProcessorFactory->Validate();
+ MockDirectReadProcessor->Validate();
+ }
+
+ if (SingleClusterReadSessionContextPtr) {
+ if (auto session = SingleClusterReadSessionContextPtr->LockShared()) {
+ session->Close({});
+ }
+ SingleClusterReadSessionContextPtr->Cancel();
+ }
+
+ if (DirectReadSessionContextPtr) {
+ if (auto session = DirectReadSessionContextPtr->LockShared()) {
+ session->Close();
+ }
+ DirectReadSessionContextPtr->Cancel();
+ }
+
+ SingleClusterReadSession = nullptr;
+
+ if (ThreadPool) {
+ ThreadPool->Stop();
+ }
+}
+
+void TDirectReadSessionImplTestSetup::AddControlResponse(TMockReadSessionProcessor::TServerReadInfo& response) {
+ MockReadProcessor->AddServerResponse(response);
+}
+
+void TDirectReadSessionImplTestSetup::AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo& response) {
+ MockDirectReadProcessor->AddServerResponse(response);
+}
+
+void TDirectReadSessionImplTestSetup::SuccessfulInit(bool hasInitRequest) {
+ EXPECT_CALL(*MockReadProcessorFactory, OnCreateProcessor(1))
+ .WillOnce([&](){ MockReadProcessorFactory->CreateProcessor(MockReadProcessor); });
+ if (hasInitRequest) {
+ EXPECT_CALL(*MockReadProcessor, OnInitRequest(_));
+ }
+ AddControlResponse(TMockReadSessionProcessor::TServerReadInfo().InitResponse("session-1"));
+ GetControlSession()->Start();
+ MockReadProcessorFactory->Wait();
+ MockReadProcessor->Wait();
+}
+
+std::shared_ptr<TReadSessionEventsQueue<false>> TDirectReadSessionImplTestSetup::GetEventsQueue() {
+ if (!EventsQueue) {
+ EventsQueue = std::make_shared<TReadSessionEventsQueue<false>>(ReadSessionSettings);
+ }
+ return EventsQueue;
+}
+
+void TDirectReadSessionImplTestSetup::AssertNoEvents() {
+ std::optional<TReadSessionEvent::TEvent> event = GetEventsQueue()->GetEvent(false);
+ UNIT_ASSERT(!event);
+}
+
+IExecutor::TPtr TDirectReadSessionImplTestSetup::GetDefaultExecutor() {
+ if (!DefaultExecutor) {
+ ThreadPool = std::make_shared<TThreadPool>();
+ ThreadPool->Start(1);
+ DefaultExecutor = CreateThreadPoolExecutorAdapter(ThreadPool);
+ }
+ return DefaultExecutor;
+}
+
+TSingleClusterReadSessionImpl<false>* TDirectReadSessionImplTestSetup::GetControlSession() {
+ if (!SingleClusterReadSession) {
+ if (!ReadSessionSettings.DecompressionExecutor_) {
+ ReadSessionSettings.DecompressionExecutor(GetDefaultExecutor());
+ }
+ if (!ReadSessionSettings.EventHandlers_.HandlersExecutor_) {
+ ReadSessionSettings.EventHandlers_.HandlersExecutor(GetDefaultExecutor());
+ }
+ SingleClusterReadSessionContextPtr = MakeWithCallbackContext<TSingleClusterReadSessionImpl<false>>(
+ ReadSessionSettings,
+ "db",
+ "client-session-id-1",
+ "",
+ Log,
+ MockReadProcessorFactory,
+ GetEventsQueue(),
+ FakeContext,
+ 1,
+ 1,
+ TSingleClusterReadSessionImpl<false>::TScheduleCallbackFunc {},
+ MockDirectReadProcessorFactory);
+ SingleClusterReadSession = SingleClusterReadSessionContextPtr->TryGet();
+ }
+ return SingleClusterReadSession.get();
+}
+
+TDirectReadSession* TDirectReadSessionImplTestSetup::GetDirectReadSession(IDirectReadSessionControlCallbacks::TPtr controlCallbacks) {
+ if (!DirectReadSessionPtr) {
+ DirectReadSessionContextPtr = MakeWithCallbackContext<TDirectReadSession>(
+ TNodeId(1),
+ SERVER_SESSION_ID,
+ ReadSessionSettings,
+ controlCallbacks,
+ FakeContext,
+ MockDirectReadProcessorFactory,
+ Log);
+ DirectReadSessionPtr = DirectReadSessionContextPtr->TryGet();
+ }
+ return DirectReadSessionPtr.get();
+}
+
+void TDirectReadSessionImplTestSetup::WaitForWorkingDirectReadSession() {
+ while (DirectReadSessionPtr->State != TDirectReadSession::EState::WORKING) {
+ Sleep(TDuration::MilliSeconds(10));
+ }
+}
+
+class TDirectReadTestsFixture : public NUnitTest::TBaseFixture {
+ void SetUp(NUnitTest::TTestContext&) override {
+ }
+};
+
+Y_UNIT_TEST_SUITE_F(DirectReadWithClient, TDirectReadTestsFixture) {
+
+ /*
+ This suite tests direct read mode only through IReadSession, without using internal classes.
+ */
+
+ Y_UNIT_TEST(OneMessage) {
+ /*
+ The simplest case: write one message and read it back.
+ */
+
+ TTopicSdkTestSetup setup(TEST_CASE_NAME);
+ TTopicClient client = setup.MakeClient();
+
+ {
+ // Write a message:
+
+ auto settings = TWriteSessionSettings()
+ .Path(TEST_TOPIC)
+ .ProducerId(TEST_MESSAGE_GROUP_ID)
+ .MessageGroupId(TEST_MESSAGE_GROUP_ID);
+ auto writer = client.CreateSimpleBlockingWriteSession(settings);
+ UNIT_ASSERT(writer->Write("message"));
+ writer->Close();
+ }
+
+ {
+ // Read the message:
+
+ auto settings = TReadSessionSettings()
+ .ConsumerName(TEST_CONSUMER)
+ .AppendTopics(TEST_TOPIC)
+ // .DirectRead(true)
+ ;
+ auto reader = client.CreateReadSession(settings);
+
+ {
+ // Start partition session:
+ auto event = reader->GetEvent(true);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStartPartitionSessionEvent);
+ std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event).Confirm();
+ }
+
+ {
+ // Receive the message and commit.
+ auto event = reader->GetEvent(true);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
+ auto& dataReceived = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
+ auto& messages = dataReceived.GetMessages();
+ UNIT_ASSERT_EQUAL(messages.size(), 1);
+ dataReceived.Commit();
+ }
+
+ {
+ // Get commit ack.
+ auto event = reader->GetEvent(true);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TCommitOffsetAcknowledgementEvent);
+ }
+ }
+ }
+
+ Y_UNIT_TEST(ManyMessages) {
+ /*
+ Write many messages and read them back.
+
+ Don't compress messages and set MaxMemoryUsageBytes for the reader to 1MB,
+ so the server sends multiple DirectReadResponses.
+ */
+
+ TTopicSdkTestSetup setup(TEST_CASE_NAME, TTopicSdkTestSetup::MakeServerSettings(), false);
+ constexpr size_t partitionCount = 2;
+ size_t messageCount = 100;
+ size_t totalMessageCount = partitionCount * messageCount;
+ setup.CreateTopic(std::string(TEST_TOPIC), std::string(TEST_CONSUMER), partitionCount);
+ TTopicClient client = setup.MakeClient();
+
+ TString message(950_KB, 'x');
+
+ // Write messages to all partitions:
+ for (size_t partitionId = 0; partitionId < partitionCount; ++partitionId) {
+ auto settings = TWriteSessionSettings()
+ .Path(TEST_TOPIC)
+ .Codec(ECodec::RAW)
+ .PartitionId(partitionId)
+ .ProducerId(TEST_MESSAGE_GROUP_ID)
+ .MessageGroupId(TEST_MESSAGE_GROUP_ID);
+
+ auto writer = client.CreateSimpleBlockingWriteSession(settings);
+ for (size_t i = 0; i < messageCount; ++i) {
+ UNIT_ASSERT(writer->Write(message));
+ }
+ writer->Close();
+ }
+
+ std::atomic<bool> work = true;
+
+ auto killer = std::thread([&]() {
+ while (work.load()) {
+ std::this_thread::sleep_for(std::chrono::seconds(5));
+ // setup.GetServer().KillTopicPqrbTablet(setup.GetTopicPath());
+ }
+ });
+
+ {
+ // Read messages:
+
+ size_t gotMessages = 0;
+ std::array<size_t, partitionCount> committedOffset{};
+ auto settings = TReadSessionSettings()
+ .ConsumerName(TEST_CONSUMER)
+ .AppendTopics(TEST_TOPIC)
+ .MaxMemoryUsageBytes(1_MB)
+ // .DirectRead(GetEnv("DIRECT", "0") == "1")
+ ;
+
+ std::shared_ptr<IReadSession> reader;
+
+ settings.EventHandlers_.SimpleDataHandlers(
+ [&](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& e) {
+ gotMessages += e.GetMessages().size();
+ Cerr << "XXXXX gotMessages: " << gotMessages << " partition_id: " << e.GetPartitionSession()->GetPartitionId() << "\n";
+ e.Commit();
+ });
+
+ settings.EventHandlers_.CommitOffsetAcknowledgementHandler(
+ [&](NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent& e) {
+ auto partitionId = e.GetPartitionSession()->GetPartitionId();
+ committedOffset[partitionId] = e.GetCommittedOffset();
+ Cerr << "XXXXX committedOffset: ";
+ for (auto offset : committedOffset) {
+ Cerr << offset << " ";
+ }
+ Cerr << Endl;
+ if (std::ranges::all_of(committedOffset, [&](size_t offset) { return offset == messageCount; })) {
+ reader->Close();
+ }
+ });
+
+ reader = client.CreateReadSession(settings);
+
+ reader->GetEvent(/*block = */true);
+
+ UNIT_ASSERT_EQUAL(gotMessages, totalMessageCount);
+ }
+
+ work.store(false);
+ killer.join();
+ }
+} // Y_UNIT_TEST_SUITE_F(DirectReadWithClient)
+
+
+Y_UNIT_TEST_SUITE_F(DirectReadWithControlSession, TDirectReadTestsFixture) {
+
+ /*
+ This suite tests direct read sessions together with a control session.
+ */
+
+ void SuccessfulInitImpl(bool thenTimeout) {
+ TDirectReadSessionImplTestSetup setup;
+ setup.ReadSessionSettings
+ .MaxLag(TDuration::Seconds(32))
+ .ReadFromTimestamp(TInstant::Seconds(42));
+
+ setup.ReadSessionSettings.Topics_[0]
+ .ReadFromTimestamp(TInstant::Seconds(146))
+ .AppendPartitionIds(100)
+ .AppendPartitionIds(101);
+
+ {
+ ::testing::InSequence seq;
+
+ EXPECT_CALL(*setup.MockReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&](){
+ if (thenTimeout) {
+ setup.MockReadProcessorFactory->CreateAndThenTimeout(setup.MockReadProcessor);
+ } else {
+ setup.MockReadProcessorFactory->CreateProcessor(setup.MockReadProcessor);
+ }
+ });
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&setup](const Ydb::Topic::StreamReadMessage::InitRequest& req) {
+ UNIT_ASSERT_STRINGS_EQUAL(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
+ UNIT_ASSERT(req.direct_read());
+ UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings_size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
+ UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).read_from().seconds(), 146);
+ UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).partition_ids_size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).partition_ids(0), 100);
+ UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).partition_ids(1), 101);
+ }));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_));
+ }
+
+ setup.GetControlSession()->Start();
+ setup.MockReadProcessorFactory->Wait();
+
+ setup.AddControlResponse(TMockReadSessionProcessor::TServerReadInfo().InitResponse("session id"));
+
+ setup.AssertNoEvents();
+ }
+
+ Y_UNIT_TEST(Init) {
+ SuccessfulInitImpl(true);
+ SuccessfulInitImpl(false);
+ }
+
+ Y_UNIT_TEST(StopPartitionSessionGracefully) {
+ auto const startPartitionSessionRequest = TStartPartitionSessionRequest{
+ .PartitionId = 1,
+ .PartitionSessionId = 2,
+ .NodeId = 3,
+ .Generation = 4,
+ };
+
+ auto const stopPartitionSessionRequest = TStopPartitionSessionRequest{
+ .PartitionSessionId = 2,
+ .Graceful = true,
+ .CommittedOffset = 0,
+ .LastDirectReadId = 5,
+ };
+
+ TDirectReadSessionImplTestSetup setup;
+ setup.ReadSessionSettings.Topics_[0].AppendPartitionIds(startPartitionSessionRequest.PartitionId);
+
+ {
+ {
+ ::testing::InSequence seq;
+
+ EXPECT_CALL(*setup.MockReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() {
+ setup.MockReadProcessorFactory->CreateProcessor(setup.MockReadProcessor);
+ });
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&](const Ydb::Topic::StreamReadMessage::InitRequest& req) {
+ UNIT_ASSERT(req.direct_read());
+ UNIT_ASSERT_EQUAL(req.topics_read_settings_size(), 1);
+ UNIT_ASSERT_EQUAL(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
+ UNIT_ASSERT_EQUAL(req.topics_read_settings(0).partition_ids_size(), 1);
+ UNIT_ASSERT_EQUAL(req.topics_read_settings(0).partition_ids(0), startPartitionSessionRequest.PartitionId);
+ }));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnStartPartitionSessionResponse(_))
+ .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse& resp) {
+ UNIT_ASSERT_EQUAL(resp.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
+ }));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnDirectReadAck(_))
+ .Times(4);
+ }
+
+ // There are two sequences, because OnCreateProcessor from the second sequence may be called
+ // before OnStartPartitionSessionResponse from the first sequence.
+
+ {
+ ::testing::InSequence sequence;
+
+ EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() {
+ setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor);
+ });
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&setup](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
+ UNIT_ASSERT_EQUAL(req.session_id(), SERVER_SESSION_ID);
+ UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings_size(), setup.ReadSessionSettings.Topics_.size());
+ UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
+ UNIT_ASSERT_VALUES_EQUAL(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
+ }));
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& request) {
+ UNIT_ASSERT_VALUES_EQUAL(request.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
+ UNIT_ASSERT_VALUES_EQUAL(request.generation(), startPartitionSessionRequest.Generation);
+ }));
+
+ // Expect OnReadRequest in case it is called before the test ends.
+ // TODO(qyryq) Fix number, not 10.
+ EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_)).Times(AtMost(10));
+ }
+ }
+
+ setup.GetControlSession()->Start();
+ setup.MockReadProcessorFactory->Wait();
+ setup.AddControlResponse(TMockReadSessionProcessor::TServerReadInfo().InitResponse(SERVER_SESSION_ID));
+ setup.AddControlResponse(TMockReadSessionProcessor::TServerReadInfo().StartPartitionSessionRequest(startPartitionSessionRequest));
+
+ {
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStartPartitionSessionEvent);
+ std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event).Confirm();
+ }
+
+ setup.AddControlResponse(TMockReadSessionProcessor::TServerReadInfo()
+ .StopPartitionSession(stopPartitionSessionRequest));
+
+ setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ .InitResponse());
+
+ setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ .StartDirectReadPartitionSessionResponse(startPartitionSessionRequest.PartitionSessionId));
+
+ size_t offset = 0, i = 0;
+
+ // Verify that the session receives data sent to direct read session:
+ for (size_t directReadId = 1; directReadId < stopPartitionSessionRequest.LastDirectReadId; ++directReadId) {
+ auto resp = TMockDirectReadSessionProcessor::TServerReadInfo()
+ .PartitionData(startPartitionSessionRequest.PartitionSessionId, directReadId)
+ // TODO(qyryq) Test with compression!
+ // .Batch("producer-id-1", Ydb::Topic::Codec::CODEC_ZSTD);
+ .Batch("producer-id-1", Ydb::Topic::Codec::CODEC_RAW);
+
+ resp.Message(offset, TStringBuilder() << "message-" << offset, offset);
+ ++offset;
+ resp.Message(offset, TStringBuilder() << "message-" << offset, offset);
+ ++offset;
+ setup.AddDirectReadResponse(resp);
+
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
+ auto& e = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
+ i += e.GetMessagesCount();
+ }
+
+ while (i < offset) {
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
+ auto& e = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
+ i += e.GetMessagesCount();
+ }
+
+ {
+ // Verify that the session receives TStopPartitionSessionEvent(graceful=true) after data was received:
+
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStopPartitionSessionEvent);
+ auto e = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event);
+ e->Confirm();
+ }
+
+ {
+ // Verify that the session receives TPartitionSessionClosedEvent after data was received:
+
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TPartitionSessionClosedEvent);
+ // auto e = std::get_if<TReadSessionEvent::TPartitionSessionClosedEvent>(&*event);
+ }
+
+ setup.AssertNoEvents();
+
+ // ::testing::Mock::VerifyAndClear(setup.MockDirectReadProcessorFactory);
+ // ::testing::Mock::VerifyAndClear(setup.MockDirectReadProcessor);
+ }
+
+ Y_UNIT_TEST(StopPartitionSession) {
+ auto const startPartitionSessionRequest = TStartPartitionSessionRequest{
+ .PartitionId = 1,
+ .PartitionSessionId = 2,
+ .NodeId = 3,
+ .Generation = 4,
+ };
+
+ TDirectReadSessionImplTestSetup setup;
+ setup.ReadSessionSettings.Topics_[0].AppendPartitionIds(startPartitionSessionRequest.PartitionId);
+
+ {
+ {
+ ::testing::InSequence seq;
+
+ EXPECT_CALL(*setup.MockReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() {
+ setup.MockReadProcessorFactory->CreateProcessor(setup.MockReadProcessor);
+ });
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&](const Ydb::Topic::StreamReadMessage::InitRequest& req) {
+ UNIT_ASSERT(req.direct_read());
+ UNIT_ASSERT_EQUAL(req.topics_read_settings_size(), 1);
+ UNIT_ASSERT_EQUAL(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
+ UNIT_ASSERT_EQUAL(req.topics_read_settings(0).partition_ids_size(), 1);
+ UNIT_ASSERT_EQUAL(req.topics_read_settings(0).partition_ids(0), startPartitionSessionRequest.PartitionId);
+ }));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnStartPartitionSessionResponse(_))
+ .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse& resp) {
+ UNIT_ASSERT_EQUAL(resp.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
+ }));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnDirectReadAck(_))
+ .Times(4);
+ }
+
+ // There are two sequences, because OnCreateProcessor from the second sequence may be called
+ // before OnStartPartitionSessionResponse from the first sequence.
+
+ {
+ ::testing::InSequence sequence;
+
+ EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() {
+ setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor);
+ });
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&setup](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
+ UNIT_ASSERT_EQUAL(req.session_id(), SERVER_SESSION_ID);
+ UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings_size(), setup.ReadSessionSettings.Topics_.size());
+ UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
+ UNIT_ASSERT_VALUES_EQUAL(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
+ }));
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& request) {
+ UNIT_ASSERT_VALUES_EQUAL(request.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
+ UNIT_ASSERT_VALUES_EQUAL(request.generation(), startPartitionSessionRequest.Generation);
+ }));
+
+ // Expect OnReadRequest in case it is called before the test ends.
+ // TODO(qyryq) Fix number, not 10.
+ EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_)).Times(AtMost(10));
+ }
+ }
+
+ setup.GetControlSession()->Start();
+ {
+ auto r = TMockReadSessionProcessor::TServerReadInfo();
+ setup.AddControlResponse(r.InitResponse(SERVER_SESSION_ID));
+ }
+
+ {
+ auto r = TMockReadSessionProcessor::TServerReadInfo();
+ setup.AddControlResponse(r.StartPartitionSessionRequest(startPartitionSessionRequest));
+ }
+
+ {
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStartPartitionSessionEvent);
+ std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event).Confirm();
+ }
+
+ {
+ auto r = TMockDirectReadSessionProcessor::TServerReadInfo();
+ setup.AddDirectReadResponse(r.InitResponse());
+ }
+
+ {
+ auto r = TMockDirectReadSessionProcessor::TServerReadInfo();
+ setup.AddDirectReadResponse(r.StartDirectReadPartitionSessionResponse(startPartitionSessionRequest.PartitionSessionId));
+ }
+
+ i64 offset = 0, i = 0;
+
+ // Verify that the session receives data sent to direct read session:
+ for (size_t directReadId = 1; directReadId < 5; ++directReadId) {
+ auto resp = TMockDirectReadSessionProcessor::TServerReadInfo();
+ resp.PartitionData(startPartitionSessionRequest.PartitionSessionId, directReadId)
+ // TODO(qyryq) Test with compression!
+ // .Batch("producer-id-1", Ydb::Topic::Codec::CODEC_ZSTD);
+ .Batch("producer-id-1", Ydb::Topic::Codec::CODEC_RAW);
+
+ resp.Message(offset, TStringBuilder() << "message-" << offset, offset);
+ ++offset;
+ resp.Message(offset, TStringBuilder() << "message-" << offset, offset);
+ ++offset;
+ setup.AddDirectReadResponse(resp);
+
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
+ auto& e = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
+ i += e.GetMessagesCount();
+ e.Commit();
+ }
+
+ while (i < offset) {
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
+ auto& e = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
+ i += e.GetMessagesCount();
+ }
+
+ {
+ auto r = TMockReadSessionProcessor::TServerReadInfo();
+ setup.AddControlResponse(
+ r.StopPartitionSession({
+ .PartitionSessionId = 2,
+ .Graceful = false,
+ .CommittedOffset = offset,
+ }));
+ }
+
+ // TODO(qyryq) Send some bogus events from server, the client should ignore them.
+
+ {
+ // Verify that the session receives TStopPartitionSessionEvent after data was received:
+
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TPartitionSessionClosedEvent);
+ // auto e = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event);
+ // UNIT_ASSERT(!e.Graceful);
+ // UNIT_ASSERT(e.CommittedOffset == offset);
+ }
+
+ setup.MockReadProcessorFactory->Wait();
+ setup.MockDirectReadProcessorFactory->Wait();
+
+ setup.AssertNoEvents();
+ }
+
+ Y_UNIT_TEST(EmptyDirectReadResponse) {
+ // Sometimes the server might send a DirectReadResponse with no data, but with bytes_size value > 0.
+ // It can happen, if the server tried to send DirectReadResponse, but did not succeed,
+ // and in the meantime the messages that should had been sent have been rotated by retention period,
+ // and do not exist anymore. To keep ReadSizeBudget bookkeeping correct, the server still sends the an DirectReadResponse,
+ // and SDK should process it correctly: basically it should immediately send a ReadRequest(bytes_size=DirectReadResponse.bytes_size).
+
+ auto const startPartitionSessionRequest = TStartPartitionSessionRequest{
+ .PartitionId = 1,
+ .PartitionSessionId = 2,
+ .NodeId = 3,
+ .Generation = 4,
+ };
+
+ i64 bytesSize = 12345;
+
+ TDirectReadSessionImplTestSetup setup;
+ setup.ReadSessionSettings.Topics_[0].AppendPartitionIds(startPartitionSessionRequest.PartitionId);
+
+ {
+ {
+ ::testing::InSequence seq;
+
+ EXPECT_CALL(*setup.MockReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() {
+ setup.MockReadProcessorFactory->CreateProcessor(setup.MockReadProcessor);
+ });
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&](const Ydb::Topic::StreamReadMessage::InitRequest& req) {
+ UNIT_ASSERT(req.direct_read());
+ UNIT_ASSERT_EQUAL(req.topics_read_settings_size(), 1);
+ UNIT_ASSERT_EQUAL(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
+ UNIT_ASSERT_EQUAL(req.topics_read_settings(0).partition_ids_size(), 1);
+ UNIT_ASSERT_EQUAL(req.topics_read_settings(0).partition_ids(0), startPartitionSessionRequest.PartitionId);
+ }));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnStartPartitionSessionResponse(_))
+ .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse& resp) {
+ UNIT_ASSERT_EQUAL(resp.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
+ }));
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnDirectReadAck(_))
+ .Times(1);
+
+ EXPECT_CALL(*setup.MockReadProcessor, OnReadRequest(_))
+ .WillOnce(Invoke([&](const Ydb::Topic::StreamReadMessage::ReadRequest& req) {
+ UNIT_ASSERT_EQUAL(req.bytes_size(), bytesSize);
+ }));
+ }
+
+ // There are two sequences, because OnCreateProcessor from the second sequence may be called
+ // before OnStartPartitionSessionResponse from the first sequence.
+
+ {
+ ::testing::InSequence sequence;
+
+ EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() {
+ setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor);
+ });
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&setup](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
+ UNIT_ASSERT_EQUAL(req.session_id(), SERVER_SESSION_ID);
+ UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings_size(), setup.ReadSessionSettings.Topics_.size());
+ UNIT_ASSERT_VALUES_EQUAL(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
+ UNIT_ASSERT_VALUES_EQUAL(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
+ }));
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ .WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& request) {
+ UNIT_ASSERT_VALUES_EQUAL(request.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
+ UNIT_ASSERT_VALUES_EQUAL(request.generation(), startPartitionSessionRequest.Generation);
+ }));
+ }
+ }
+
+ setup.GetControlSession()->Start();
+ {
+ auto r = TMockReadSessionProcessor::TServerReadInfo();
+ setup.AddControlResponse(r.InitResponse(SERVER_SESSION_ID));
+ }
+
+ {
+ auto r = TMockReadSessionProcessor::TServerReadInfo();
+ setup.AddControlResponse(r.StartPartitionSessionRequest(startPartitionSessionRequest));
+ }
+
+ {
+ std::optional<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TStartPartitionSessionEvent);
+ std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event).Confirm();
+ }
+
+ {
+ auto r = TMockDirectReadSessionProcessor::TServerReadInfo();
+ setup.AddDirectReadResponse(r.InitResponse());
+ }
+
+ {
+ auto r = TMockDirectReadSessionProcessor::TServerReadInfo();
+ setup.AddDirectReadResponse(r.StartDirectReadPartitionSessionResponse(startPartitionSessionRequest.PartitionSessionId));
+ }
+
+ i64 directReadId = 1;
+
+ auto resp = TMockDirectReadSessionProcessor::TServerReadInfo();
+ resp.PartitionData(startPartitionSessionRequest.PartitionSessionId, directReadId, bytesSize);
+ setup.AddDirectReadResponse(resp);
+
+ setup.MockReadProcessorFactory->Wait();
+ setup.MockDirectReadProcessorFactory->Wait();
+
+ setup.AssertNoEvents();
+ }
+
+} // Y_UNIT_TEST_SUITE_F(DirectReadWithControlSession)
+
+
+Y_UNIT_TEST_SUITE_F(DirectReadSession, TDirectReadTestsFixture) {
+
+ /*
+ This suite tests TDirectReadSession in isolation, without control session.
+ */
+
+ Y_UNIT_TEST(InitAndStartPartitionSession) {
+ /*
+ Create DirectRead processor, send InitRequest, StartDirectReadPartitionSessionRequest.
+ */
+
+ TDirectReadSessionImplTestSetup setup;
+
+ auto gotStart = NThreading::NewPromise();
+
+ TPartitionSessionId partitionSessionId = 1;
+
+ class TControlCallbacks : public IDirectReadSessionControlCallbacks {};
+ auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>());
+
+ {
+ ::testing::InSequence seq;
+
+ EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
+ .WillOnce(Invoke([&](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
+ UNIT_ASSERT_EQUAL(req.session_id(), SERVER_SESSION_ID);
+ UNIT_ASSERT_EQUAL(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
+ }));
+
+ EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ .WillOnce(Invoke([&](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& req) {
+ UNIT_ASSERT_EQUAL(req.partition_session_id(), static_cast<i64>(partitionSessionId));
+ gotStart.SetValue();
+ }));
+ }
+
+ session->Start();
+
+ setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ .InitResponse());
+
+ session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
+
+ gotStart.GetFuture().Wait();
+ }
+
+ Y_UNIT_TEST(NoRetryDirectReadSession) {
+ /*
+ If the session cannot establish a connection, and the retry policy does not allow to make another retry,
+ the session should be aborted and the client should receive TSessionClosedEvent.
+ */
+
+ TDirectReadSessionImplTestSetup setup;
+ setup.ReadSessionSettings.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetNoRetryPolicy());
+
+ auto gotClosedEvent = NThreading::NewPromise();
+
+ EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ .WillOnce([&]() { setup.MockDirectReadProcessorFactory->FailCreation(); });
+
+ class TControlCallbacks : public IDirectReadSessionControlCallbacks {
+ public:
+ TControlCallbacks(NThreading::TPromise<void>& gotClosedEvent) : GotClosedEvent(gotClosedEvent) {}
+ void AbortSession(TSessionClosedEvent&&) override { GotClosedEvent.SetValue(); }
+ NThreading::TPromise<void>& GotClosedEvent;
+ };
+
+ auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(gotClosedEvent));
+
+ session->Start();
+ setup.MockDirectReadProcessorFactory->Wait();
+ gotClosedEvent.GetFuture().Wait();
+ }
+
+ Y_UNIT_TEST(RetryDirectReadSession) {
+ /*
+ If the retry policy allows retries, keep trying to establish connection.
+ */
+ TDirectReadSessionImplTestSetup setup;
+ size_t nRetries = 2;
+ setup.ReadSessionSettings.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetFixedIntervalPolicy(
+ TDuration::MilliSeconds(1), TDuration::MilliSeconds(1), nRetries));
+
+ auto gotClosedEvent = NThreading::NewPromise();
+
+ ON_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ .WillByDefault([&]() { setup.MockDirectReadProcessorFactory->FailCreation(); });
+
+ EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ .Times(1 + nRetries); // First call + N retries.
+
+ class TControlCallbacks : public IDirectReadSessionControlCallbacks {
+ public:
+ TControlCallbacks(NThreading::TPromise<void>& gotClosedEvent) : GotClosedEvent(gotClosedEvent) {}
+ void AbortSession(TSessionClosedEvent&&) override { GotClosedEvent.SetValue(); }
+ NThreading::TPromise<void>& GotClosedEvent;
+ };
+
+ auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(gotClosedEvent));
+ session->Start();
+ setup.MockDirectReadProcessorFactory->Wait();
+
+ gotClosedEvent.GetFuture().Wait();
+ }
+
+ // Y_UNIT_TEST(NoRetryPartitionSession) {
+ // /*
+ // If we get a StopDirectReadPartitionSession event, and the retry policy does not allow to send another Start-request,
+ // the session should be aborted and the client should receive TSessionClosedEvent.
+ // */
+ // TDirectReadSessionImplTestSetup setup;
+ // setup.ReadSessionSettings.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetNoRetryPolicy());
+
+ // auto gotClosedEvent = NThreading::NewPromise();
+
+ // {
+ // ::testing::InSequence seq;
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_));
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_));
+ // }
+
+ // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
+ // public:
+ // TControlCallbacks(NThreading::TPromise<void>& gotClosedEvent) : GotClosedEvent(gotClosedEvent) {}
+ // void AbortSession(TSessionClosedEvent&&) override { GotClosedEvent.SetValue(); }
+ // NThreading::TPromise<void>& GotClosedEvent;
+ // };
+
+ // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(gotClosedEvent));
+ // session->Start();
+ // setup.MockDirectReadProcessorFactory->Wait();
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .InitResponse());
+
+ // session->AddPartitionSession({ .PartitionSessionId = 1, .Location = {2, 3} });
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, TPartitionSessionId(1)));
+
+ // gotClosedEvent.GetFuture().Wait();
+ // }
+
+ // Y_UNIT_TEST(RetryPartitionSession) {
+ // /*
+ // Keep sending Start-requests until the retry policy denies next retry.
+ // */
+ // TDirectReadSessionImplTestSetup setup;
+ // size_t nRetries = 2;
+ // setup.ReadSessionSettings.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetFixedIntervalPolicy(
+ // TDuration::MilliSeconds(1), TDuration::MilliSeconds(1), nRetries));
+
+ // auto gotClosedEvent = NThreading::NewPromise();
+
+ // {
+ // ::testing::InSequence seq;
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_));
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ // .Times(1 + nRetries);
+ // }
+
+ // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
+ // public:
+ // TControlCallbacks(NThreading::TPromise<void>& gotClosedEvent) : GotClosedEvent(gotClosedEvent) {}
+ // void AbortSession(TSessionClosedEvent&&) override { GotClosedEvent.SetValue(); }
+ // void ScheduleCallback(TDuration, std::function<void()> cb, TDeferredActions<false>& deferred) override {
+ // deferred.DeferCallback(cb);
+ // }
+ // NThreading::TPromise<void>& GotClosedEvent;
+ // };
+
+ // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(gotClosedEvent));
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .InitResponse());
+
+ // session->Start();
+ // setup.MockDirectReadProcessorFactory->Wait();
+
+ // TPartitionSessionId partitionSessionId = 1;
+
+ // session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
+
+ // for (size_t i = 0; i < 1 + nRetries; ++i) {
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
+ // }
+
+ // gotClosedEvent.GetFuture().Wait();
+ // }
+
+ // Y_UNIT_TEST(ResetRetryStateOnSuccess) {
+ // /*
+ // Test that the client creates a new retry state on the first error after a successful response.
+
+ // With the default retry policy (exponential backoff), retry delays grow after each unsuccessful request.
+ // After the first successful request retry state should be reset, so the delay after another unsuccessful request will be small.
+
+ // E.g. if the exponential backoff policy is used, and minDelay is 1ms, and scaleFactor is 1000, then the following should happen:
+
+ // client -> server: InitRequest
+ // client <-- server: InitResponse
+ // client -> server: StartDirectReadPartitionSessionRequest
+ // client <- server: StopDirectReadPartitionSession(OVERLOADED)
+ // note over client: Wait 1 ms
+ // client -> server: StartDirectReadPartitionSessionRequest
+ // client <-- server: StartDirectReadPartitionSessionResponse
+ // note over client: Reset RetryState
+ // client <- server: StopDirectReadPartitionSession(OVERLOADED)
+ // note over client: Wait 1 ms, not 1 second
+ // client -> server: StartDirectReadPartitionSessionRequest
+ // */
+
+ // TDirectReadSessionImplTestSetup setup;
+ // setup.ReadSessionSettings.RetryPolicy(setup.MockRetryPolicy);
+
+ // auto gotFinalStart = NThreading::NewPromise();
+ // TPartitionSessionId partitionSessionId = 1;
+
+ // {
+ // ::testing::InSequence sequence;
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(_))
+ // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_));
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_));
+
+ // // The client receives StopDirectReadPartitionSession, create TDirectReadSession::PartitionSessions[i].RetryState
+ // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
+ // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_));
+
+ // // The client receives StartDirectReadPartitionSessionResponse, resets retry state,
+ // // then receives StopDirectReadPartitionSession and has to create a new retry state.
+ // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
+ // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ // .WillOnce([&]() { gotFinalStart.SetValue(); });
+ // }
+
+ // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
+ // public:
+ // void ScheduleCallback(TDuration, std::function<void()> cb, TDeferredActions<false>& deferred) override {
+ // deferred.DeferCallback(cb);
+ // }
+ // };
+
+ // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>());
+
+ // session->Start();
+ // setup.MockDirectReadProcessorFactory->Wait();
+
+ // session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .InitResponse());
+
+ // setup.MockRetryPolicy->Delay = TDuration::MilliSeconds(1);
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .StartDirectReadPartitionSessionResponse(partitionSessionId));
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
+
+ // gotFinalStart.GetFuture().Wait();
+ // }
+
+ // Y_UNIT_TEST(PartitionSessionRetainsRetryStateOnReconnects) {
+ // /*
+ // We need to retain retry states of separate partition sessions
+ // even after reestablishing the connection to a node.
+
+ // E.g. partition session receives StopDirectReadPartitionSession
+ // and we need to send StartDirectReadPartitionSessionRequest in 5 minutes due to the retry policy.
+
+ // But in the meantime, the session loses connection to the server and reconnects within several seconds.
+
+ // We must not send that StartDirectReadPartitionSessionRequest right away, but wait ~5 minutes.
+
+ // client -> server: InitRequest
+ // client <-- server: InitResponse
+ // client -> server: StartDirectReadPartitionSessionRequest
+ // client <- server: StopDirectReadPartitionSession(OVERLOADED)
+ // note over client: Wait N seconds before sending Start again
+ // ... Connection lost, client reconnects to the server ...
+ // client -> server: InitRequest
+ // client <-- server: InitResponse
+ // note over client: Still has to wait ~N seconds
+ // client -> server: StartDirectReadPartitionSessionRequest
+ // */
+
+ // TDirectReadSessionImplTestSetup setup;
+ // setup.ReadSessionSettings.RetryPolicy(setup.MockRetryPolicy);
+
+ // auto gotFinalStart = NThreading::NewPromise();
+ // auto gotInitRequest = NThreading::NewPromise();
+ // auto calledRead = NThreading::NewPromise();
+ // TPartitionSessionId partitionSessionId = 1;
+ // auto secondProcessor = MakeIntrusive<TMockDirectReadSessionProcessor>();
+ // auto delay = TDuration::Seconds(300);
+
+ // {
+ // ::testing::InSequence sequence;
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(1))
+ // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_));
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_));
+
+ // // The client receives StopDirectReadPartitionSession, create TDirectReadSession::PartitionSessions[i].RetryState
+ // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
+ // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
+
+ // // The client loses connection, create TDirectReadSession.RetryState
+ // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
+ // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
+
+ // // The connection is lost at this point, the client tries to reconnect.
+ // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(2))
+ // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(secondProcessor); });
+
+ // EXPECT_CALL(*secondProcessor, OnInitRequest(_))
+ // .WillOnce([&]() { gotInitRequest.SetValue(); });
+
+ // // The client waits `delay` seconds before sending the StartDirectReadPartitionSessionRequest.
+
+ // EXPECT_CALL(*secondProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ // .WillOnce([&]() { gotFinalStart.SetValue(); });
+ // }
+
+ // std::function<void()> callback;
+
+ // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
+ // public:
+ // TControlCallbacks(std::function<void()>& callback) : Callback(callback) {}
+ // void ScheduleCallback(TDuration, std::function<void()> cb, TDeferredActions<false>&) override {
+ // Callback = cb;
+ // }
+ // std::function<void()>& Callback;
+ // };
+
+ // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(callback));
+
+ // session->Start();
+ // setup.MockDirectReadProcessorFactory->Wait();
+
+ // session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .InitResponse());
+
+ // setup.MockRetryPolicy->Delay = delay;
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
+
+ // // Besides logs, these durations don't really affect anything in tests.
+ // setup.MockRetryPolicy->Delay = TDuration::Seconds(1);
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .Failure());
+
+ // gotInitRequest.GetFuture().Wait();
+ // secondProcessor->AddServerResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .InitResponse());
+
+ // // Ensure that the callback is called after the direct session got InitResponse.
+ // setup.WaitForWorkingDirectReadSession();
+
+ // callback();
+
+ // gotFinalStart.GetFuture().Wait();
+
+ // secondProcessor->Wait();
+ // secondProcessor->Validate();
+ // }
+
+ // Y_UNIT_TEST(RetryWithoutConnectionResetsPartitionSession) {
+ // /*
+ // If there are pending StartDirectReadPartitionSession requests that were delayed due to previous errors,
+ // and the entire session then loses connection for an extended period of time (greater than the callback delays),
+ // the following process should be followed:
+
+ // When the session finally reconnects, the pending Start requests should be sent immediately.
+ // This is because their callbacks have already been fired, but the requests were not sent due to the lack of connection.
+
+ // client -> server: InitRequest
+ // client <-- server: InitResponse
+ // client -> server: StartDirectReadPartitionSessionRequest
+ // client <- server: StopDirectReadPartitionSession(OVERLOADED)
+ // note over client: Wait 1 second before sending Start again
+ // ... Connection lost ...
+ // note over client: SendStart... callback fires, resets state
+ // ... Connection reestablished in 1 minute ...
+ // client -> server: InitRequest
+ // client <-- server: InitResponse
+ // note over client: Send the Start request immediately
+ // client -> server: StartDirectReadPartitionSessionRequest
+ // */
+
+ // TDirectReadSessionImplTestSetup setup;
+ // setup.ReadSessionSettings.RetryPolicy(setup.MockRetryPolicy);
+
+ // auto gotFinalStart = NThreading::NewPromise();
+ // auto calledRead = NThreading::NewPromise();
+ // TPartitionSessionId partitionSessionId = 1;
+ // auto secondProcessor = MakeIntrusive<TMockDirectReadSessionProcessor>();
+ // auto delay = TDuration::MilliSeconds(1);
+
+ // {
+ // ::testing::InSequence sequence;
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(1))
+ // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(setup.MockDirectReadProcessor); });
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
+ // .Times(1);
+
+ // EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ // .Times(1);
+
+ // // The client receives StopDirectReadPartitionSession, create TDirectReadSession::PartitionSessions[i].RetryState
+ // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
+ // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
+
+ // // The client loses connection, create TDirectReadSession.RetryState
+ // EXPECT_CALL(*setup.MockRetryPolicy, CreateRetryState())
+ // .WillOnce(Return(std::make_unique<TMockRetryState>(setup.MockRetryPolicy)));
+
+ // // The connection is lost at this point, the client tries to reconnect.
+ // EXPECT_CALL(*setup.MockDirectReadProcessorFactory, OnCreateProcessor(2))
+ // .WillOnce([&]() { setup.MockDirectReadProcessorFactory->CreateProcessor(secondProcessor); });
+
+ // EXPECT_CALL(*secondProcessor, OnInitRequest(_))
+ // .Times(1);
+
+ // EXPECT_CALL(*secondProcessor, OnStartDirectReadPartitionSessionRequest(_))
+ // .WillOnce([&]() { gotFinalStart.SetValue(); });
+ // }
+
+ // std::function<void()> callback;
+
+ // class TControlCallbacks : public IDirectReadSessionControlCallbacks {
+ // public:
+ // TControlCallbacks(TDuration delay, std::function<void()>& callback) : Delay(delay), Callback(callback) {}
+ // void ScheduleCallback(TDuration d, std::function<void()> cb, TDeferredActions<false>&) override {
+ // UNIT_ASSERT_EQUAL(Delay, d);
+ // Callback = cb;
+ // }
+ // TDuration Delay;
+ // std::function<void()>& Callback;
+ // };
+
+ // auto session = setup.GetDirectReadSession(std::make_shared<TControlCallbacks>(delay, callback));
+
+ // session->Start();
+ // setup.MockDirectReadProcessorFactory->Wait();
+
+ // session->AddPartitionSession({ .PartitionSessionId = partitionSessionId, .Location = {2, 3} });
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .InitResponse());
+
+ // setup.MockRetryPolicy->Delay = delay;
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .StopDirectReadPartitionSession(Ydb::StatusIds::OVERLOADED, partitionSessionId));
+
+ // // Besides logs, these durations don't really affect anything in tests.
+ // setup.MockRetryPolicy->Delay = TDuration::Seconds(10);
+
+ // setup.AddDirectReadResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .Failure());
+
+ // // Delayed callback is fired, but there is no connection, so the partition session state changes to IDLE,
+ // // and the request should be sent after receiving InitResponse.
+ // callback();
+
+ // secondProcessor->AddServerResponse(TMockDirectReadSessionProcessor::TServerReadInfo()
+ // .InitResponse());
+
+ // gotFinalStart.GetFuture().Wait();
+
+ // secondProcessor->Wait();
+ // secondProcessor->Validate();
+ // }
+
+} // Y_UNIT_TEST_SUITE_F(DirectReadSession)
+
+
+Y_UNIT_TEST_SUITE(DirectReadWithServer) {
+
+ /*
+ This suite tests direct read session interaction with server.
+
+ It complements tests from basic_usage_ut.cpp etc, as we run them with direct read disabled/enabled.
+ */
+
+ Y_UNIT_TEST(KillPQTablet) {
+ /*
+ A read session should keep working if a partition tablet is killed and moved to another node.
+ */
+ auto setup = TTopicSdkTestSetup(TEST_CASE_NAME);
+ auto client = setup.MakeClient();
+ auto nextMessageId = 0;
+
+ auto writeMessages = [&](size_t n) {
+ auto writer = client.CreateSimpleBlockingWriteSession(TWriteSessionSettings()
+ .Path(TEST_TOPIC)
+ .MessageGroupId(TEST_MESSAGE_GROUP_ID)
+ .ProducerId(TEST_MESSAGE_GROUP_ID));
+
+ for (size_t i = 0; i < n; ++i, ++nextMessageId) {
+ auto res = writer->Write(TStringBuilder() << "message-" << nextMessageId);
+ UNIT_ASSERT(res);
+ }
+
+ writer->Close();
+ };
+
+ writeMessages(1);
+
+ auto gotFirstMessage = NThreading::NewPromise();
+ auto gotSecondMessage = NThreading::NewPromise();
+
+ auto readerSettings = TReadSessionSettings()
+ .ConsumerName(TEST_CONSUMER)
+ .AppendTopics(TEST_TOPIC)
+ // .DirectRead(true)
+ ;
+
+ TIntrusivePtr<TPartitionSession> partitionSession;
+
+ readerSettings.EventHandlers_
+ .DataReceivedHandler([&](TReadSessionEvent::TDataReceivedEvent& e) {
+ switch (e.GetMessages()[0].GetSeqNo()) {
+ case 1:
+ gotFirstMessage.SetValue();
+ break;
+ case 2:
+ gotSecondMessage.SetValue();
+ break;
+ }
+ e.Commit();
+ })
+ .StartPartitionSessionHandler([&](TReadSessionEvent::TStartPartitionSessionEvent& e) {
+ e.Confirm();
+ partitionSession = e.GetPartitionSession();
+ })
+ .StopPartitionSessionHandler([&](TReadSessionEvent::TStopPartitionSessionEvent& e) {
+ e.Confirm();
+ })
+ ;
+
+ auto reader = client.CreateReadSession(readerSettings);
+
+ gotFirstMessage.GetFuture().Wait();
+
+ auto getPartitionGeneration = [&client]() {
+ auto description = client.DescribePartition(TEST_TOPIC, 0, TDescribePartitionSettings().IncludeLocation(true)).GetValueSync();
+ return description.GetPartitionDescription().GetPartition().GetPartitionLocation()->GetGeneration();
+ };
+
+ auto firstGenerationId = getPartitionGeneration();
+
+ setup.GetServer().KillTopicPqTablets(setup.GetTopicPath());
+
+ while (firstGenerationId == getPartitionGeneration()) {
+ Sleep(TDuration::MilliSeconds(100));
+ }
+
+ writeMessages(1);
+
+ gotSecondMessage.GetFuture().Wait();
+
+ reader->Close();
+ }
+
+ Y_UNIT_TEST(KillPQRBTablet) {
+ /*
+ A read session should keep working if a balancer tablet is killed and moved to another node.
+ */
+ // TODO
+ return;
+ auto setup = TTopicSdkTestSetup(TEST_CASE_NAME);
+ auto client = setup.MakeClient();
+ auto nextMessageId = 0;
+
+ auto writeMessages = [&](size_t n) {
+ auto writer = client.CreateSimpleBlockingWriteSession(TWriteSessionSettings()
+ .Path(TEST_TOPIC)
+ .MessageGroupId(TEST_MESSAGE_GROUP_ID)
+ .ProducerId(TEST_MESSAGE_GROUP_ID));
+
+ for (size_t i = 0; i < n; ++i, ++nextMessageId) {
+ auto res = writer->Write(TStringBuilder() << "message-" << nextMessageId);
+ UNIT_ASSERT(res);
+ }
+
+ writer->Close();
+ };
+
+ writeMessages(1);
+
+ auto gotFirstMessage = NThreading::NewPromise();
+ auto gotSecondMessage = NThreading::NewPromise();
+
+ auto readerSettings = TReadSessionSettings()
+ .ConsumerName(TEST_CONSUMER)
+ .AppendTopics(TEST_TOPIC)
+ // .DirectRead(true)
+ ;
+
+ TIntrusivePtr<TPartitionSession> partitionSession;
+
+ readerSettings.EventHandlers_
+ .DataReceivedHandler([&](TReadSessionEvent::TDataReceivedEvent& e) {
+ switch (e.GetMessages()[0].GetSeqNo()) {
+ case 1:
+ gotFirstMessage.SetValue();
+ break;
+ case 2:
+ gotSecondMessage.SetValue();
+ break;
+ }
+ e.Commit();
+ })
+ .StartPartitionSessionHandler([&](TReadSessionEvent::TStartPartitionSessionEvent& e) {
+ e.Confirm();
+ partitionSession = e.GetPartitionSession();
+ })
+ ;
+
+ auto reader = client.CreateReadSession(readerSettings);
+
+ gotFirstMessage.GetFuture().Wait();
+
+ auto getPartitionGeneration = [&client]() {
+ auto description = client.DescribePartition(TEST_TOPIC, 0, TDescribePartitionSettings().IncludeLocation(true)).GetValueSync();
+ return description.GetPartitionDescription().GetPartition().GetPartitionLocation()->GetGeneration();
+ };
+
+ auto firstGenerationId = getPartitionGeneration();
+
+ setup.GetServer().KillTopicPqrbTablet(setup.GetTopicPath());
+
+ while (firstGenerationId == getPartitionGeneration()) {
+ Sleep(TDuration::MilliSeconds(100));
+ }
+
+ writeMessages(1);
+
+ gotSecondMessage.GetFuture().Wait();
+
+ reader->Close();
+ }
+
+ Y_UNIT_TEST(Devslice) {
+ return;
+ auto driverConfig = NYdb::TDriverConfig()
+ .SetEndpoint(GetEnv("ENDPOINT"))
+ .SetDatabase("/Root/testdb")
+ .SetLog(std::unique_ptr<TLogBackend>(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release()))
+ .SetAuthToken(GetEnv("YDB_TOKEN"));
+
+ auto driver = NYdb::TDriver(driverConfig);
+
+ auto clientSettings = TTopicClientSettings();
+ auto client = TTopicClient(driver, clientSettings);
+
+ auto settings = TReadSessionSettings()
+ .AppendTopics(TTopicReadSettings("t1").AppendPartitionIds({0}))
+ .ConsumerName("c1")
+ // .DirectRead(true)
+ ;
+
+ settings.EventHandlers_
+ .StartPartitionSessionHandler([](TReadSessionEvent::TStartPartitionSessionEvent& e) {
+ e.Confirm();
+ })
+ .StopPartitionSessionHandler([](TReadSessionEvent::TStopPartitionSessionEvent& e) {
+ e.Confirm();
+ })
+ .DataReceivedHandler([](TReadSessionEvent::TDataReceivedEvent& e) {
+ for (ui32 i = 0; i < e.GetMessages().size(); ++i) {
+ auto& m = e.GetMessages()[i];
+ Cerr << (TStringBuilder() << "Message: " << m.GetData() << Endl);
+ m.Commit();
+ }
+ });
+
+ auto reader = client.CreateReadSession(settings);
+
+ Sleep(TDuration::Seconds(1000));
+
+ reader->Close();
+ }
+
+} // Y_UNIT_TEST_SUITE_F(DirectReadWithServer)
+
+} // namespace NYdb::NTopic::NTests
diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/with_direct_read_ut/ya.make b/ydb/public/sdk/cpp/src/client/topic/ut/with_direct_read_ut/ya.make
new file mode 100644
index 00000000000..619fa4650a6
--- /dev/null
+++ b/ydb/public/sdk/cpp/src/client/topic/ut/with_direct_read_ut/ya.make
@@ -0,0 +1,46 @@
+UNITTEST_FOR(ydb/public/sdk/cpp/src/client/topic)
+
+IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ELSE()
+ SIZE(MEDIUM)
+ENDIF()
+
+FORK_SUBTESTS()
+
+PEERDIR(
+ ydb/core/testlib/default
+ library/cpp/testing/gmock_in_unittest
+ ydb/public/lib/json_value
+ ydb/public/lib/yson_value
+ ydb/public/sdk/cpp/src/client/driver
+ ydb/public/sdk/cpp/src/client/topic/ut/ut_utils
+ ydb/public/sdk/cpp/src/client/persqueue_public/ut/ut_utils
+ ydb/core/persqueue/ut/common
+ ydb/core/tx/schemeshard/ut_helpers
+)
+
+YQL_LAST_ABI_VERSION()
+
+ENV(PQ_EXPERIMENTAL_DIRECT_READ="1")
+
+SRCDIR(
+ ydb/public/sdk/cpp/src/client/topic/ut
+ ydb/public/sdk/cpp/src/client/topic
+)
+
+SRCS(
+ basic_usage_ut.cpp
+ describe_topic_ut.cpp
+ direct_read_ut.cpp
+ local_partition_ut.cpp
+ topic_to_table_ut.cpp
+)
+
+RESOURCE(
+ ydb/public/sdk/cpp/src/client/topic/ut/resources/topic_A_partition_0_v24-4-2.dat topic_A_partition_0_v24-4-2.dat
+ ydb/public/sdk/cpp/src/client/topic/ut/resources/topic_A_partition_1_v24-4-2.dat topic_A_partition_1_v24-4-2.dat
+)
+
+END()
diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/ya.make b/ydb/public/sdk/cpp/src/client/topic/ut/ya.make
index 4a66f0ed197..161d1615e1e 100644
--- a/ydb/public/sdk/cpp/src/client/topic/ut/ya.make
+++ b/ydb/public/sdk/cpp/src/client/topic/ut/ya.make
@@ -49,3 +49,7 @@ RESOURCE(
)
END()
+
+RECURSE_FOR_TESTS(
+ with_direct_read_ut
+)
diff --git a/ydb/public/sdk/cpp/src/client/topic/ya.make b/ydb/public/sdk/cpp/src/client/topic/ya.make
index edd8aff0aa6..5cf0cbcbce5 100644
--- a/ydb/public/sdk/cpp/src/client/topic/ya.make
+++ b/ydb/public/sdk/cpp/src/client/topic/ya.make
@@ -13,7 +13,7 @@ PEERDIR(
ydb/public/sdk/cpp/src/client/proto
ydb/public/sdk/cpp/src/client/driver
ydb/public/sdk/cpp/src/client/table
-
+
ydb/public/api/grpc
ydb/public/api/grpc/draft
ydb/public/api/protos
diff --git a/ydb/public/sdk/cpp/tests/integration/topic/basic_usage.cpp b/ydb/public/sdk/cpp/tests/integration/topic/basic_usage.cpp
index 9a1d6866dc1..d6360431362 100644
--- a/ydb/public/sdk/cpp/tests/integration/topic/basic_usage.cpp
+++ b/ydb/public/sdk/cpp/tests/integration/topic/basic_usage.cpp
@@ -12,6 +12,10 @@
#include <future>
+
+static const bool EnableDirectRead = !std::string{std::getenv("PQ_EXPERIMENTAL_DIRECT_READ") ? std::getenv("PQ_EXPERIMENTAL_DIRECT_READ") : ""}.empty();
+
+
namespace NYdb::NPersQueue::NTests {
class TSimpleWriteSessionTestAdapter {
@@ -301,7 +305,9 @@ TEST_F(BasicUsage, WriteRead) {
{
auto readSettings = TReadSessionSettings()
.ConsumerName("test-consumer")
- .AppendTopics(GetTopicPath());
+ .AppendTopics(GetTopicPath())
+ // .DirectRead(EnableDirectRead)
+ ;
auto readSession = client.CreateReadSession(readSettings);
auto event = readSession->GetEvent(true);
@@ -337,7 +343,9 @@ TEST_F(BasicUsage, MaxByteSizeEqualZero) {
auto readSettings = TReadSessionSettings()
.ConsumerName("test-consumer")
- .AppendTopics(GetTopicPath());
+ .AppendTopics(GetTopicPath())
+ // .DirectRead(EnableDirectRead)
+ ;
auto readSession = client.CreateReadSession(readSettings);
auto event = readSession->GetEvent(true);
@@ -406,7 +414,9 @@ TEST_F(BasicUsage, WriteAndReadSomeMessagesWithSyncCompression) {
readSettings
.ConsumerName("test-consumer")
.MaxMemoryUsageBytes(1_MB)
- .AppendTopics(GetTopicPath());
+ .AppendTopics(GetTopicPath())
+ // .DirectRead(EnableDirectRead)
+ ;
std::cerr << "Session was created" << std::endl;
@@ -477,7 +487,9 @@ TEST_F(BasicUsage, SessionNotDestroyedWhileCompressionInFlight) {
.ConsumerName("test-consumer")
.MaxMemoryUsageBytes(1_MB)
.AppendTopics(GetTopicPath())
- .DecompressionExecutor(stepByStepExecutor);
+ .DecompressionExecutor(stepByStepExecutor)
+ // .DirectRead(EnableDirectRead)
+ ;
auto f = std::async(std::launch::async,
[readSettings, writeSettings, &topicClient,
@@ -591,7 +603,9 @@ TEST_F(BasicUsage, SessionNotDestroyedWhileUserEventHandlingInFlight) {
auto readSettings = TReadSessionSettings()
.ConsumerName("test-consumer")
.MaxMemoryUsageBytes(1_MB)
- .AppendTopics(GetTopicPath());
+ .AppendTopics(GetTopicPath())
+ // .DirectRead(EnableDirectRead)
+ ;
readSettings.EventHandlers_
.HandlersExecutor(stepByStepExecutor);
@@ -721,7 +735,9 @@ TEST_F(BasicUsage, ReadSessionCorrectClose) {
.MaxMemoryUsageBytes(1_MB)
.Decompress(false)
.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetNoRetryPolicy())
- .AppendTopics(GetTopicPath());
+ .AppendTopics(GetTopicPath())
+ // .DirectRead(EnableDirectRead)
+ ;
readSettings.EventHandlers_.SimpleDataHandlers(
[]
@@ -767,7 +783,9 @@ TEST_F(BasicUsage, ConfirmPartitionSessionWithCommitOffset) {
// Read messages:
auto settings = NTopic::TReadSessionSettings()
.ConsumerName("test-consumer")
- .AppendTopics(GetTopicPath());
+ .AppendTopics(GetTopicPath())
+ // .DirectRead(EnableDirectRead)
+ ;
TTopicClient client(driver);
auto reader = client.CreateReadSession(settings);
@@ -873,7 +891,9 @@ TEST_F(BasicUsage, TWriteSession_WriteEncoded) {
auto readSettings = TReadSessionSettings()
.ConsumerName("test-consumer")
- .AppendTopics(GetTopicPath());
+ .AppendTopics(GetTopicPath())
+ // .DirectRead(EnableDirectRead)
+ ;
std::shared_ptr<IReadSession> readSession = client.CreateReadSession(readSettings);
std::uint32_t readMessageCount = 0;
while (readMessageCount < 4) {
@@ -1050,7 +1070,9 @@ TEST_F(TSettingsValidation, ValidateSettingsFailOnStart) {
auto readSettings = TReadSessionSettings()
.ConsumerName("test-consumer")
.MaxMemoryUsageBytes(0)
- .AppendTopics(GetTopicPath());
+ .AppendTopics(GetTopicPath())
+ // .DirectRead(EnableDirectRead)
+ ;
auto readSession = client.CreateReadSession(readSettings);
auto event = readSession->GetEvent(true);