diff options
author | ildar-khisam <ikhis@ydb.tech> | 2022-08-08 17:40:23 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2022-08-08 17:40:23 +0300 |
commit | 74dcc580b7a2011a5bb44e20d1f4fa2a5275e0f9 (patch) | |
tree | dfd57c61efb15cd49f0ad0977f2239fc08927387 | |
parent | f1fd089af7313d638ec11bb409a57c024bf32fa5 (diff) | |
download | ydb-74dcc580b7a2011a5bb44e20d1f4fa2a5275e0f9.tar.gz |
implement read session for topic sdk
implement read session for topic sdk
44 files changed, 4505 insertions, 2149 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index eba8c4ae19..754a6f6a77 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -1428,3 +1428,5 @@ add_subdirectory(ydb/library/yql/providers/s3/range_helpers/ut) add_subdirectory(ydb/library/yql/udfs/common/stat/ut) add_subdirectory(ydb/library/yql/udfs/common/topfreq/ut) add_subdirectory(ydb/public/sdk/cpp/client/extensions/discovery_mutator/ut) +add_subdirectory(ydb/public/sdk/cpp/examples/topic_reader/eventloop) +add_subdirectory(ydb/public/sdk/cpp/examples/topic_reader/simple) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index e26d89c0d0..21266f9470 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -1435,3 +1435,5 @@ add_subdirectory(ydb/library/yql/providers/s3/range_helpers/ut) add_subdirectory(ydb/library/yql/udfs/common/stat/ut) add_subdirectory(ydb/library/yql/udfs/common/topfreq/ut) add_subdirectory(ydb/public/sdk/cpp/client/extensions/discovery_mutator/ut) +add_subdirectory(ydb/public/sdk/cpp/examples/topic_reader/eventloop) +add_subdirectory(ydb/public/sdk/cpp/examples/topic_reader/simple) diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index 938c720d90..c9ca774d29 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -347,7 +347,7 @@ message StreamReadMessage { google.protobuf.Timestamp created_at = 3; // Compressed client message body. bytes data = 5; - // Uncompressed size of client message body. + // Uncompressed size of client message body. // sent as is from WriteRequest, without check on server side. May be empty (for writes from old client) or wrong (if bug in writer). // Use it for optimization purposes only, don't trust it. int64 uncompressed_size = 6; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp index e0e9fd6857..5aa49006b8 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp @@ -54,6 +54,10 @@ ERetryErrorClass GetRetryErrorClassV2(EStatus status) { } } +TString IssuesSingleLineString(const NYql::TIssues& issues) { + return SubstGlobalCopy(issues.ToString(), '\n', ' '); +} + void Cancel(NGrpc::IQueueClientContextPtr& context) { if (context) { context->Cancel(); @@ -70,26 +74,6 @@ NYql::TIssues MakeIssueWithSubIssues(const TString& description, const NYql::TIs return issues; } -size_t CalcDataSize(const TReadSessionEvent::TEvent& event) { - if (const TReadSessionEvent::TDataReceivedEvent* dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event)) { - size_t len = 0; - if (dataEvent->IsCompressedMessages()) { - for (const auto& msg : dataEvent->GetCompressedMessages()) { - len += msg.GetData().size(); - } - } else { - for (const auto& msg : dataEvent->GetMessages()) { - if (!msg.HasException()) { - len += msg.GetData().size(); - } - } - } - return len; - } else { - return 0; - } -} - static TStringBuf SplitPort(TStringBuf endpoint) { for (int i = endpoint.Size() - 1; i >= 0; --i) { if (endpoint[i] == ':') { diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h index 8c140e2217..69b5c1aa7a 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h @@ -20,7 +20,37 @@ NYql::TIssues MakeIssueWithSubIssues(const TString& description, const NYql::TIs TString IssuesSingleLineString(const NYql::TIssues& issues); -size_t CalcDataSize(const TReadSessionEvent::TEvent& event); +template <typename TReadSessionEvent> +size_t CalcDataSize(const typename TReadSessionEvent::TEvent& event) { + constexpr bool UseMigrationProtocol = std::is_same_v<TReadSessionEvent, NPersQueue::TReadSessionEvent>; + + if (const typename TReadSessionEvent::TDataReceivedEvent* dataEvent = + std::get_if<typename TReadSessionEvent::TDataReceivedEvent>(&event)) { + size_t len = 0; + + bool hasCompressedMsgs = [&dataEvent](){ + if constexpr (UseMigrationProtocol) { + return dataEvent->IsCompressedMessages(); + } else { + return dataEvent->HasCompressedMessages(); + } + }(); + + if (hasCompressedMsgs) { + for (const auto& msg : dataEvent->GetCompressedMessages()) { + len += msg.GetData().size(); + } + } else { + for (const auto& msg : dataEvent->GetMessages()) { + if (!msg.HasException()) { + len += msg.GetData().size(); + } + } + } + return len; + } + return 0; +} template <class TMessage> bool IsErrorMessage(const TMessage& serverMessage) { @@ -271,14 +301,15 @@ private: // - packing events for waiters; // - waking up waiters. // Thread safe. -template <class TSettings_, class TEvent_, class TEventInfo_ = TBaseEventInfo<TEvent_>> +template <class TSettings_, class TEvent_, class TClosedEvent_, class TExecutor_, class TEventInfo_ = TBaseEventInfo<TEvent_>> class TBaseSessionEventsQueue : public ISignalable { protected: - using TSelf = TBaseSessionEventsQueue<TSettings_, TEvent_, TEventInfo_>; + using TSelf = TBaseSessionEventsQueue<TSettings_, TEvent_, TClosedEvent_, TExecutor_, TEventInfo_>; using TSettings = TSettings_; using TEvent = TEvent_; using TEventInfo = TEventInfo_; - + using TClosedEvent = TClosedEvent_; + using TExecutor = TExecutor_; // Template for visitor implementation. struct TBaseHandlersVisitor { @@ -316,7 +347,7 @@ protected: }); } - virtual void Post(const IExecutor::TPtr& executor, IExecutor::TFunction&& f) { + virtual void Post(const typename TExecutor::TPtr& executor, typename TExecutor::TFunction&& f) { executor->Post(std::move(f)); } @@ -379,7 +410,7 @@ protected: std::queue<TEventInfo> Events; TCondVar CondVar; TMutex Mutex; - TMaybe<TSessionClosedEvent> CloseEvent; + TMaybe<TClosedEvent> CloseEvent; std::atomic<bool> Closed = false; }; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp index 8eb1bfc4e9..857ea12b17 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp @@ -20,8 +20,6 @@ namespace NYdb::NPersQueue { static const TString DRIVER_IS_STOPPING_DESCRIPTION = "Driver is stopping"; -static const bool RangesMode = !GetEnv("PQ_OFFSET_RANGES_MODE").empty(); - std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index) { if (dataReceivedEvent.IsCompressedMessages()) { const auto& msg = dataReceivedEvent.GetCompressedMessages()[index]; @@ -31,25 +29,8 @@ std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceiv return {msg.GetOffset(), msg.GetOffset() + 1}; } -TString IssuesSingleLineString(const NYql::TIssues& issues) { - return SubstGlobalCopy(issues.ToString(), '\n', ' '); -} - -void MakeCountersNotNull(TReaderCounters& counters); -bool HasNullCounters(TReaderCounters& counters); - -class TErrorHandler : public IErrorHandler { -public: - TErrorHandler(std::weak_ptr<TReadSession> session) - : Session(std::move(session)) - { - } - - void AbortSession(TSessionClosedEvent&& closeEvent) override; - -private: - std::weak_ptr<TReadSession> Session; -}; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TReadSession TStringBuilder TReadSession::GetLogPrefix() const { return TStringBuilder() << GetDatabaseLogPrefix(DbDriverState->Database) << "[" << SessionId << "] "; @@ -90,8 +71,8 @@ Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest TReadSession::MakeClus } void TReadSession::Start() { - ErrorHandler = MakeIntrusive<TErrorHandler>(weak_from_this()); - EventsQueue = std::make_shared<TReadSessionEventsQueue>(Settings, weak_from_this()); + ErrorHandler = MakeIntrusive<TErrorHandler<true>>(weak_from_this()); + EventsQueue = std::make_shared<TReadSessionEventsQueue<true>>(Settings, weak_from_this()); if (!ValidateSettings()) { return; @@ -166,7 +147,7 @@ void TReadSession::StartClusterDiscovery() { void TReadSession::ProceedWithoutClusterDiscovery() { - TDeferredActions deferred; + TDeferredActions<true> deferred; with_lock (Lock) { if (Aborting) { return; @@ -183,7 +164,7 @@ void TReadSession::ProceedWithoutClusterDiscovery() { ScheduleDumpCountersToLog(); } -void TReadSession::CreateClusterSessionsImpl(TDeferredActions& deferred) { +void TReadSession::CreateClusterSessionsImpl(TDeferredActions<true>& deferred) { // Create cluster sessions. ui64 partitionStreamIdStart = 1; const size_t clusterSessionsCount = ClusterSessions.size(); @@ -205,7 +186,7 @@ void TReadSession::CreateClusterSessionsImpl(TDeferredActions& deferred) { return; } clusterSessionInfo.Session = - std::make_shared<TSingleClusterReadSessionImpl>( + std::make_shared<TSingleClusterReadSessionImpl<true>>( sessionSettings, DbDriverState->Database, SessionId, @@ -222,7 +203,7 @@ void TReadSession::CreateClusterSessionsImpl(TDeferredActions& deferred) { } void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult& result) { - TDeferredActions deferred; + TDeferredActions<true> deferred; with_lock (Lock) { if (Aborting) { return; @@ -322,7 +303,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu ScheduleDumpCountersToLog(); } -void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions& deferred) { +void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions<true>& deferred) { Log.Write(TLOG_DEBUG, GetLogPrefix() << "Restart cluster discovery in " << delay); auto startCallback = [self = weak_from_this()](bool ok) { if (ok) { @@ -347,18 +328,7 @@ bool TReadSession::Close(TDuration timeout) { // Log final counters. DumpCountersToLog(); - with_lock (Lock) { - if (ClusterDiscoveryDelayContext) { - ClusterDiscoveryDelayContext->Cancel(); - ClusterDiscoveryDelayContext.reset(); - } - if (DumpCountersContext) { - DumpCountersContext->Cancel(); - DumpCountersContext.reset(); - } - } - - std::vector<TSingleClusterReadSessionImpl::TPtr> sessions; + std::vector<TSingleClusterReadSessionImpl<true>::TPtr> sessions; NThreading::TPromise<bool> promise = NThreading::NewPromise<bool>(); std::shared_ptr<std::atomic<size_t>> count = std::make_shared<std::atomic<size_t>>(0); auto callback = [=]() mutable { @@ -367,7 +337,7 @@ bool TReadSession::Close(TDuration timeout) { } }; - TDeferredActions deferred; + TDeferredActions<true> deferred; with_lock (Lock) { if (Closing || Aborting) { return false; @@ -432,7 +402,7 @@ bool TReadSession::Close(TDuration timeout) { return result; } -void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions& deferred) { +void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions<true>& deferred) { if (!Aborting) { Aborting = true; Log.Write(TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString()); @@ -453,11 +423,11 @@ void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions& } } -void TReadSession::AbortImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions& deferred) { +void TReadSession::AbortImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions<true>& deferred) { AbortImpl(TSessionClosedEvent(statusCode, std::move(issues)), deferred); } -void TReadSession::AbortImpl(EStatus statusCode, const TString& message, TDeferredActions& deferred) { +void TReadSession::AbortImpl(EStatus statusCode, const TString& message, TDeferredActions<true>& deferred) { NYql::TIssues issues; issues.AddIssue(message); AbortImpl(statusCode, std::move(issues), deferred); @@ -474,7 +444,7 @@ void TReadSession::Abort(EStatus statusCode, const TString& message) { } void TReadSession::Abort(TSessionClosedEvent&& closeEvent) { - TDeferredActions deferred; + TDeferredActions<true> deferred; with_lock (Lock) { AbortImpl(std::move(closeEvent), deferred); } @@ -566,7 +536,7 @@ void TReadSession::DumpCountersToLog(size_t timeNumber) { const bool dumpSessionsStatistics = timeNumber % 600 == 0; // Every 10 minutes. *Settings.Counters_->CurrentSessionLifetimeMs = (TInstant::Now() - StartSessionTime).MilliSeconds(); - std::vector<TSingleClusterReadSessionImpl::TPtr> sessions; + std::vector<TSingleClusterReadSessionImpl<true>::TPtr> sessions; with_lock (Lock) { if (Closing || Aborting) { return; @@ -638,985 +608,8 @@ void TReadSession::ScheduleDumpCountersToLog(size_t timeNumber) { } } -TPartitionStreamImpl::~TPartitionStreamImpl() = default; - -TLog TPartitionStreamImpl::GetLog() const { - if (auto session = Session.lock()) { - return session->GetLog(); - } - return {}; -} - -void TPartitionStreamImpl::Commit(ui64 startOffset, ui64 endOffset) { - std::vector<std::pair<ui64, ui64>> toCommit; - if (auto sessionShared = Session.lock()) { - Y_VERIFY(endOffset > startOffset); - with_lock(sessionShared->Lock) { - if (!AddToCommitRanges(startOffset, endOffset, true)) // Add range for real commit always. - return; - - Y_VERIFY(!Commits.Empty()); - for (auto c : Commits) { - if (c.first >= endOffset) break; // Commit only gaps before client range. - toCommit.emplace_back(c); - } - Commits.EraseInterval(0, endOffset); // Drop only committed ranges; - } - for (auto range: toCommit) { - sessionShared->Commit(this, range.first, range.second); - } - } -} - -void TPartitionStreamImpl::RequestStatus() { - if (auto sessionShared = Session.lock()) { - sessionShared->RequestPartitionStreamStatus(this); - } -} - -void TPartitionStreamImpl::ConfirmCreate(TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset) { - if (auto sessionShared = Session.lock()) { - sessionShared->ConfirmPartitionStreamCreate(this, readOffset, commitOffset); - } -} - -void TPartitionStreamImpl::ConfirmDestroy() { - if (auto sessionShared = Session.lock()) { - sessionShared->ConfirmPartitionStreamDestroy(this); - } -} - -void TPartitionStreamImpl::StopReading() { - Y_FAIL("Not implemented"); // TODO -} - -void TPartitionStreamImpl::ResumeReading() { - Y_FAIL("Not implemented"); // TODO -} - -void TPartitionStreamImpl::SignalReadyEvents(TReadSessionEventsQueue* queue, TDeferredActions& deferred) { - for (auto& event : EventsQueue) { - event.Signal(this, queue, deferred); - - if (!event.IsReady()) { - break; - } - } -} - -TStringBuilder TSingleClusterReadSessionImpl::GetLogPrefix() const { - return TStringBuilder() << GetDatabaseLogPrefix(Database) << "[" << SessionId << "] [" << ClusterName << "] "; -} - -void TSingleClusterReadSessionImpl::Start() { - Settings.DecompressionExecutor_->Start(); - Settings.EventHandlers_.HandlersExecutor_->Start(); - if (!Reconnect(TPlainStatus())) { - ErrorHandler->AbortSession(EStatus::ABORTED, DRIVER_IS_STOPPING_DESCRIPTION); - } -} - -bool TSingleClusterReadSessionImpl::Reconnect(const TPlainStatus& status) { - TDuration delay = TDuration::Zero(); - NGrpc::IQueueClientContextPtr delayContext = nullptr; - NGrpc::IQueueClientContextPtr connectContext = ClientContext->CreateContext(); - NGrpc::IQueueClientContextPtr connectTimeoutContext = ClientContext->CreateContext(); - if (!connectContext || !connectTimeoutContext) { - return false; - } - - // Previous operations contexts. - NGrpc::IQueueClientContextPtr prevConnectContext; - NGrpc::IQueueClientContextPtr prevConnectTimeoutContext; - NGrpc::IQueueClientContextPtr prevConnectDelayContext; - - if (!status.Ok()) { - Log.Write( - TLOG_INFO, - GetLogPrefix() << "Got error. Status: " << status.Status - << ". Description: " << IssuesSingleLineString(status.Issues) - ); - } - - TDeferredActions deferred; - with_lock (Lock) { - if (Aborting) { - Cancel(connectContext); - Cancel(connectTimeoutContext); - return false; - } - Processor = nullptr; - WaitingReadResponse = false; - ServerMessage = std::make_shared<Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>(); - ++ConnectionGeneration; - if (RetryState) { - TMaybe<TDuration> nextDelay = RetryState->GetNextRetryDelay(status.Status); - if (nextDelay) { - delay = *nextDelay; - delayContext = ClientContext->CreateContext(); - if (!delayContext) { - return false; - } - Log.Write( - TLOG_DEBUG, - GetLogPrefix() << "Reconnecting session to cluster " << ClusterName << " in "<< delay - ); - } else { - return false; - } - } else { - RetryState = Settings.RetryPolicy_->CreateRetryState(); - } - ++ConnectionAttemptsDone; - - // Set new context - prevConnectContext = std::exchange(ConnectContext, connectContext); - prevConnectTimeoutContext = std::exchange(ConnectTimeoutContext, connectTimeoutContext); - prevConnectDelayContext = std::exchange(ConnectDelayContext, delayContext); - - Y_ASSERT(ConnectContext); - Y_ASSERT(ConnectTimeoutContext); - Y_ASSERT((delay == TDuration::Zero()) == !ConnectDelayContext); - - // Destroy all partition streams before connecting. - DestroyAllPartitionStreamsImpl(deferred); - } - - // Cancel previous operations. - Cancel(prevConnectContext); - Cancel(prevConnectTimeoutContext); - Cancel(prevConnectDelayContext); - - auto connectCallback = [weakThis = weak_from_this(), connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) { - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnConnect(std::move(st), std::move(processor), connectContext); //OnConnect could be called inplace! - } - }; - - auto connectTimeoutCallback = [weakThis = weak_from_this(), connectTimeoutContext = connectTimeoutContext](bool ok) { - if (ok) { - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnConnectTimeout(connectTimeoutContext); - } - } - }; - - Y_ASSERT(connectContext); - Y_ASSERT(connectTimeoutContext); - Y_ASSERT((delay == TDuration::Zero()) == !delayContext); - ConnectionFactory->CreateProcessor( - std::move(connectCallback), - TRpcRequestSettings::Make(Settings), - std::move(connectContext), - TDuration::Seconds(30) /* connect timeout */, // TODO: make connect timeout setting. - std::move(connectTimeoutContext), - std::move(connectTimeoutCallback), - delay, - std::move(delayContext)); - return true; -} - -void TSingleClusterReadSessionImpl::BreakConnectionAndReconnectImpl(TPlainStatus&& status, TDeferredActions& deferred) { - Log.Write( - TLOG_INFO, - GetLogPrefix() << "Break connection due to unexpected message from server. Status: " << status.Status - << ", Issues: \"" << IssuesSingleLineString(status.Issues) << "\"" - ); - - Processor->Cancel(); - Processor = nullptr; - RetryState = Settings.RetryPolicy_->CreateRetryState(); // Explicitly create retry state to determine whether we should connect to server again. - - deferred.DeferReconnection(shared_from_this(), ErrorHandler, std::move(status)); -} - -void TSingleClusterReadSessionImpl::OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext) { - with_lock (Lock) { - if (ConnectTimeoutContext == connectTimeoutContext) { - Cancel(ConnectContext); - ConnectContext = nullptr; - ConnectTimeoutContext = nullptr; - ConnectDelayContext = nullptr; - - if (Closing || Aborting) { - CallCloseCallbackImpl(); - return; - } - } else { - return; - } - } - - ++*Settings.Counters_->Errors; - TStringBuilder description; - description << "Failed to establish connection to server. Attempts done: " << ConnectionAttemptsDone; - if (!Reconnect(TPlainStatus(EStatus::TIMEOUT, description))) { - ErrorHandler->AbortSession(EStatus::TIMEOUT, description); - } -} - -void TSingleClusterReadSessionImpl::OnConnect(TPlainStatus&& st, typename IProcessor::TPtr&& processor, const NGrpc::IQueueClientContextPtr& connectContext) { - TDeferredActions deferred; - with_lock (Lock) { - if (ConnectContext == connectContext) { - Cancel(ConnectTimeoutContext); - ConnectContext = nullptr; - ConnectTimeoutContext = nullptr; - ConnectDelayContext = nullptr; - - if (Closing || Aborting) { - CallCloseCallbackImpl(); - return; - } - - if (st.Ok()) { - Processor = std::move(processor); - RetryState = nullptr; - ConnectionAttemptsDone = 0; - InitImpl(deferred); - return; - } - } else { - return; - } - } - - if (!st.Ok()) { - ++*Settings.Counters_->Errors; - if (!Reconnect(st)) { - ErrorHandler->AbortSession(st.Status, - MakeIssueWithSubIssues( - TStringBuilder() << "Failed to establish connection to server \"" << st.Endpoint << "\" ( cluster " << ClusterName << "). Attempts done: " - << ConnectionAttemptsDone, - st.Issues)); - } - } -} - -void TSingleClusterReadSessionImpl::InitImpl(TDeferredActions& deferred) { // Assumes that we're under lock. - Log.Write(TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session"); - Ydb::PersQueue::V1::MigrationStreamingReadClientMessage req; - auto& init = *req.mutable_init_request(); - init.set_ranges_mode(GetRangesMode()); - for (const TTopicReadSettings& topic : Settings.Topics_) { - auto* topicSettings = init.add_topics_read_settings(); - topicSettings->set_topic(topic.Path_); - if (topic.StartingMessageTimestamp_) { - topicSettings->set_start_from_written_at_ms(topic.StartingMessageTimestamp_->MilliSeconds()); - } - for (ui64 groupId : topic.PartitionGroupIds_) { - topicSettings->add_partition_group_ids(groupId); - } - } - init.set_consumer(Settings.ConsumerName_); - init.set_read_only_original(Settings.ReadOnlyOriginal_); - init.mutable_read_params()->set_max_read_size(Settings.MaxMemoryUsageBytes_); - if (Settings.MaxTimeLag_) { - init.set_max_lag_duration_ms(Settings.MaxTimeLag_->MilliSeconds()); - } - if (Settings.StartingMessageTimestamp_) { - init.set_start_from_written_at_ms(Settings.StartingMessageTimestamp_->MilliSeconds()); - } - - WriteToProcessorImpl(std::move(req)); - ReadFromProcessorImpl(deferred); -} - -void TSingleClusterReadSessionImpl::ContinueReadingDataImpl() { // Assumes that we're under lock. - if (!Closing - && !Aborting - && !WaitingReadResponse - && !DataReadingSuspended - && Processor - && CompressedDataSize < GetCompressedDataSizeLimit() - && static_cast<size_t>(CompressedDataSize + DecompressedDataSize) < Settings.MaxMemoryUsageBytes_) - { - Ydb::PersQueue::V1::MigrationStreamingReadClientMessage req; - req.mutable_read(); - - WriteToProcessorImpl(std::move(req)); - WaitingReadResponse = true; - } -} - -bool TSingleClusterReadSessionImpl::IsActualPartitionStreamImpl(const TPartitionStreamImpl* partitionStream) { // Assumes that we're under lock. - auto actualPartitionStreamIt = PartitionStreams.find(partitionStream->GetAssignId()); - return actualPartitionStreamIt != PartitionStreams.end() - && actualPartitionStreamIt->second->GetPartitionStreamId() == partitionStream->GetPartitionStreamId(); -} - -void TSingleClusterReadSessionImpl::ConfirmPartitionStreamCreate(const TPartitionStreamImpl* partitionStream, TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset) { - TStringBuilder commitOffsetLogStr; - if (commitOffset) { - commitOffsetLogStr << ". Commit offset: " << *commitOffset; - } - Log.Write( - TLOG_INFO, - GetLogPrefix() << "Confirm partition stream create. Partition stream id: " << partitionStream->GetPartitionStreamId() - << ". Cluster: \"" << partitionStream->GetCluster() << "\". Topic: \"" << partitionStream->GetTopicPath() - << "\". Partition: " << partitionStream->GetPartitionId() - << ". Read offset: " << readOffset << commitOffsetLogStr - ); - - with_lock (Lock) { - if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation. - Log.Write( - TLOG_DEBUG, - GetLogPrefix() << "Skip partition stream create confirm. Partition stream id: " - << partitionStream->GetPartitionStreamId() - ); - return; - } - - Ydb::PersQueue::V1::MigrationStreamingReadClientMessage req; - auto& startRead = *req.mutable_start_read(); - startRead.mutable_topic()->set_path(partitionStream->GetTopicPath()); - startRead.set_cluster(partitionStream->GetCluster()); - startRead.set_partition(partitionStream->GetPartitionId()); - startRead.set_assign_id(partitionStream->GetAssignId()); - if (readOffset) { - startRead.set_read_offset(*readOffset); - } - if (commitOffset) { - startRead.set_commit_offset(*commitOffset); - } - - WriteToProcessorImpl(std::move(req)); - } -} - -void TSingleClusterReadSessionImpl::ConfirmPartitionStreamDestroy(TPartitionStreamImpl* partitionStream) { - Log.Write( - TLOG_INFO, - GetLogPrefix() << "Confirm partition stream destroy. Partition stream id: " - << partitionStream->GetPartitionStreamId() - << ". Cluster: \"" << partitionStream->GetCluster() << "\". Topic: \"" << partitionStream->GetTopicPath() - << "\". Partition: " << partitionStream->GetPartitionId() - ); - - TDeferredActions deferred; - with_lock (Lock) { - if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation. - Log.Write( - TLOG_DEBUG, - GetLogPrefix() << "Skip partition stream destroy confirm. Partition stream id: " - << partitionStream->GetPartitionStreamId() - ); - return; - } - - CookieMapping.RemoveMapping(partitionStream->GetPartitionStreamId()); - PartitionStreams.erase(partitionStream->GetAssignId()); - EventsQueue->PushEvent({partitionStream, weak_from_this(), TReadSessionEvent::TPartitionStreamClosedEvent(partitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::DestroyConfirmedByUser)}, deferred); - - Ydb::PersQueue::V1::MigrationStreamingReadClientMessage req; - auto& released = *req.mutable_released(); - released.mutable_topic()->set_path(partitionStream->GetTopicPath()); - released.set_cluster(partitionStream->GetCluster()); - released.set_partition(partitionStream->GetPartitionId()); - released.set_assign_id(partitionStream->GetAssignId()); - - WriteToProcessorImpl(std::move(req)); - } -} - -void TSingleClusterReadSessionImpl::Commit(const TPartitionStreamImpl* partitionStream, ui64 startOffset, ui64 endOffset) { - Log.Write( - TLOG_DEBUG, - GetLogPrefix() << "Commit offsets [" << startOffset << ", " << endOffset - << "). Partition stream id: " << partitionStream->GetPartitionStreamId() - ); - with_lock (Lock) { - if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation. - return; - } - Ydb::PersQueue::V1::MigrationStreamingReadClientMessage req; - bool hasSomethingToCommit = false; - if (GetRangesMode()) { - hasSomethingToCommit = true; - auto* range = req.mutable_commit()->add_offset_ranges(); - range->set_assign_id(partitionStream->GetAssignId()); - range->set_start_offset(startOffset); - range->set_end_offset(endOffset); - } else { - for (ui64 offset = startOffset; offset < endOffset; ++offset) { - TPartitionCookieMapping::TCookie::TPtr cookie = CookieMapping.CommitOffset(partitionStream->GetPartitionStreamId(), offset); - if (cookie) { - hasSomethingToCommit = true; - auto* cookieInfo = req.mutable_commit()->add_cookies(); - cookieInfo->set_assign_id(partitionStream->GetAssignId()); - cookieInfo->set_partition_cookie(cookie->Cookie); - } - } - } - if (hasSomethingToCommit) { - WriteToProcessorImpl(std::move(req)); - } - } -} - -void TSingleClusterReadSessionImpl::RequestPartitionStreamStatus(const TPartitionStreamImpl* partitionStream) { - Log.Write( - TLOG_DEBUG, - GetLogPrefix() << "Requesting status for partition stream id: " << partitionStream->GetPartitionStreamId() - ); - with_lock (Lock) { - if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation. - return; - } - - Ydb::PersQueue::V1::MigrationStreamingReadClientMessage req; - auto& status = *req.mutable_status(); - status.mutable_topic()->set_path(partitionStream->GetTopicPath()); - status.set_cluster(partitionStream->GetCluster()); - status.set_partition(partitionStream->GetPartitionId()); - status.set_assign_id(partitionStream->GetAssignId()); - - WriteToProcessorImpl(std::move(req)); - } -} - -void TSingleClusterReadSessionImpl::OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) { - Log.Write(TLOG_DEBUG, GetLogPrefix() << "Read session event " << DebugString(event)); - const i64 bytesCount = static_cast<i64>(CalcDataSize(event)); - Y_ASSERT(bytesCount >= 0); - - if (!std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event)) { // Event is not data event. - return; - } - - *Settings.Counters_->MessagesInflight -= std::get<TReadSessionEvent::TDataReceivedEvent>(event).GetMessagesCount(); - *Settings.Counters_->BytesInflightTotal -= bytesCount; - *Settings.Counters_->BytesInflightUncompressed -= bytesCount; - - TDeferredActions deferred; - with_lock (Lock) { - UpdateMemoryUsageStatisticsImpl(); - Y_VERIFY(bytesCount <= DecompressedDataSize); - DecompressedDataSize -= bytesCount; - ContinueReadingDataImpl(); - StartDecompressionTasksImpl(deferred); - } -} - -void TSingleClusterReadSessionImpl::WriteToProcessorImpl(Ydb::PersQueue::V1::MigrationStreamingReadClientMessage&& req) { // Assumes that we're under lock. - if (Processor) { - Processor->Write(std::move(req)); - } -} - -bool TSingleClusterReadSessionImpl::HasCommitsInflightImpl() const { - for (const auto& [id, partitionStream] : PartitionStreams) { - if (partitionStream->HasCommitsInflight()) - return true; - } - return false; -} - -void TSingleClusterReadSessionImpl::ReadFromProcessorImpl(TDeferredActions& deferred) { // Assumes that we're under lock. - if (Closing && !HasCommitsInflightImpl()) { - Processor->Cancel(); - CallCloseCallbackImpl(); - return; - } - - if (Processor) { - ServerMessage->Clear(); - - auto callback = [weakThis = weak_from_this(), - connectionGeneration = ConnectionGeneration, - // Capture message & processor not to read in freed memory. - serverMessage = ServerMessage, - processor = Processor](NGrpc::TGrpcStatus&& grpcStatus) { - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnReadDone(std::move(grpcStatus), connectionGeneration); - } - }; - - deferred.DeferReadFromProcessor(Processor, ServerMessage.get(), std::move(callback)); - } -} - -void TSingleClusterReadSessionImpl::OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration) { - TPlainStatus errorStatus; - if (!grpcStatus.Ok()) { - errorStatus = TPlainStatus(std::move(grpcStatus)); - } - - TDeferredActions deferred; - with_lock (Lock) { - if (Aborting) { - return; - } - - if (connectionGeneration != ConnectionGeneration) { - return; // Message from previous connection. Ignore. - } - if (errorStatus.Ok()) { - if (IsErrorMessage(*ServerMessage)) { - errorStatus = MakeErrorFromProto(*ServerMessage); - } else { - switch (ServerMessage->response_case()) { - case Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::kInitResponse: - OnReadDoneImpl(std::move(*ServerMessage->mutable_init_response()), deferred); - break; - case Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::kDataBatch: - OnReadDoneImpl(std::move(*ServerMessage->mutable_data_batch()), deferred); - break; - case Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::kAssigned: - OnReadDoneImpl(std::move(*ServerMessage->mutable_assigned()), deferred); - break; - case Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::kRelease: - OnReadDoneImpl(std::move(*ServerMessage->mutable_release()), deferred); - break; - case Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::kCommitted: - OnReadDoneImpl(std::move(*ServerMessage->mutable_committed()), deferred); - break; - case Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::kPartitionStatus: - OnReadDoneImpl(std::move(*ServerMessage->mutable_partition_status()), deferred); - break; - case Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::RESPONSE_NOT_SET: - errorStatus = TPlainStatus::Internal("Unexpected response from server"); - break; - } - if (errorStatus.Ok()) { - ReadFromProcessorImpl(deferred); // Read next. - } - } - } - } - if (!errorStatus.Ok()) { - ++*Settings.Counters_->Errors; - RetryState = Settings.RetryPolicy_->CreateRetryState(); // Explicitly create retry state to determine whether we should connect to server again. - if (!Reconnect(errorStatus)) { - ErrorHandler->AbortSession(std::move(errorStatus)); - } - } -} - -void TSingleClusterReadSessionImpl::OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::InitResponse&& msg, TDeferredActions& deferred) { // Assumes that we're under lock. - Y_UNUSED(deferred); - - Log.Write(TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id()); - - // Successful init. Do nothing. - ContinueReadingDataImpl(); -} - -void TSingleClusterReadSessionImpl::OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch&& msg, TDeferredActions& deferred) { // Assumes that we're under lock. - if (Closing || Aborting) { - return; // Don't process new data. - } - UpdateMemoryUsageStatisticsImpl(); - for (Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData& partitionData : *msg.mutable_partition_data()) { - auto partitionStreamIt = PartitionStreams.find(partitionData.cookie().assign_id()); - if (partitionStreamIt == PartitionStreams.end()) { - ++*Settings.Counters_->Errors; - BreakConnectionAndReconnectImpl(EStatus::INTERNAL_ERROR, - TStringBuilder() << "Got unexpected partition stream data message. Topic: " - << partitionData.topic() - << ". Partition: " << partitionData.partition() << " AssignId: " << partitionData.cookie().assign_id(), - deferred); - return; - } - const TIntrusivePtr<TPartitionStreamImpl>& partitionStream = partitionStreamIt->second; - - TPartitionCookieMapping::TCookie::TPtr cookie = MakeIntrusive<TPartitionCookieMapping::TCookie>(partitionData.cookie().partition_cookie(), partitionStream); - - ui64 firstOffset = std::numeric_limits<ui64>::max(); - ui64 currentOffset = std::numeric_limits<ui64>::max(); - ui64 desiredOffset = partitionStream->GetFirstNotReadOffset(); - for (const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch& batch : partitionData.batches()) { - // Validate messages. - for (const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& messageData : batch.message_data()) { - // Check offsets continuity. - if (messageData.offset() != desiredOffset) { - bool res = partitionStream->AddToCommitRanges(desiredOffset, messageData.offset(), GetRangesMode()); - Y_VERIFY(res); - } - - if (firstOffset == std::numeric_limits<ui64>::max()) { - firstOffset = messageData.offset(); - } - currentOffset = messageData.offset(); - desiredOffset = currentOffset + 1; - partitionStream->UpdateMaxReadOffset(currentOffset); - const i64 messageSize = static_cast<i64>(messageData.data().size()); - CompressedDataSize += messageSize; - *Settings.Counters_->BytesInflightTotal += messageSize; - *Settings.Counters_->BytesInflightCompressed += messageSize; - ++*Settings.Counters_->MessagesInflight; - } - } - if (firstOffset == std::numeric_limits<ui64>::max()) { - BreakConnectionAndReconnectImpl(EStatus::INTERNAL_ERROR, - TStringBuilder() << "Got empty data message. Topic: " - << partitionData.topic() - << ". Partition: " << partitionData.partition() - << " message: " << msg, - deferred); - return; - } - cookie->SetOffsetRange(std::make_pair(firstOffset, desiredOffset)); - partitionStream->SetFirstNotReadOffset(desiredOffset); - if (!CookieMapping.AddMapping(cookie)) { - BreakConnectionAndReconnectImpl(EStatus::INTERNAL_ERROR, - TStringBuilder() << "Got unexpected data message. Topic: " - << partitionData.topic() - << ". Partition: " << partitionData.partition() - << ". Cookie mapping already has such cookie", - deferred); - return; - } - TDataDecompressionInfo* decompressionInfo = EventsQueue->PushDataEvent(partitionStream, std::move(partitionData)); - Y_VERIFY(decompressionInfo); - if (decompressionInfo) { - DecompressionQueue.emplace_back(decompressionInfo, partitionStream); - StartDecompressionTasksImpl(deferred); - } - } - - WaitingReadResponse = false; - ContinueReadingDataImpl(); -} - -void TSingleClusterReadSessionImpl::OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::Assigned&& msg, TDeferredActions& deferred) { // Assumes that we're under lock. - auto partitionStream = MakeIntrusive<TPartitionStreamImpl>(NextPartitionStreamId, - msg.topic().path(), - msg.cluster(), - msg.partition() + 1, // Group. - msg.partition(), // Partition. - msg.assign_id(), - msg.read_offset(), - weak_from_this(), - ErrorHandler); - NextPartitionStreamId += PartitionStreamIdStep; - - // Renew partition stream. - TIntrusivePtr<TPartitionStreamImpl>& currentPartitionStream = PartitionStreams[partitionStream->GetAssignId()]; - if (currentPartitionStream) { - CookieMapping.RemoveMapping(currentPartitionStream->GetPartitionStreamId()); - EventsQueue->PushEvent({currentPartitionStream, weak_from_this(), TReadSessionEvent::TPartitionStreamClosedEvent(currentPartitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost)}, deferred); - } - currentPartitionStream = partitionStream; - - // Send event to user. - EventsQueue->PushEvent({partitionStream, weak_from_this(), TReadSessionEvent::TCreatePartitionStreamEvent(partitionStream, msg.read_offset(), msg.end_offset())}, deferred); -} - -void TSingleClusterReadSessionImpl::OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::Release&& msg, TDeferredActions& deferred) { // Assumes that we're under lock. - auto partitionStreamIt = PartitionStreams.find(msg.assign_id()); - if (partitionStreamIt == PartitionStreams.end()) { - return; - } - TIntrusivePtr<TPartitionStreamImpl> partitionStream = partitionStreamIt->second; - if (msg.forceful_release()) { - PartitionStreams.erase(msg.assign_id()); - CookieMapping.RemoveMapping(partitionStream->GetPartitionStreamId()); - EventsQueue->PushEvent({partitionStream, weak_from_this(), TReadSessionEvent::TPartitionStreamClosedEvent(partitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost)}, deferred); - } else { - EventsQueue->PushEvent({partitionStream, weak_from_this(), TReadSessionEvent::TDestroyPartitionStreamEvent(std::move(partitionStream), msg.commit_offset())}, deferred); - } -} - -void TSingleClusterReadSessionImpl::OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::Committed&& msg, TDeferredActions& deferred) { // Assumes that we're under lock. - - Log.Write(TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg); - - TMap<ui64, TIntrusivePtr<TPartitionStreamImpl>> partitionStreams; - for (const Ydb::PersQueue::V1::CommitCookie& cookieProto : msg.cookies()) { - TPartitionCookieMapping::TCookie::TPtr cookie = CookieMapping.RetrieveCommittedCookie(cookieProto); - if (cookie) { - cookie->PartitionStream->UpdateMaxCommittedOffset(cookie->OffsetRange.second); - partitionStreams[cookie->PartitionStream->GetPartitionStreamId()] = cookie->PartitionStream; - } - } - for (auto& [id, partitionStream] : partitionStreams) { - EventsQueue->PushEvent({partitionStream, weak_from_this(), TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, partitionStream->GetMaxCommittedOffset())}, deferred); - } - - for (const auto& rangeProto : msg.offset_ranges()) { - auto partitionStreamIt = PartitionStreams.find(rangeProto.assign_id()); - if (partitionStreamIt != PartitionStreams.end()) { - auto partitionStream = partitionStreamIt->second; - partitionStream->UpdateMaxCommittedOffset(rangeProto.end_offset()); - EventsQueue->PushEvent({partitionStream, weak_from_this(), TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, rangeProto.end_offset())}, deferred); - } - } - -} - -void TSingleClusterReadSessionImpl::OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::PartitionStatus&& msg, TDeferredActions& deferred) { // Assumes that we're under lock. - auto partitionStreamIt = PartitionStreams.find(msg.assign_id()); - if (partitionStreamIt == PartitionStreams.end()) { - return; - } - EventsQueue->PushEvent( - {partitionStreamIt->second, weak_from_this(), TReadSessionEvent::TPartitionStreamStatusEvent(partitionStreamIt->second, - msg.committed_offset(), - 0, // TODO: support read offset in status - msg.end_offset(), - TInstant::MilliSeconds(msg.write_watermark_ms()))}, - deferred); -} - -void TSingleClusterReadSessionImpl::StartDecompressionTasksImpl(TDeferredActions& deferred) { - UpdateMemoryUsageStatisticsImpl(); - const i64 limit = GetDecompressedDataSizeLimit(); - Y_VERIFY(limit > 0); - while (DecompressedDataSize < limit - && (static_cast<size_t>(CompressedDataSize + DecompressedDataSize) < Settings.MaxMemoryUsageBytes_ - || DecompressedDataSize == 0 /* Allow decompression of at least one message even if memory is full. */) - && !DecompressionQueue.empty()) - { - TDecompressionQueueItem& current = DecompressionQueue.front(); - auto sentToDecompress = current.BatchInfo->StartDecompressionTasks(Settings.DecompressionExecutor_, - Max(limit - DecompressedDataSize, static_cast<i64>(1)), - AverageCompressionRatio, - current.PartitionStream, - deferred); - DecompressedDataSize += sentToDecompress; - if (current.BatchInfo->AllDecompressionTasksStarted()) { - DecompressionQueue.pop_front(); - } else { - break; - } - } -} - -void TSingleClusterReadSessionImpl::DestroyAllPartitionStreamsImpl(TDeferredActions& deferred) { - for (auto&& [key, partitionStream] : PartitionStreams) { - EventsQueue->PushEvent({partitionStream, weak_from_this(), TReadSessionEvent::TPartitionStreamClosedEvent(std::move(partitionStream), TReadSessionEvent::TPartitionStreamClosedEvent::EReason::ConnectionLost)}, deferred); - } - PartitionStreams.clear(); - CookieMapping.ClearMapping(); -} - -void TSingleClusterReadSessionImpl::OnCreateNewDecompressionTask() { - ++DecompressionTasksInflight; -} - -void TSingleClusterReadSessionImpl::OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount) { - TDeferredActions deferred; - --DecompressionTasksInflight; - - *Settings.Counters_->BytesRead += decompressedSize; - *Settings.Counters_->BytesReadCompressed += sourceSize; - *Settings.Counters_->MessagesRead += messagesCount; - *Settings.Counters_->BytesInflightUncompressed += decompressedSize; - *Settings.Counters_->BytesInflightCompressed -= sourceSize; - *Settings.Counters_->BytesInflightTotal += (decompressedSize - sourceSize); - - with_lock (Lock) { - UpdateMemoryUsageStatisticsImpl(); - CompressedDataSize -= sourceSize; - DecompressedDataSize += decompressedSize - estimatedDecompressedSize; - constexpr double weight = 0.6; - if (sourceSize > 0) { - AverageCompressionRatio = weight * static_cast<double>(decompressedSize) / static_cast<double>(sourceSize) + (1 - weight) * AverageCompressionRatio; - } - if (Aborting) { - return; - } - ContinueReadingDataImpl(); - StartDecompressionTasksImpl(deferred); - } -} - -void TSingleClusterReadSessionImpl::Abort() { - Log.Write(TLOG_DEBUG, GetLogPrefix() << "Abort session to cluster"); - - with_lock (Lock) { - if (!Aborting) { - Aborting = true; - CloseCallback = {}; - - // Cancel(ClientContext); // Don't cancel, because this is used only as factory for other contexts. - Cancel(ConnectContext); - Cancel(ConnectTimeoutContext); - Cancel(ConnectDelayContext); - - if (ClientContext) { - ClientContext->Cancel(); - ClientContext.reset(); - } - - if (Processor) { - Processor->Cancel(); - } - } - } -} - -void TSingleClusterReadSessionImpl::Close(std::function<void()> callback) { - with_lock (Lock) { - if (Aborting) { - callback(); - } - - if (!Closing) { - Closing = true; - - CloseCallback = std::move(callback); - - Cancel(ConnectContext); - Cancel(ConnectTimeoutContext); - Cancel(ConnectDelayContext); - - if (!Processor) { - CallCloseCallbackImpl(); - } else { - if (!HasCommitsInflightImpl()) { - Processor->Cancel(); - CallCloseCallbackImpl(); - } - } - } - } -} - -void TSingleClusterReadSessionImpl::CallCloseCallbackImpl() { - if (CloseCallback) { - CloseCallback(); - CloseCallback = {}; - } - Aborting = true; // So abort call will have no effect. - if (ClientContext) { - ClientContext->Cancel(); - ClientContext.reset(); - } -} - -void TSingleClusterReadSessionImpl::StopReadingData() { - with_lock (Lock) { - DataReadingSuspended = true; - } -} - -void TSingleClusterReadSessionImpl::ResumeReadingData() { - with_lock (Lock) { - if (DataReadingSuspended) { - DataReadingSuspended = false; - ContinueReadingDataImpl(); - } - } -} - -void TSingleClusterReadSessionImpl::WaitAllDecompressionTasks() { - Y_ASSERT(DecompressionTasksInflight >= 0); - while (DecompressionTasksInflight > 0) { - Sleep(TDuration::MilliSeconds(5)); // Perform active wait because this is aborting process and there are no decompression tasks here in normal situation. - } -} - -void TSingleClusterReadSessionImpl::DumpStatisticsToLog(TLogElement& log) { - with_lock (Lock) { - // cluster:topic:partition:stream-id:read-offset:committed-offset - for (auto&& [key, partitionStream] : PartitionStreams) { - log << " " - << ClusterName - << ':' << partitionStream->GetTopicPath() - << ':' << partitionStream->GetPartitionId() - << ':' << partitionStream->GetPartitionStreamId() - << ':' << partitionStream->GetMaxReadOffset() - << ':' << partitionStream->GetMaxCommittedOffset(); - } - } -} - -void TSingleClusterReadSessionImpl::UpdateMemoryUsageStatisticsImpl() { - const TInstant now = TInstant::Now(); - const ui64 delta = (now - UsageStatisticsLastUpdateTime).MilliSeconds(); - UsageStatisticsLastUpdateTime = now; - const double percent = 100.0 / static_cast<double>(Settings.MaxMemoryUsageBytes_); - - Settings.Counters_->TotalBytesInflightUsageByTime->Collect((DecompressedDataSize + CompressedDataSize) * percent, delta); - Settings.Counters_->UncompressedBytesInflightUsageByTime->Collect(DecompressedDataSize * percent, delta); - Settings.Counters_->CompressedBytesInflightUsageByTime->Collect(CompressedDataSize * percent, delta); -} - -void TSingleClusterReadSessionImpl::UpdateMemoryUsageStatistics() { - with_lock (Lock) { - UpdateMemoryUsageStatisticsImpl(); - } -} - -bool TSingleClusterReadSessionImpl::GetRangesMode() const { - return Settings.RangesMode_.GetOrElse(RangesMode); -} - -bool TSingleClusterReadSessionImpl::TPartitionCookieMapping::AddMapping(const TCookie::TPtr& cookie) { - if (!Cookies.emplace(cookie->GetKey(), cookie).second) { - return false; - } - for (ui64 offset = cookie->OffsetRange.first; offset < cookie->OffsetRange.second; ++offset) { - if (!UncommittedOffsetToCookie.emplace(std::make_pair(cookie->PartitionStream->GetPartitionStreamId(), offset), cookie).second) { - return false; - } - } - PartitionStreamIdToCookie.emplace(cookie->PartitionStream->GetPartitionStreamId(), cookie); - return true; -} - -TSingleClusterReadSessionImpl::TPartitionCookieMapping::TCookie::TPtr TSingleClusterReadSessionImpl::TPartitionCookieMapping::CommitOffset(ui64 partitionStreamId, ui64 offset) { - auto cookieIt = UncommittedOffsetToCookie.find(std::make_pair(partitionStreamId, offset)); - if (cookieIt != UncommittedOffsetToCookie.end()) { - TCookie::TPtr cookie; - if (!--cookieIt->second->UncommittedMessagesLeft) { - ++CommitInflight; - cookie = cookieIt->second; - } - UncommittedOffsetToCookie.erase(cookieIt); - return cookie; - } else { - ThrowFatalError(TStringBuilder() << "Invalid offset " << offset << ". Partition stream id: " << partitionStreamId << Endl); - } - // If offset wasn't found, there might be already hard released partition. - // This situation is OK. - return nullptr; -} - -TSingleClusterReadSessionImpl::TPartitionCookieMapping::TCookie::TPtr TSingleClusterReadSessionImpl::TPartitionCookieMapping::RetrieveCommittedCookie(const Ydb::PersQueue::V1::CommitCookie& cookieProto) { - TCookie::TPtr cookieInfo; - auto cookieIt = Cookies.find(TCookie::TKey(cookieProto.assign_id(), cookieProto.partition_cookie())); - if (cookieIt != Cookies.end()) { - --CommitInflight; - cookieInfo = cookieIt->second; - Cookies.erase(cookieIt); - - auto [rangeBegin, rangeEnd] = PartitionStreamIdToCookie.equal_range(cookieInfo->PartitionStream->GetPartitionStreamId()); - for (auto i = rangeBegin; i != rangeEnd; ++i) { - if (i->second == cookieInfo) { - PartitionStreamIdToCookie.erase(i); - break; - } - } - } - return cookieInfo; -} - -void TSingleClusterReadSessionImpl::TPartitionCookieMapping::RemoveMapping(ui64 partitionStreamId) { - auto [rangeBegin, rangeEnd] = PartitionStreamIdToCookie.equal_range(partitionStreamId); - for (auto i = rangeBegin; i != rangeEnd; ++i) { - TCookie::TPtr cookie = i->second; - Cookies.erase(cookie->GetKey()); - for (ui64 offset = cookie->OffsetRange.first; offset < cookie->OffsetRange.second; ++offset) { - UncommittedOffsetToCookie.erase(std::make_pair(partitionStreamId, offset)); - } - } - PartitionStreamIdToCookie.erase(rangeBegin, rangeEnd); -} - -void TSingleClusterReadSessionImpl::TPartitionCookieMapping::ClearMapping() { - Cookies.clear(); - UncommittedOffsetToCookie.clear(); - PartitionStreamIdToCookie.clear(); - CommitInflight = 0; -} - -bool TSingleClusterReadSessionImpl::TPartitionCookieMapping::HasUnacknowledgedCookies() const { - return CommitInflight != 0; -} +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NPersQueue::TReadSessionEvent TReadSessionEvent::TCreatePartitionStreamEvent::TCreatePartitionStreamEvent(TPartitionStream::TPtr partitionStream, ui64 committedOffset, ui64 endOffset) : PartitionStream(std::move(partitionStream)) @@ -1627,7 +620,7 @@ TReadSessionEvent::TCreatePartitionStreamEvent::TCreatePartitionStreamEvent(TPar void TReadSessionEvent::TCreatePartitionStreamEvent::Confirm(TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset) { if (PartitionStream) { - static_cast<TPartitionStreamImpl*>(PartitionStream.Get())->ConfirmCreate(readOffset, commitOffset); + static_cast<TPartitionStreamImpl<true>*>(PartitionStream.Get())->ConfirmCreate(readOffset, commitOffset); } } @@ -1639,7 +632,7 @@ TReadSessionEvent::TDestroyPartitionStreamEvent::TDestroyPartitionStreamEvent(TP void TReadSessionEvent::TDestroyPartitionStreamEvent::Confirm() { if (PartitionStream) { - static_cast<TPartitionStreamImpl*>(PartitionStream.Get())->ConfirmDestroy(); + static_cast<TPartitionStreamImpl<true>*>(PartitionStream.Get())->ConfirmDestroy(); } } @@ -1668,7 +661,7 @@ TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(TVector<TMessage> mess void TReadSessionEvent::TDataReceivedEvent::Commit() { for (auto [from, to] : OffsetRanges) { - static_cast<TPartitionStreamImpl*>(PartitionStream.Get())->Commit(from, to); + static_cast<TPartitionStreamImpl<true>*>(PartitionStream.Get())->Commit(from, to); } } @@ -1755,635 +748,6 @@ TReadSessionEvent::TPartitionStreamStatusEvent::TPartitionStreamStatusEvent(TPar { } -TReadSessionEventInfo::TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl> partitionStream, std::weak_ptr<IUserRetrievedEventCallback> session, TEvent event) - : PartitionStream(std::move(partitionStream)) - , Event(std::move(event)) - , Session(std::move(session)) -{} - -TReadSessionEventInfo::TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl> partitionStream, std::weak_ptr<IUserRetrievedEventCallback> session) - : PartitionStream(std::move(partitionStream)) - , Session(std::move(session)) -{} - -TReadSessionEventInfo::TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl> partitionStream, - std::weak_ptr<IUserRetrievedEventCallback> session, - TVector<TReadSessionEvent::TDataReceivedEvent::TMessage> messages, - TVector<TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> compressedMessages) - : PartitionStream(std::move(partitionStream)) - , Event( - NMaybe::TInPlace(), - std::in_place_type_t<TReadSessionEvent::TDataReceivedEvent>(), - std::move(messages), - std::move(compressedMessages), - PartitionStream - ) - , Session(std::move(session)) -{ -} - -void TReadSessionEventInfo::MoveToPartitionStream() { - PartitionStream->InsertEvent(std::move(*Event)); - Event = Nothing(); - Y_ASSERT(PartitionStream->HasEvents()); -} - -void TReadSessionEventInfo::ExtractFromPartitionStream() { - if (!Event && !IsEmpty()) { - Event = std::move(PartitionStream->TopEvent().GetEvent()); - PartitionStream->PopEvent(); - } -} - -bool TReadSessionEventInfo::IsEmpty() const { - return !PartitionStream || !PartitionStream->HasEvents(); -} - -bool TReadSessionEventInfo::IsDataEvent() const { - return !IsEmpty() && PartitionStream->TopEvent().IsDataEvent(); -} - -bool TReadSessionEventInfo::HasMoreData() const { - return PartitionStream->TopEvent().GetData().HasMoreData(); -} - -bool TReadSessionEventInfo::HasReadyUnreadData() const { - return PartitionStream->TopEvent().GetData().HasReadyUnreadData(); -} - -void TReadSessionEventInfo::OnUserRetrievedEvent() { - if (auto session = Session.lock()) { - session->OnUserRetrievedEvent(*Event); - } -} - -bool TReadSessionEventInfo::TakeData(TVector<TReadSessionEvent::TDataReceivedEvent::TMessage>* messages, - TVector<TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>* compressedMessages, - size_t* maxByteSize) -{ - return PartitionStream->TopEvent().GetData().TakeData(PartitionStream, messages, compressedMessages, maxByteSize); -} - -TReadSessionEventsQueue::TReadSessionEventsQueue(const TSettings& settings, std::weak_ptr<IUserRetrievedEventCallback> session) - : TParent(settings) - , Session(std::move(session)) -{ - const auto& h = Settings.EventHandlers_; - if (h.CommonHandler_ - || h.DataReceivedHandler_ - || h.CommitAcknowledgementHandler_ - || h.CreatePartitionStreamHandler_ - || h.DestroyPartitionStreamHandler_ - || h.PartitionStreamStatusHandler_ - || h.PartitionStreamClosedHandler_ - || h.SessionClosedHandler_) - { - HasEventCallbacks = true; - } else { - HasEventCallbacks = false; - } -} - -void TReadSessionEventsQueue::PushEvent(TReadSessionEventInfo eventInfo, TDeferredActions& deferred) { - if (Closed) { - return; - } - - with_lock (Mutex) { - auto partitionStream = eventInfo.PartitionStream; - eventInfo.MoveToPartitionStream(); - SignalReadyEventsImpl(partitionStream.Get(), deferred); - } -} - -void TReadSessionEventsQueue::SignalEventImpl(TIntrusivePtr<TPartitionStreamImpl> partitionStream, TDeferredActions& deferred) { - if (Closed) { - return; - } - auto session = partitionStream->GetSession(); - Events.emplace(std::move(partitionStream), std::move(session)); - SignalWaiterImpl(deferred); -} - -TDataDecompressionInfo* TReadSessionEventsQueue::PushDataEvent(TIntrusivePtr<TPartitionStreamImpl> partitionStream, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData&& msg) { - if (Closed) { - return nullptr; - } - - with_lock (Mutex) { - return &partitionStream->InsertDataEvent(std::move(msg), Settings.Decompress_); - } -} - -TMaybe<TReadSessionEventsQueue::TEventInfo> TReadSessionEventsQueue::GetDataEventImpl(TEventInfo& srcDataEventInfo, size_t* maxByteSize) { // Assumes that we're under lock. - TVector<TReadSessionEvent::TDataReceivedEvent::TMessage> messages; - TVector<TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> compressedMessages; - TIntrusivePtr<TPartitionStreamImpl> partitionStream = srcDataEventInfo.PartitionStream; - bool messageExtracted = false; - while (srcDataEventInfo.HasReadyUnreadData() && *maxByteSize > 0) { - const bool hasMoreUnpackedData = srcDataEventInfo.TakeData(&messages, &compressedMessages, maxByteSize); - if (!hasMoreUnpackedData) { - const bool messageIsFullyRead = !srcDataEventInfo.HasMoreData(); - if (messageIsFullyRead) { - partitionStream->PopEvent(); - messageExtracted = true; - break; - } - } - } - if (!messageExtracted) { - partitionStream->TopEvent().Signalled = false; - } - - if (messages.empty() && compressedMessages.empty()) { - return Nothing(); - } - return TEventInfo(partitionStream, partitionStream->GetSession(), std::move(messages), std::move(compressedMessages)); -} - -void TReadSessionEventsQueue::SignalReadyEvents(TPartitionStreamImpl* partitionStream) { - Y_ASSERT(partitionStream); - TDeferredActions deferred; - with_lock (Mutex) { - SignalReadyEventsImpl(partitionStream, deferred); - } -} - -void TReadSessionEventsQueue::SignalReadyEventsImpl(TPartitionStreamImpl* partitionStream, TDeferredActions& deferred) { - partitionStream->SignalReadyEvents(this, deferred); - ApplyCallbacksToReadyEventsImpl(deferred); -} - -bool TReadSessionEventsQueue::ApplyCallbacksToReadyEventsImpl(TDeferredActions& deferred) { - if (!HasEventCallbacks) { - return false; - } - bool applied = false; - while (HasCallbackForNextEventImpl()) { - size_t maxSize = std::numeric_limits<size_t>::max(); - TMaybe<TReadSessionEventInfo> eventInfo = GetEventImpl(&maxSize); - if (!eventInfo) { - break; - } - const TIntrusivePtr<TPartitionStreamImpl> partitionStreamForSignalling = eventInfo->IsDataEvent() ? eventInfo->PartitionStream : nullptr; - applied = true; - if (!ApplyHandler(*eventInfo, deferred)) { // Close session event. - break; - } - if (partitionStreamForSignalling) { - SignalReadyEventsImpl(partitionStreamForSignalling.Get(), deferred); - } - } - return applied; -} - -struct THasCallbackForEventVisitor { - explicit THasCallbackForEventVisitor(const TReadSessionSettings& settings) - : Settings(settings) - { - } - -#define DECLARE_HANDLER(type, handler) \ - bool operator()(const type&) { \ - return bool(Settings.EventHandlers_.handler); \ - } \ - /**/ - - DECLARE_HANDLER(TReadSessionEvent::TDataReceivedEvent, DataReceivedHandler_); - DECLARE_HANDLER(TReadSessionEvent::TCommitAcknowledgementEvent, CommitAcknowledgementHandler_); - DECLARE_HANDLER(TReadSessionEvent::TCreatePartitionStreamEvent, CreatePartitionStreamHandler_); - DECLARE_HANDLER(TReadSessionEvent::TDestroyPartitionStreamEvent, DestroyPartitionStreamHandler_); - DECLARE_HANDLER(TReadSessionEvent::TPartitionStreamStatusEvent, PartitionStreamStatusHandler_); - DECLARE_HANDLER(TReadSessionEvent::TPartitionStreamClosedEvent, PartitionStreamClosedHandler_); - DECLARE_HANDLER(TSessionClosedEvent, SessionClosedHandler_); - -#undef DECLARE_HANDLER - - const TReadSessionSettings& Settings; -}; - -bool TReadSessionEventsQueue::HasCallbackForNextEventImpl() const { - if (!HasEventsImpl()) { - return false; - } - if (Settings.EventHandlers_.CommonHandler_) { - return true; - } - - if (!Events.empty()) { - const TEventInfo& topEvent = Events.front(); - const TReadSessionEvent::TEvent* event = nullptr; - if (topEvent.Event) { - event = &*topEvent.Event; - } else if (topEvent.PartitionStream && topEvent.PartitionStream->HasEvents()) { - const TRawPartitionStreamEvent& partitionStreamTopEvent = topEvent.PartitionStream->TopEvent(); - if (partitionStreamTopEvent.IsDataEvent()) { - return bool(Settings.EventHandlers_.DataReceivedHandler_); - } else { - event = &partitionStreamTopEvent.GetEvent(); - } - } - - if (!event) { - return false; - } - - THasCallbackForEventVisitor visitor(Settings); - return std::visit(visitor, *event); - } else if (CloseEvent) { - return bool(Settings.EventHandlers_.SessionClosedHandler_); - } - Y_ASSERT(false); - return false; -} - -void TReadSessionEventsQueue::ClearAllEvents() { - TDeferredActions deferred; - with_lock (Mutex) { - while (!Events.empty()) { - auto& event = Events.front(); - if (event.PartitionStream && event.PartitionStream->HasEvents()) { - event.PartitionStream->PopEvent(); - } - Events.pop(); - } - } -} - -TDataDecompressionInfo::TDataDecompressionInfo( - Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData&& msg, - std::weak_ptr<TSingleClusterReadSessionImpl> session, - bool doDecompress -) - : ServerMessage(std::move(msg)) - , Session(std::move(session)) - , DoDecompress(doDecompress) -{ - for (const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch& batch : ServerMessage.batches()) { - for (const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& messageData : batch.message_data()) { - CompressedDataSize += messageData.data().size(); - } - } - SourceDataNotProcessed = CompressedDataSize; - - BuildBatchesMeta(); -} - -void TDataDecompressionInfo::BuildBatchesMeta() { - BatchesMeta.reserve(ServerMessage.batches_size()); - for (const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch& batch : ServerMessage.batches()) { - // Extra fields. - TWriteSessionMeta::TPtr meta = MakeIntrusive<TWriteSessionMeta>(); - meta->Fields.reserve(batch.extra_fields_size()); - for (const Ydb::PersQueue::V1::KeyValue& kv : batch.extra_fields()) { - meta->Fields.emplace(kv.key(), kv.value()); - } - BatchesMeta.emplace_back(std::move(meta)); - } -} - -void TDataDecompressionInfo::PutDecompressionError(std::exception_ptr error, size_t batch, size_t message) { - if (!DecompressionErrorsStructCreated) { - with_lock (DecompressionErrorsStructLock) { - DecompressionErrors.resize(ServerMessage.batches_size()); - for (size_t batch = 0; batch < static_cast<size_t>(ServerMessage.batches_size()); ++batch) { - DecompressionErrors[batch].resize(static_cast<size_t>(ServerMessage.batches(batch).message_data_size())); - } - - // Set barrier. - DecompressionErrorsStructCreated = true; - } - } - Y_ASSERT(batch < DecompressionErrors.size()); - Y_ASSERT(message < DecompressionErrors[batch].size()); - DecompressionErrors[batch][message] = std::move(error); -} - -std::exception_ptr TDataDecompressionInfo::GetDecompressionError(size_t batch, size_t message) { - if (!DecompressionErrorsStructCreated) { - return {}; - } - Y_ASSERT(batch < DecompressionErrors.size()); - Y_ASSERT(message < DecompressionErrors[batch].size()); - return DecompressionErrors[batch][message]; -} - -i64 TDataDecompressionInfo::StartDecompressionTasks(const IExecutor::TPtr& executor, i64 availableMemory, double averageCompressionRatio, const TIntrusivePtr<TPartitionStreamImpl>& partitionStream, TDeferredActions& deferred) { - constexpr size_t TASK_LIMIT = 512_KB; - std::shared_ptr<TSingleClusterReadSessionImpl> session = Session.lock(); - Y_ASSERT(session); - ReadyThresholds.emplace_back(); - TDecompressionTask task(this, partitionStream, &ReadyThresholds.back()); - i64 used = 0; - while (availableMemory > 0 && !AllDecompressionTasksStarted()) { - const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch& batch = ServerMessage.batches(CurrentDecompressingMessage.first); - if (CurrentDecompressingMessage.second < static_cast<size_t>(batch.message_data_size())) { - const auto& messageData = batch.message_data(CurrentDecompressingMessage.second); - const i64 size = static_cast<i64>(messageData.data().size()); - const i64 estimatedDecompressedSize = - messageData.uncompressed_size() ? static_cast<i64>(messageData.uncompressed_size()) : static_cast<i64>(size * averageCompressionRatio); - - Y_VERIFY(estimatedDecompressedSize >= 0); - - task.Add(CurrentDecompressingMessage.first, CurrentDecompressingMessage.second, size, estimatedDecompressedSize); - used += estimatedDecompressedSize; - availableMemory -= estimatedDecompressedSize; - } - ++CurrentDecompressingMessage.second; - if (CurrentDecompressingMessage.second >= static_cast<size_t>(batch.message_data_size())) { // next batch - ++CurrentDecompressingMessage.first; - CurrentDecompressingMessage.second = 0; - } - if (task.AddedDataSize() >= TASK_LIMIT) { - session->OnCreateNewDecompressionTask(); - deferred.DeferStartExecutorTask(executor, std::move(task)); - ReadyThresholds.emplace_back(); - task = TDecompressionTask(this, partitionStream, &ReadyThresholds.back()); - } - } - if (task.AddedMessagesCount() > 0) { - session->OnCreateNewDecompressionTask(); - deferred.DeferStartExecutorTask(executor, std::move(task)); - } else { - ReadyThresholds.pop_back(); // Revert. - } - return used; -} - -bool TDataDecompressionInfo::TakeData(const TIntrusivePtr<TPartitionStreamImpl>& partitionStream, - TVector<TReadSessionEvent::TDataReceivedEvent::TMessage>* messages, - TVector<TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>* compressedMessages, - size_t* maxByteSize) -{ - TMaybe<std::pair<size_t, size_t>> readyThreshold = GetReadyThreshold(); - Y_ASSERT(readyThreshold); - auto& msg = GetServerMessage(); - ui64 minOffset = Max<ui64>(); - ui64 maxOffset = 0; - const auto prevReadingMessage = CurrentReadingMessage; - while (HasMoreData() && *maxByteSize > 0 && CurrentReadingMessage <= *readyThreshold) { - auto& batch = *msg.mutable_batches(CurrentReadingMessage.first); - if (CurrentReadingMessage.second < static_cast<size_t>(batch.message_data_size())) { - const auto& meta = GetBatchMeta(CurrentReadingMessage.first); - const TInstant batchWriteTimestamp = TInstant::MilliSeconds(batch.write_timestamp_ms()); - auto& messageData = *batch.mutable_message_data(CurrentReadingMessage.second); - minOffset = Min(minOffset, messageData.offset()); - maxOffset = Max(maxOffset, messageData.offset()); - TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo( - messageData.offset(), - batch.source_id(), - messageData.seq_no(), - TInstant::MilliSeconds(messageData.create_timestamp_ms()), - batchWriteTimestamp, - batch.ip(), - meta, - messageData.uncompressed_size() - ); - if (DoDecompress) { - messages->emplace_back( - messageData.data(), - GetDecompressionError(CurrentReadingMessage.first, CurrentReadingMessage.second), - messageInfo, - partitionStream, - messageData.partition_key(), - messageData.explicit_hash() - ); - } else { - compressedMessages->emplace_back( - static_cast<ECodec>(messageData.codec()), - messageData.data(), - TVector<TReadSessionEvent::TDataReceivedEvent::TMessageInformation>{messageInfo}, - partitionStream, - messageData.partition_key(), - messageData.explicit_hash() - ); - } - *maxByteSize -= Min(*maxByteSize, messageData.data().size()); - - // Clear data to free internal session's memory. - messageData.clear_data(); - } - - ++CurrentReadingMessage.second; - if (CurrentReadingMessage.second >= static_cast<size_t>(batch.message_data_size())) { - CurrentReadingMessage.second = 0; - do { - ++CurrentReadingMessage.first; - } while (CurrentReadingMessage.first < static_cast<size_t>(msg.batches_size()) && msg.batches(CurrentReadingMessage.first).message_data_size() == 0); - } - } - partitionStream->GetLog().Write(TLOG_DEBUG, TStringBuilder() << "Take Data. Partition " << partitionStream->GetPartitionId() - << ". Read: {" << prevReadingMessage.first << ", " << prevReadingMessage.second << "} -> {" - << CurrentReadingMessage.first << ", " << CurrentReadingMessage.second << "} (" - << minOffset << "-" << maxOffset << ")"); - return CurrentReadingMessage <= *readyThreshold; -} - -bool TDataDecompressionInfo::HasReadyUnreadData() const { - TMaybe<std::pair<size_t, size_t>> threshold = GetReadyThreshold(); - if (!threshold) { - return false; - } - return CurrentReadingMessage <= *threshold; -} - -void TDataDecompressionInfo::TDecompressionTask::Add(size_t batch, size_t message, size_t sourceDataSize, size_t estimatedDecompressedSize) { - if (Messages.empty() || Messages.back().Batch != batch) { - Messages.push_back({ batch, { message, message + 1 } }); - } - Messages.back().MessageRange.second = message + 1; - SourceDataSize += sourceDataSize; - EstimatedDecompressedSize += estimatedDecompressedSize; - Ready->Batch = batch; - Ready->Message = message; -} - -TDataDecompressionInfo::TDecompressionTask::TDecompressionTask(TDataDecompressionInfo* parent, TIntrusivePtr<TPartitionStreamImpl> partitionStream, TReadyMessageThreshold* ready) - : Parent(parent) - , PartitionStream(std::move(partitionStream)) - , Ready(ready) -{ -} - -// Forward delcaration -namespace NCompressionDetails { - extern TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& data); -} - -void TDataDecompressionInfo::TDecompressionTask::operator()() { - ui64 minOffset = Max<ui64>(); - ui64 maxOffset = 0; - const ui64 partition = Parent->ServerMessage.partition(); - i64 dataProcessed = 0; - size_t messagesProcessed = 0; - for (const TMessageRange& messages : Messages) { - Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch& batch = - *Parent->ServerMessage.mutable_batches(messages.Batch); - for (size_t i = messages.MessageRange.first; i < messages.MessageRange.second; ++i) { - Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& data = - *batch.mutable_message_data(i); - - ++messagesProcessed; - dataProcessed += static_cast<i64>(data.data().size()); - minOffset = Min(minOffset, data.offset()); - maxOffset = Max(maxOffset, data.offset()); - - try { - if (Parent->DoDecompress - && data.codec() != Ydb::PersQueue::V1::CODEC_RAW - && data.codec() != Ydb::PersQueue::V1::CODEC_UNSPECIFIED - ) { - TString decompressed = NCompressionDetails::Decompress(data); - data.set_data(decompressed); - data.set_codec(Ydb::PersQueue::V1::CODEC_RAW); - } - DecompressedSize += data.data().size(); - } catch (...) { - Parent->PutDecompressionError(std::current_exception(), messages.Batch, i); - data.clear_data(); // Free memory, because we don't count it. - - std::shared_ptr<TSingleClusterReadSessionImpl> session = Parent->Session.lock(); - if (session) { - session->GetLog() << TLOG_INFO << "Error decompressing data: " << CurrentExceptionMessage(); - } - } - } - } - if (auto session = Parent->Session.lock()) { - session->GetLog().Write( - TLOG_DEBUG, - TStringBuilder() << "Decompression task done. Partition: " << partition << " (" << minOffset << "-" << maxOffset << ")" - ); - } - Y_ASSERT(dataProcessed == SourceDataSize); - std::shared_ptr<TSingleClusterReadSessionImpl> session = Parent->Session.lock(); - - if (session) { - session->OnDataDecompressed(SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed); - } - - Parent->SourceDataNotProcessed -= dataProcessed; - Ready->Ready = true; - - if (session) { - session->GetEventsQueue()->SignalReadyEvents(PartitionStream.Get()); - } -} - -void TRawPartitionStreamEvent::Signal(TPartitionStreamImpl* partitionStream, TReadSessionEventsQueue* queue, TDeferredActions& deferred) { - if (!Signalled) { - Signalled = true; - queue->SignalEventImpl(partitionStream, deferred); - } -} - -void TDeferredActions::DeferReadFromProcessor(const IProcessor::TPtr& processor, - Ydb::PersQueue::V1::MigrationStreamingReadServerMessage* dst, - IProcessor::TReadCallback callback) -{ - Y_ASSERT(!Processor); - Y_ASSERT(!ReadDst); - Y_ASSERT(!ReadCallback); - Processor = processor; - ReadDst = dst; - ReadCallback = std::move(callback); -} - -void TDeferredActions::DeferStartExecutorTask(const IExecutor::TPtr& executor, IExecutor::TFunction task) { - ExecutorsTasks.emplace_back(executor, std::move(task)); -} - -void TDeferredActions::DeferAbortSession(const IErrorHandler::TPtr& errorHandler, TSessionClosedEvent&& closeEvent) { - ErrorHandler = errorHandler; - SessionClosedEvent.ConstructInPlace(std::move(closeEvent)); -} - -void TDeferredActions::DeferAbortSession(const IErrorHandler::TPtr& errorHandler, EStatus statusCode, NYql::TIssues&& issues) { - DeferAbortSession(errorHandler, TSessionClosedEvent(statusCode, std::move(issues))); -} - -void TDeferredActions::DeferAbortSession(const IErrorHandler::TPtr& errorHandler, EStatus statusCode, const TString& message) { - NYql::TIssues issues; - issues.AddIssue(message); - DeferAbortSession(errorHandler, statusCode, std::move(issues)); -} - -void TDeferredActions::DeferAbortSession(const IErrorHandler::TPtr& errorHandler, TPlainStatus&& status) { - DeferAbortSession(errorHandler, TSessionClosedEvent(std::move(status))); -} - -void TDeferredActions::DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl> session, const IErrorHandler::TPtr& errorHandler, TPlainStatus&& status) { - Session = std::move(session); - ErrorHandler = errorHandler; - ReconnectionStatus = std::move(status); -} - -void TDeferredActions::DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl> session) { - Sessions.push_back(std::move(session)); -} - -void TDeferredActions::DeferSignalWaiter(TWaiter&& waiter) { - Waiters.emplace_back(std::move(waiter)); -} - -void TDeferredActions::DoActions() { - Read(); - StartExecutorTasks(); - AbortSession(); - Reconnect(); - SignalWaiters(); - StartSessions(); -} - -void TDeferredActions::StartSessions() { - for (auto& session : Sessions) { - session->Start(); - } -} - - -void TDeferredActions::Read() { - if (ReadDst) { - Y_ASSERT(Processor); - Y_ASSERT(ReadCallback); - Processor->Read(ReadDst, std::move(ReadCallback)); - } -} - -void TDeferredActions::StartExecutorTasks() { - for (auto&& [executor, task] : ExecutorsTasks) { - executor->Post(std::move(task)); - } -} - -void TDeferredActions::AbortSession() { - if (SessionClosedEvent) { - Y_ASSERT(ErrorHandler); - ErrorHandler->AbortSession(std::move(*SessionClosedEvent)); - } -} - -void TDeferredActions::Reconnect() { - if (Session) { - Y_ASSERT(ErrorHandler); - if (!Session->Reconnect(ReconnectionStatus)) { - ErrorHandler->AbortSession(std::move(ReconnectionStatus)); - } - } -} - -void TDeferredActions::SignalWaiters() { - for (auto& w : Waiters) { - w.Signal(); - } -} - -void TErrorHandler::AbortSession(TSessionClosedEvent&& closeEvent) { - if (auto session = Session.lock()) { - session->Abort(std::move(closeEvent)); - } -} - class TGracefulReleasingSimpleDataHandlers : public TThrRefBase { public: explicit TGracefulReleasingSimpleDataHandlers(std::function<void(TReadSessionEvent::TDataReceivedEvent&)> dataHandler, bool commitAfterProcessing) @@ -2514,6 +878,9 @@ TReadSessionSettings::TEventHandlers& TReadSessionSettings::TEventHandlers::Simp return *this; } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TDeferredCommit + class TDeferredCommit::TImpl { public: @@ -2624,7 +991,7 @@ void TDeferredCommit::TImpl::Add(const TReadSessionEvent::TDataReceivedEvent& da void TDeferredCommit::TImpl::Commit() { for (auto&& [partitionStream, offsetRanges] : Offsets) { for (auto&& [startOffset, endOffset] : offsetRanges) { - static_cast<TPartitionStreamImpl*>(partitionStream.Get())->Commit(startOffset, endOffset); + static_cast<TPartitionStreamImpl<true>*>(partitionStream.Get())->Commit(startOffset, endOffset); } } Offsets.clear(); @@ -2648,72 +1015,6 @@ TReaderCounters::TReaderCounters(const TIntrusivePtr<::NMonitoring::TDynamicCoun CompressedBytesInflightUsageByTime = counters->GetHistogram("compressedBytesInflightUsageByTime", HISTOGRAM_SETUP); } -void MakeCountersNotNull(TReaderCounters& counters) { - if (!counters.Errors) { - counters.Errors = MakeIntrusive<NMonitoring::TCounterForPtr>(true); - } - - if (!counters.CurrentSessionLifetimeMs) { - counters.CurrentSessionLifetimeMs = MakeIntrusive<NMonitoring::TCounterForPtr>(false); - } - - if (!counters.BytesRead) { - counters.BytesRead = MakeIntrusive<NMonitoring::TCounterForPtr>(true); - } - - if (!counters.MessagesRead) { - counters.MessagesRead = MakeIntrusive<NMonitoring::TCounterForPtr>(true); - } - - if (!counters.BytesReadCompressed) { - counters.BytesReadCompressed = MakeIntrusive<NMonitoring::TCounterForPtr>(true); - } - - if (!counters.BytesInflightUncompressed) { - counters.BytesInflightUncompressed = MakeIntrusive<NMonitoring::TCounterForPtr>(false); - } - - if (!counters.BytesInflightCompressed) { - counters.BytesInflightCompressed = MakeIntrusive<NMonitoring::TCounterForPtr>(false); - } - - if (!counters.BytesInflightTotal) { - counters.BytesInflightTotal = MakeIntrusive<NMonitoring::TCounterForPtr>(false); - } - - if (!counters.MessagesInflight) { - counters.MessagesInflight = MakeIntrusive<NMonitoring::TCounterForPtr>(false); - } - - - if (!counters.TotalBytesInflightUsageByTime) { - counters.TotalBytesInflightUsageByTime = MakeIntrusive<NMonitoring::THistogramCounter>(HISTOGRAM_SETUP); - } - - if (!counters.UncompressedBytesInflightUsageByTime) { - counters.UncompressedBytesInflightUsageByTime = MakeIntrusive<NMonitoring::THistogramCounter>(HISTOGRAM_SETUP); - } - - if (!counters.CompressedBytesInflightUsageByTime) { - counters.CompressedBytesInflightUsageByTime = MakeIntrusive<NMonitoring::THistogramCounter>(HISTOGRAM_SETUP); - } -} - #undef HISTOGRAM_SETUP -bool HasNullCounters(TReaderCounters& counters) { - return !counters.Errors - || !counters.CurrentSessionLifetimeMs - || !counters.BytesRead - || !counters.MessagesRead - || !counters.BytesReadCompressed - || !counters.BytesInflightUncompressed - || !counters.BytesInflightCompressed - || !counters.BytesInflightTotal - || !counters.MessagesInflight - || !counters.TotalBytesInflightUsageByTime - || !counters.UncompressedBytesInflightUsageByTime - || !counters.CompressedBytesInflightUsageByTime; -} - } // namespace NYdb::NPersQueue diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index 3317cd28eb..fc3e6f1164 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -5,7 +5,9 @@ #include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> #include <ydb/public/api/grpc/draft/ydb_persqueue_v1.grpc.pb.h> +#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h> @@ -18,20 +20,87 @@ namespace NYdb::NPersQueue { +template <bool UseMigrationProtocol> +using TClientMessage = std::conditional_t<UseMigrationProtocol, + Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, + Ydb::Topic::StreamReadMessage::FromClient>; + +template <bool UseMigrationProtocol> +using TServerMessage = std::conditional_t<UseMigrationProtocol, + Ydb::PersQueue::V1::MigrationStreamingReadServerMessage, + Ydb::Topic::StreamReadMessage::FromServer>; + +template <bool UseMigrationProtocol> +using IReadSessionConnectionProcessorFactory = + ISessionConnectionProcessorFactory<TClientMessage<UseMigrationProtocol>, TServerMessage<UseMigrationProtocol>>; + +template <bool UseMigrationProtocol> +using IProcessor = typename IReadSessionConnectionProcessorFactory<UseMigrationProtocol>::IProcessor; + +template <bool UseMigrationProtocol> +using TPartitionData = std::conditional_t<UseMigrationProtocol, + Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData, + Ydb::Topic::StreamReadMessage::ReadResponse::PartitionData>; + +template <bool UseMigrationProtocol> +using TAWriteSessionMeta = std::conditional_t<UseMigrationProtocol, + NYdb::NPersQueue::TWriteSessionMeta, + NYdb::NTopic::TWriteSessionMeta>; + +template <bool UseMigrationProtocol> +using TASessionClosedEvent = std::conditional_t<UseMigrationProtocol, + NYdb::NPersQueue::TSessionClosedEvent, + NYdb::NTopic::TSessionClosedEvent>; + +template <bool UseMigrationProtocol> +using TAPartitionStream = std::conditional_t<UseMigrationProtocol, + NYdb::NPersQueue::TPartitionStream, + NYdb::NTopic::TPartitionSession>; + +template <bool UseMigrationProtocol> +using TAReadSessionEvent = std::conditional_t<UseMigrationProtocol, + NYdb::NPersQueue::TReadSessionEvent, + NYdb::NTopic::TReadSessionEvent>; + +template <bool UseMigrationProtocol> +using IARetryPolicy = std::conditional_t<UseMigrationProtocol, + NYdb::NPersQueue::IRetryPolicy, + NYdb::NTopic::IRetryPolicy>; + +template <bool UseMigrationProtocol> +using IAExecutor = std::conditional_t<UseMigrationProtocol, + NYdb::NPersQueue::IExecutor, + NYdb::NTopic::IExecutor>; + +template <bool UseMigrationProtocol> +using TAReadSessionSettings = std::conditional_t<UseMigrationProtocol, + NYdb::NPersQueue::TReadSessionSettings, + NYdb::NTopic::TReadSessionSettings>; + + +template <bool UseMigrationProtocol> class TPartitionStreamImpl; + +template <bool UseMigrationProtocol> class TSingleClusterReadSessionImpl; + +template <bool UseMigrationProtocol> class TDeferredActions; -class TReadSession; -using IReadSessionConnectionProcessorFactory = ISessionConnectionProcessorFactory<Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>; + +template <bool UseMigrationProtocol> class TReadSessionEventsQueue; +class TReadSession; + + +template <bool UseMigrationProtocol> struct IErrorHandler : public TThrRefBase { using TPtr = TIntrusivePtr<IErrorHandler>; - virtual void AbortSession(TSessionClosedEvent&& closeEvent) = 0; + virtual void AbortSession(TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) = 0; void AbortSession(EStatus statusCode, NYql::TIssues&& issues) { - AbortSession(TSessionClosedEvent(statusCode, std::move(issues))); + AbortSession(TASessionClosedEvent<UseMigrationProtocol>(statusCode, std::move(issues))); } void AbortSession(EStatus statusCode, const TString& message) { @@ -41,28 +110,26 @@ struct IErrorHandler : public TThrRefBase { } void AbortSession(TPlainStatus&& status) { - AbortSession(TSessionClosedEvent(std::move(status))); + AbortSession(TASessionClosedEvent<UseMigrationProtocol>(std::move(status))); } }; // Special class that stores actions to be done after lock will be released. +template <bool UseMigrationProtocol> class TDeferredActions { public: - using IProcessor = IReadSessionConnectionProcessorFactory::IProcessor; - -public: ~TDeferredActions() { DoActions(); } - void DeferReadFromProcessor(const IProcessor::TPtr& processor, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage* dst, IProcessor::TReadCallback callback); - void DeferStartExecutorTask(const IExecutor::TPtr& executor, IExecutor::TFunction task); - void DeferAbortSession(const IErrorHandler::TPtr& errorHandler, TSessionClosedEvent&& closeEvent); - void DeferAbortSession(const IErrorHandler::TPtr& errorHandler, EStatus statusCode, NYql::TIssues&& issues); - void DeferAbortSession(const IErrorHandler::TPtr& errorHandler, EStatus statusCode, const TString& message); - void DeferAbortSession(const IErrorHandler::TPtr& errorHandler, TPlainStatus&& status); - void DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl> session, const IErrorHandler::TPtr& errorHandler, TPlainStatus&& status); - void DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl> session); + void DeferReadFromProcessor(const typename IProcessor<UseMigrationProtocol>::TPtr& processor, TServerMessage<UseMigrationProtocol>* dst, typename IProcessor<UseMigrationProtocol>::TReadCallback callback); + void DeferStartExecutorTask(const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, typename IAExecutor<UseMigrationProtocol>::TFunction task); + void DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent); + void DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, EStatus statusCode, NYql::TIssues&& issues); + void DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, EStatus statusCode, const TString& message); + void DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TPlainStatus&& status); + void DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TPlainStatus&& status); + void DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session); void DeferSignalWaiter(TWaiter&& waiter); @@ -78,44 +145,44 @@ private: private: // Read. - IProcessor::TPtr Processor; - Ydb::PersQueue::V1::MigrationStreamingReadServerMessage* ReadDst = nullptr; - IProcessor::TReadCallback ReadCallback; + typename IProcessor<UseMigrationProtocol>::TPtr Processor; + TServerMessage<UseMigrationProtocol>* ReadDst = nullptr; + typename IProcessor<UseMigrationProtocol>::TReadCallback ReadCallback; // Executor tasks. - std::vector<std::pair<IExecutor::TPtr, IExecutor::TFunction>> ExecutorsTasks; + std::vector<std::pair<typename IAExecutor<UseMigrationProtocol>::TPtr, typename IAExecutor<UseMigrationProtocol>::TFunction>> ExecutorsTasks; // Abort session. - IErrorHandler::TPtr ErrorHandler; - TMaybe<TSessionClosedEvent> SessionClosedEvent; + typename IErrorHandler<UseMigrationProtocol>::TPtr ErrorHandler; + TMaybe<TASessionClosedEvent<UseMigrationProtocol>> SessionClosedEvent; // Waiters. std::vector<TWaiter> Waiters; // Reconnection. - std::shared_ptr<TSingleClusterReadSessionImpl> Session; + std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session; TPlainStatus ReconnectionStatus; // Session to start - std::vector<std::shared_ptr<TSingleClusterReadSessionImpl>> Sessions; - + std::vector<std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>>> Sessions; }; +template <bool UseMigrationProtocol> class TDataDecompressionInfo { public: TDataDecompressionInfo(const TDataDecompressionInfo&) = default; TDataDecompressionInfo(TDataDecompressionInfo&&) = default; TDataDecompressionInfo( - Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData&& msg, - std::weak_ptr<TSingleClusterReadSessionImpl> session, + TPartitionData<UseMigrationProtocol>&& msg, + std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, bool doDecompress ); - i64 StartDecompressionTasks(const IExecutor::TPtr& executor, + i64 StartDecompressionTasks(const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, i64 availableMemory, double averageCompressionRatio, - const TIntrusivePtr<TPartitionStreamImpl>& partitionStream, - TDeferredActions& deferred); + const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>& partitionStream, + TDeferredActions<UseMigrationProtocol>& deferred); bool IsReady() const { return SourceDataNotProcessed == 0; @@ -130,11 +197,11 @@ public: return CompressedDataSize; } - const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData& GetServerMessage() const { + const TPartitionData<UseMigrationProtocol>& GetServerMessage() const { return ServerMessage; } - Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData& GetServerMessage() { + TPartitionData<UseMigrationProtocol>& GetServerMessage() { return ServerMessage; } @@ -156,15 +223,15 @@ public: return ret; } - TWriteSessionMeta::TPtr GetBatchMeta(size_t batchIndex) const { + typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr GetBatchMeta(size_t batchIndex) const { Y_ASSERT(batchIndex < BatchesMeta.size()); return BatchesMeta[batchIndex]; } // Takes data. Returns true if event has more unpacked data. - bool TakeData(const TIntrusivePtr<TPartitionStreamImpl>& partitionStream, - TVector<TReadSessionEvent::TDataReceivedEvent::TMessage>* messages, - TVector<TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>* compressedMessages, + bool TakeData(const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>& partitionStream, + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages, + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages, size_t* maxByteSize); bool HasMoreData() const { @@ -185,7 +252,7 @@ private: }; struct TDecompressionTask { - explicit TDecompressionTask(TDataDecompressionInfo* parent, TIntrusivePtr<TPartitionStreamImpl> partitionStream, TReadyMessageThreshold* ready); + explicit TDecompressionTask(TDataDecompressionInfo* parent, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, TReadyMessageThreshold* ready); // Decompress and notify about memory consumption changes. void operator()(); @@ -201,7 +268,7 @@ private: private: TDataDecompressionInfo* Parent; - TIntrusivePtr<TPartitionStreamImpl> PartitionStream; + TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> PartitionStream; i64 SourceDataSize = 0; i64 EstimatedDecompressedSize = 0; i64 DecompressedSize = 0; @@ -216,9 +283,9 @@ private: void BuildBatchesMeta(); private: - Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData ServerMessage; - std::vector<TWriteSessionMeta::TPtr> BatchesMeta; - std::weak_ptr<TSingleClusterReadSessionImpl> Session; + TPartitionData<UseMigrationProtocol> ServerMessage; + std::vector<typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr> BatchesMeta; + std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session; bool DoDecompress; i64 CompressedDataSize = 0; std::atomic<i64> SourceDataNotProcessed = 0; @@ -233,45 +300,50 @@ private: std::vector<std::vector<std::exception_ptr>> DecompressionErrors; }; +template <bool UseMigrationProtocol> struct IUserRetrievedEventCallback { virtual ~IUserRetrievedEventCallback() = default; - virtual void OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) = 0; + virtual void OnUserRetrievedEvent(const typename TAReadSessionEvent<UseMigrationProtocol>::TEvent& event) = 0; }; +template <bool UseMigrationProtocol> struct TReadSessionEventInfo { - using TEvent = TReadSessionEvent::TEvent; + using TEvent = typename TAReadSessionEvent<UseMigrationProtocol>::TEvent; + using TDataReceivedEvent = typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent; + using TMessage = typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage; + using TCompressedMessage = typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage; // Event with only partition stream ref. // Partition stream holds all its events. - TIntrusivePtr<TPartitionStreamImpl> PartitionStream; + TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> PartitionStream; TMaybe<TEvent> Event; - std::weak_ptr<IUserRetrievedEventCallback> Session; + std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> Session; // Close event. - TReadSessionEventInfo(const TSessionClosedEvent& event, std::weak_ptr<IUserRetrievedEventCallback> session = {}) + TReadSessionEventInfo(const TASessionClosedEvent<UseMigrationProtocol>& event, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session = {}) : Event(TEvent(event)) , Session(session) { } // Usual event. - TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl> partitionStream, std::weak_ptr<IUserRetrievedEventCallback> session, TEvent event); + TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, TEvent event); // Data event. - TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl> partitionStream, std::weak_ptr<IUserRetrievedEventCallback> session); + TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session); - TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl> partitionStream, - std::weak_ptr<IUserRetrievedEventCallback> session, - TVector<TReadSessionEvent::TDataReceivedEvent::TMessage> messages, - TVector<TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> compressedMessages); + TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, + std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, + TVector<TMessage> messages, + TVector<TCompressedMessage> compressedMessages); bool IsEmpty() const; bool IsDataEvent() const; // Takes data. Returns true if event has more unpacked data. - bool TakeData(TVector<TReadSessionEvent::TDataReceivedEvent::TMessage>* messages, - TVector<TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>* comressedMessages, + bool TakeData(TVector<TMessage>* messages, + TVector<TCompressedMessage>* comressedMessages, size_t* maxByteSize); TEvent& GetEvent() { @@ -290,67 +362,71 @@ struct TReadSessionEventInfo { bool HasReadyUnreadData() const; // Has ready unread data. bool IsSessionClosedEvent() const { - return Event && std::holds_alternative<TSessionClosedEvent>(*Event); + return Event && std::holds_alternative<TASessionClosedEvent<UseMigrationProtocol>>(*Event); } }; // Raw data with maybe uncompressed parts or other read session event. +template <bool UseMigrationProtocol> struct TRawPartitionStreamEvent { - std::variant<TDataDecompressionInfo, TReadSessionEvent::TEvent> Event; + using TEvent = typename TAReadSessionEvent<UseMigrationProtocol>::TEvent; + + std::variant<TDataDecompressionInfo<UseMigrationProtocol>, TEvent> Event; bool Signalled = false; TRawPartitionStreamEvent(const TRawPartitionStreamEvent&) = default; TRawPartitionStreamEvent(TRawPartitionStreamEvent&&) = default; TRawPartitionStreamEvent( - Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData&& msg, - std::weak_ptr<TSingleClusterReadSessionImpl> session, + TPartitionData<UseMigrationProtocol>&& msg, + std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, bool doDecompress ) - : Event(std::in_place_type_t<TDataDecompressionInfo>(), std::move(msg), std::move(session), doDecompress) + : Event(std::in_place_type_t<TDataDecompressionInfo<UseMigrationProtocol>>(), std::move(msg), std::move(session), doDecompress) { } template <class T> explicit TRawPartitionStreamEvent(T&& event) - : Event(std::in_place_type_t<TReadSessionEvent::TEvent>(), std::forward<T>(event)) + : Event(std::in_place_type_t<TEvent>(), std::forward<T>(event)) { } bool IsDataEvent() const { - return std::holds_alternative<TDataDecompressionInfo>(Event); + return std::holds_alternative<TDataDecompressionInfo<UseMigrationProtocol>>(Event); } - const TDataDecompressionInfo& GetData() const { + const TDataDecompressionInfo<UseMigrationProtocol>& GetData() const { Y_ASSERT(IsDataEvent()); - return std::get<TDataDecompressionInfo>(Event); + return std::get<TDataDecompressionInfo<UseMigrationProtocol>>(Event); } - TDataDecompressionInfo& GetData() { + TDataDecompressionInfo<UseMigrationProtocol>& GetData() { Y_ASSERT(IsDataEvent()); - return std::get<TDataDecompressionInfo>(Event); + return std::get<TDataDecompressionInfo<UseMigrationProtocol>>(Event); } - TReadSessionEvent::TEvent& GetEvent() { + TEvent& GetEvent() { Y_ASSERT(!IsDataEvent()); - return std::get<TReadSessionEvent::TEvent>(Event); + return std::get<TEvent>(Event); } - const TReadSessionEvent::TEvent& GetEvent() const { + const TEvent& GetEvent() const { Y_ASSERT(!IsDataEvent()); - return std::get<TReadSessionEvent::TEvent>(Event); + return std::get<TEvent>(Event); } bool IsReady() const { return !IsDataEvent() || GetData().IsReady(); } - void Signal(TPartitionStreamImpl* partitionStream, TReadSessionEventsQueue* queue, TDeferredActions& deferred); + void Signal(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, TReadSessionEventsQueue<UseMigrationProtocol>* queue, TDeferredActions<UseMigrationProtocol>& deferred); }; -class TPartitionStreamImpl : public TPartitionStream { +template <bool UseMigrationProtocol> +class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> { public: struct TKey { // Hash<TKey> is defined later in this file. TString Topic; @@ -365,30 +441,49 @@ public: } }; - TPartitionStreamImpl(ui64 partitionStreamId, + TPartitionStreamImpl<true>(ui64 partitionStreamId, TString topicPath, TString cluster, ui64 partitionGroupId, ui64 partitionId, ui64 assignId, ui64 readOffset, - std::weak_ptr<TSingleClusterReadSessionImpl> parentSession, - IErrorHandler::TPtr errorHandler) + std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> parentSession, + typename IErrorHandler<UseMigrationProtocol>::TPtr errorHandler) : Key{topicPath, cluster, partitionId} , AssignId(assignId) , FirstNotReadOffset(readOffset) , Session(std::move(parentSession)) , ErrorHandler(std::move(errorHandler)) { - PartitionStreamId = partitionStreamId; - TopicPath = std::move(topicPath); - Cluster = std::move(cluster); - PartitionGroupId = partitionGroupId; - PartitionId = partitionId; + TAPartitionStream<true>::PartitionStreamId = partitionStreamId; + TAPartitionStream<true>::TopicPath = std::move(topicPath); + TAPartitionStream<true>::Cluster = std::move(cluster); + TAPartitionStream<true>::PartitionGroupId = partitionGroupId; + TAPartitionStream<true>::PartitionId = partitionId; MaxCommittedOffset = readOffset; } - ~TPartitionStreamImpl(); + TPartitionStreamImpl<false>(ui64 partitionStreamId, + TString topicPath, + i64 partitionId, + i64 assignId, + i64 readOffset, + std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> parentSession, + typename IErrorHandler<UseMigrationProtocol>::TPtr errorHandler) + : Key{topicPath, "", static_cast<ui64>(partitionId)} + , AssignId(static_cast<ui64>(assignId)) + , FirstNotReadOffset(static_cast<ui64>(readOffset)) + , Session(std::move(parentSession)) + , ErrorHandler(std::move(errorHandler)) + { + TAPartitionStream<false>::PartitionSessionId = static_cast<i64>(partitionStreamId); + TAPartitionStream<false>::TopicPath = std::move(topicPath); + TAPartitionStream<false>::PartitionId = partitionId; + MaxCommittedOffset = static_cast<ui64>(readOffset); + } + + ~TPartitionStreamImpl() = default; ui64 GetFirstNotReadOffset() const { return FirstNotReadOffset; @@ -420,8 +515,8 @@ public: EventsQueue.emplace_back(std::forward<T>(event)); } - TDataDecompressionInfo& InsertDataEvent( - Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData&& msg, + TDataDecompressionInfo<UseMigrationProtocol>& InsertDataEvent( + TPartitionData<UseMigrationProtocol>&& msg, bool doDecompress ) { ++DataDecompressionEventsCount; @@ -436,11 +531,11 @@ public: return !EventsQueue.empty(); } - TRawPartitionStreamEvent& TopEvent() { + TRawPartitionStreamEvent<UseMigrationProtocol>& TopEvent() { return EventsQueue.front(); } - const TRawPartitionStreamEvent& TopEvent() const { + const TRawPartitionStreamEvent<UseMigrationProtocol>& TopEvent() const { return EventsQueue.front(); } @@ -451,15 +546,15 @@ public: EventsQueue.pop_front(); } - std::weak_ptr<TSingleClusterReadSessionImpl> GetSession() const { + std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> GetSession() const { return Session; } TLog GetLog() const; - void SignalReadyEvents(TReadSessionEventsQueue* queue, TDeferredActions& deferred); + void SignalReadyEvents(TReadSessionEventsQueue<UseMigrationProtocol>* queue, TDeferredActions<UseMigrationProtocol>& deferred); - const IErrorHandler::TPtr& GetErrorHandler() const { + const typename IErrorHandler<UseMigrationProtocol>::TPtr& GetErrorHandler() const { return ErrorHandler; } @@ -499,8 +594,15 @@ public: bool AddToCommitRanges(const ui64 startOffset, const ui64 endOffset, bool rangesMode) { if (ClientCommits.Intersects(startOffset, endOffset) || startOffset < MaxCommittedOffset) { + auto id = [this](){ + if constexpr (UseMigrationProtocol) { + return this->PartitionStreamId; + } else { + return this->PartitionSessionId; + } + }(); ThrowFatalError(TStringBuilder() << "Invalid offset range [" << startOffset << ", " << endOffset << ") : range must start from " - << MaxCommittedOffset << " or has some offsets that are committed already. Partition stream id: " << PartitionStreamId << Endl); + << MaxCommittedOffset << " or has some offsets that are committed already. Partition stream id: -" << id << Endl); return false; } if (rangesMode) { // Otherwise no need to send it to server. @@ -516,9 +618,9 @@ private: const TKey Key; ui64 AssignId; ui64 FirstNotReadOffset; - std::weak_ptr<TSingleClusterReadSessionImpl> Session; - IErrorHandler::TPtr ErrorHandler; - std::deque<TRawPartitionStreamEvent> EventsQueue; + std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session; + typename IErrorHandler<UseMigrationProtocol>::TPtr ErrorHandler; + std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> EventsQueue; size_t DataDecompressionEventsCount = 0; ui64 MaxReadOffset = 0; ui64 MaxCommittedOffset = 0; @@ -527,22 +629,30 @@ private: TDisjointIntervalTree<ui64> ClientCommits; }; - -class TReadSessionEventsQueue : public TBaseSessionEventsQueue<TReadSessionSettings, TReadSessionEvent::TEvent, TReadSessionEventInfo> { - using TParent = TBaseSessionEventsQueue<TReadSessionSettings, TReadSessionEvent::TEvent, TReadSessionEventInfo>; +template <bool UseMigrationProtocol> +class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSettings<UseMigrationProtocol>, + typename TAReadSessionEvent<UseMigrationProtocol>::TEvent, + TASessionClosedEvent<UseMigrationProtocol>, + IAExecutor<UseMigrationProtocol>, + TReadSessionEventInfo<UseMigrationProtocol>> { + using TParent = TBaseSessionEventsQueue<TAReadSessionSettings<UseMigrationProtocol>, + typename TAReadSessionEvent<UseMigrationProtocol>::TEvent, + TASessionClosedEvent<UseMigrationProtocol>, + IAExecutor<UseMigrationProtocol>, + TReadSessionEventInfo<UseMigrationProtocol>>; public: - explicit TReadSessionEventsQueue(const TSettings& settings, std::weak_ptr<IUserRetrievedEventCallback> session); + explicit TReadSessionEventsQueue(const TAReadSessionSettings<UseMigrationProtocol>& settings, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session); - TMaybe<TEventInfo> GetDataEventImpl(TEventInfo& srcDataEventInfo, size_t* maxByteSize); // Assumes that we're under lock. + TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> GetDataEventImpl(TReadSessionEventInfo<UseMigrationProtocol>& srcDataEventInfo, size_t* maxByteSize); // Assumes that we're under lock. - TMaybe<TEventInfo> TryGetEventImpl(size_t* maxByteSize) { // Assumes that we're under lock. - Y_ASSERT(HasEventsImpl()); - TVector<TReadSessionEvent::TDataReceivedEvent::TMessage> messages; - if (!Events.empty()) { - TEventInfo event = std::move(Events.front()); - Events.pop(); - RenewWaiterImpl(); + TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> TryGetEventImpl(size_t* maxByteSize) { // Assumes that we're under lock. + Y_ASSERT(TParent::HasEventsImpl()); + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage> messages; + if (!TParent::Events.empty()) { + TReadSessionEventInfo<UseMigrationProtocol> event = std::move(TParent::Events.front()); + TParent::Events.pop(); + TParent::RenewWaiterImpl(); auto partitionStream = event.PartitionStream; if (!partitionStream->HasEvents()) { @@ -554,43 +664,43 @@ public: return GetDataEventImpl(event, maxByteSize); } - event = TReadSessionEventInfo(partitionStream.Get(), event.Session, partitionStream->TopEvent().GetEvent()); + event = TReadSessionEventInfo<UseMigrationProtocol>(partitionStream.Get(), event.Session, partitionStream->TopEvent().GetEvent()); partitionStream->PopEvent(); return event; } - Y_ASSERT(CloseEvent); - return TEventInfo(*CloseEvent, Session); + Y_ASSERT(TParent::CloseEvent); + return TReadSessionEventInfo<UseMigrationProtocol>(*TParent::CloseEvent, Session); } - TMaybe<TEventInfo> GetEventImpl(size_t* maxByteSize) { // Assumes that we're under lock and that the event queue has events. + TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> GetEventImpl(size_t* maxByteSize) { // Assumes that we're under lock and that the event queue has events. do { - TMaybe<TEventInfo> result = TryGetEventImpl(maxByteSize); // We could have read all the data in current message previous time. + TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> result = TryGetEventImpl(maxByteSize); // We could have read all the data in current message previous time. if (result) { return result; } - } while (HasEventsImpl()); + } while (TParent::HasEventsImpl()); return Nothing(); } - TVector<TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing(), size_t maxByteSize = std::numeric_limits<size_t>::max()) { - TVector<TEventInfo> eventInfos; + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing(), size_t maxByteSize = std::numeric_limits<size_t>::max()) { + TVector<TReadSessionEventInfo<UseMigrationProtocol>> eventInfos; const size_t maxCount = maxEventsCount ? *maxEventsCount : std::numeric_limits<size_t>::max(); - TDeferredActions deferred; - std::vector<TIntrusivePtr<TPartitionStreamImpl>> partitionStreamsForSignalling; - with_lock (Mutex) { - eventInfos.reserve(Min(Events.size() + CloseEvent.Defined(), maxCount)); + TDeferredActions<UseMigrationProtocol> deferred; + std::vector<TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>> partitionStreamsForSignalling; + with_lock (TParent::Mutex) { + eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.Defined(), maxCount)); do { if (block) { - WaitEventsImpl(); + TParent::WaitEventsImpl(); } ApplyCallbacksToReadyEventsImpl(deferred); - while (HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) { - TMaybe<TEventInfo> event = GetEventImpl(&maxByteSize); + while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) { + TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> event = GetEventImpl(&maxByteSize); if (event) { - const TIntrusivePtr<TPartitionStreamImpl> partitionStreamForSignalling = event->IsDataEvent() ? event->PartitionStream : nullptr; + const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStreamForSignalling = event->IsDataEvent() ? event->PartitionStream : nullptr; eventInfos.emplace_back(std::move(*event)); if (eventInfos.back().IsSessionClosedEvent()) { break; @@ -607,28 +717,28 @@ public: } } - TVector<TEvent> result; + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> result; result.reserve(eventInfos.size()); - for (TEventInfo& eventInfo : eventInfos) { + for (TReadSessionEventInfo<UseMigrationProtocol>& eventInfo : eventInfos) { eventInfo.OnUserRetrievedEvent(); result.emplace_back(std::move(eventInfo.GetEvent())); } return result; } - TMaybe<TEvent> GetEvent(bool block = false, size_t maxByteSize = std::numeric_limits<size_t>::max()) { - TMaybe<TEventInfo> eventInfo; - TDeferredActions deferred; - with_lock (Mutex) { - TIntrusivePtr<TPartitionStreamImpl> partitionStreamForSignalling; + TMaybe<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> GetEvent(bool block = false, size_t maxByteSize = std::numeric_limits<size_t>::max()) { + TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> eventInfo; + TDeferredActions<UseMigrationProtocol> deferred; + with_lock (TParent::Mutex) { + TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStreamForSignalling; do { if (block) { - WaitEventsImpl(); + TParent::WaitEventsImpl(); } const bool appliedCallbacks = ApplyCallbacksToReadyEventsImpl(deferred); - if (HasEventsImpl()) { + if (TParent::HasEventsImpl()) { eventInfo = GetEventImpl(&maxByteSize); if (eventInfo && eventInfo->IsDataEvent()) { partitionStreamForSignalling = eventInfo->PartitionStream; @@ -650,37 +760,39 @@ public: } } - void Close(const TSessionClosedEvent& event, TDeferredActions& deferred) { + void Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) { TWaiter waiter; - with_lock (Mutex) { - CloseEvent = event; - Closed = true; - waiter = TWaiter(Waiter.ExtractPromise(), this); + with_lock (TParent::Mutex) { + TParent::CloseEvent = event; + TParent::Closed = true; + waiter = TWaiter(TParent::Waiter.ExtractPromise(), this); } - TEventInfo info(event); + TReadSessionEventInfo<UseMigrationProtocol> info(event); ApplyHandler(info, deferred); waiter.Signal(); } bool HasCallbackForNextEventImpl() const; - bool ApplyCallbacksToReadyEventsImpl(TDeferredActions& deferred); + bool ApplyCallbacksToReadyEventsImpl(TDeferredActions<UseMigrationProtocol>& deferred); // Push usual event. - void PushEvent(TReadSessionEventInfo eventInfo, TDeferredActions& deferred); + void PushEvent(TReadSessionEventInfo<UseMigrationProtocol> eventInfo, TDeferredActions<UseMigrationProtocol>& deferred); // Push data event. - TDataDecompressionInfo* PushDataEvent(TIntrusivePtr<TPartitionStreamImpl> partitionStream, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData&& msg); + TDataDecompressionInfo<UseMigrationProtocol>* + PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, + TPartitionData<UseMigrationProtocol>&& msg); - void SignalEventImpl(TIntrusivePtr<TPartitionStreamImpl> partitionStream, TDeferredActions& deferred); // Assumes that we're under lock. + void SignalEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, TDeferredActions<UseMigrationProtocol>& deferred); // Assumes that we're under lock. - void SignalReadyEvents(TPartitionStreamImpl* partitionStream); + void SignalReadyEvents(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream); - void SignalReadyEventsImpl(TPartitionStreamImpl* partitionStream, TDeferredActions& deferred); // Assumes that we're under lock. + void SignalReadyEventsImpl(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, TDeferredActions<UseMigrationProtocol>& deferred); // Assumes that we're under lock. - void SignalWaiterImpl(TDeferredActions& deferred) { - TWaiter waiter = PopWaiterImpl(); + void SignalWaiterImpl(TDeferredActions<UseMigrationProtocol>& deferred) { + TWaiter waiter = TParent::PopWaiterImpl(); deferred.DeferSignalWaiter(std::move(waiter)); // No effect if waiter is empty. } @@ -688,60 +800,76 @@ public: private: struct THandlersVisitor : public TParent::TBaseHandlersVisitor { - THandlersVisitor(const TSettings& settings, TEventInfo& eventInfo, TDeferredActions& deferred) - : TBaseHandlersVisitor(settings, eventInfo) + THandlersVisitor(const TAReadSessionSettings<UseMigrationProtocol>& settings, TReadSessionEventInfo<UseMigrationProtocol>& eventInfo, TDeferredActions<UseMigrationProtocol>& deferred) + : TParent::TBaseHandlersVisitor(settings, eventInfo) , Deferred(deferred) {} #define DECLARE_HANDLER(type, handler, answer) \ bool operator()(type&) { \ - if (PushHandler<type>( \ - std::move(EventInfo), \ - Settings.EventHandlers_.handler, \ - Settings.EventHandlers_.CommonHandler_)) { \ + if (this->template PushHandler<type>( \ + std::move(TParent::TBaseHandlersVisitor::EventInfo), \ + this->Settings.EventHandlers_.handler, \ + this->Settings.EventHandlers_.CommonHandler_)) { \ return answer; \ } \ return false; \ } \ /**/ - DECLARE_HANDLER(TReadSessionEvent::TDataReceivedEvent, DataReceivedHandler_, true); - DECLARE_HANDLER(TReadSessionEvent::TCommitAcknowledgementEvent, CommitAcknowledgementHandler_, true); - DECLARE_HANDLER(TReadSessionEvent::TCreatePartitionStreamEvent, CreatePartitionStreamHandler_, true); - DECLARE_HANDLER(TReadSessionEvent::TDestroyPartitionStreamEvent, DestroyPartitionStreamHandler_, true); - DECLARE_HANDLER(TReadSessionEvent::TPartitionStreamStatusEvent, PartitionStreamStatusHandler_, true); - DECLARE_HANDLER(TReadSessionEvent::TPartitionStreamClosedEvent, PartitionStreamClosedHandler_, true); - DECLARE_HANDLER(TSessionClosedEvent, SessionClosedHandler_, false); // Not applied + DECLARE_HANDLER(typename TAReadSessionEvent<true>::TDataReceivedEvent, DataReceivedHandler_, true); + DECLARE_HANDLER(typename TAReadSessionEvent<true>::TCommitAcknowledgementEvent, CommitAcknowledgementHandler_, true); + DECLARE_HANDLER(typename TAReadSessionEvent<true>::TCreatePartitionStreamEvent, CreatePartitionStreamHandler_, true); + DECLARE_HANDLER(typename TAReadSessionEvent<true>::TDestroyPartitionStreamEvent, DestroyPartitionStreamHandler_, true); + DECLARE_HANDLER(typename TAReadSessionEvent<true>::TPartitionStreamStatusEvent, PartitionStreamStatusHandler_, true); + DECLARE_HANDLER(typename TAReadSessionEvent<true>::TPartitionStreamClosedEvent, PartitionStreamClosedHandler_, true); + + DECLARE_HANDLER(typename TAReadSessionEvent<false>::TDataReceivedEvent, DataReceivedHandler_, true); + DECLARE_HANDLER(typename TAReadSessionEvent<false>::TCommitOffsetAcknowledgementEvent, CommitOffsetAcknowledgementHandler_, true); + DECLARE_HANDLER(typename TAReadSessionEvent<false>::TStartPartitionSessionEvent, StartPartitionSessionHandler_, true); + DECLARE_HANDLER(typename TAReadSessionEvent<false>::TStopPartitionSessionEvent, StopPartitionSessionHandler_, true); + DECLARE_HANDLER(typename TAReadSessionEvent<false>::TPartitionSessionStatusEvent, PartitionSessionStatusHandler_, true); + DECLARE_HANDLER(typename TAReadSessionEvent<false>::TPartitionSessionClosedEvent, PartitionSessionClosedHandler_, true); + + DECLARE_HANDLER(TASessionClosedEvent<UseMigrationProtocol>, SessionClosedHandler_, false); // Not applied #undef DECLARE_HANDLER bool Visit() { - return std::visit(*this, EventInfo.GetEvent()); + return std::visit(*this, TParent::TBaseHandlersVisitor::EventInfo.GetEvent()); } - void Post(const IExecutor::TPtr& executor, IExecutor::TFunction&& f) { + void Post(const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, typename IAExecutor<UseMigrationProtocol>::TFunction&& f) { Deferred.DeferStartExecutorTask(executor, std::move(f)); } - TDeferredActions& Deferred; + TDeferredActions<UseMigrationProtocol>& Deferred; }; - bool ApplyHandler(TEventInfo& eventInfo, TDeferredActions& deferred) { - THandlersVisitor visitor(Settings, eventInfo, deferred); + bool ApplyHandler(TReadSessionEventInfo<UseMigrationProtocol>& eventInfo, TDeferredActions<UseMigrationProtocol>& deferred) { + THandlersVisitor visitor(this->Settings, eventInfo, deferred); return visitor.Visit(); } private: bool HasEventCallbacks; - std::weak_ptr<IUserRetrievedEventCallback> Session; + std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> Session; }; - - } // namespace NYdb::NPersQueue template <> -struct THash<NYdb::NPersQueue::TPartitionStreamImpl::TKey> { - size_t operator()(const NYdb::NPersQueue::TPartitionStreamImpl::TKey& key) const { +struct THash<NYdb::NPersQueue::TPartitionStreamImpl<false>::TKey> { + size_t operator()(const NYdb::NPersQueue::TPartitionStreamImpl<false>::TKey& key) const { + THash<TString> strHash; + const size_t h1 = strHash(key.Topic); + const size_t h2 = NumericHash(key.Partition); + return CombineHashes(h1, h2); + } +}; + +template <> +struct THash<NYdb::NPersQueue::TPartitionStreamImpl<true>::TKey> { + size_t operator()(const NYdb::NPersQueue::TPartitionStreamImpl<true>::TKey& key) const { THash<TString> strHash; const size_t h1 = strHash(key.Topic); const size_t h2 = strHash(key.Cluster); @@ -756,23 +884,24 @@ namespace NYdb::NPersQueue { // This class holds only read session logic. // It is parametrized with output queue for client events // and connection factory interface to separate logic from transport. -class TSingleClusterReadSessionImpl : public std::enable_shared_from_this<TSingleClusterReadSessionImpl>, - public IUserRetrievedEventCallback { +template <bool UseMigrationProtocol> +class TSingleClusterReadSessionImpl : public std::enable_shared_from_this<TSingleClusterReadSessionImpl<UseMigrationProtocol>>, + public IUserRetrievedEventCallback<UseMigrationProtocol> { public: - using TPtr = std::shared_ptr<TSingleClusterReadSessionImpl>; - using IProcessor = IReadSessionConnectionProcessorFactory::IProcessor; + using TPtr = std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>>; + using IProcessor = typename IReadSessionConnectionProcessorFactory<UseMigrationProtocol>::IProcessor; - friend class TPartitionStreamImpl; + friend class TPartitionStreamImpl<UseMigrationProtocol>; TSingleClusterReadSessionImpl( - const TReadSessionSettings& settings, + const TAReadSessionSettings<UseMigrationProtocol>& settings, const TString& database, const TString& sessionId, const TString& clusterName, const TLog& log, - std::shared_ptr<IReadSessionConnectionProcessorFactory> connectionFactory, - std::shared_ptr<TReadSessionEventsQueue> eventsQueue, - IErrorHandler::TPtr errorHandler, + std::shared_ptr<IReadSessionConnectionProcessorFactory<UseMigrationProtocol>> connectionFactory, + std::shared_ptr<TReadSessionEventsQueue<UseMigrationProtocol>> eventsQueue, + typename IErrorHandler<UseMigrationProtocol>::TPtr errorHandler, NGrpc::IQueueClientContextPtr clientContext, ui64 partitionStreamIdStart, ui64 partitionStreamIdStep ) @@ -792,19 +921,19 @@ public: } void Start(); - void ConfirmPartitionStreamCreate(const TPartitionStreamImpl* partitionStream, TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset); - void ConfirmPartitionStreamDestroy(TPartitionStreamImpl* partitionStream); - void RequestPartitionStreamStatus(const TPartitionStreamImpl* partitionStream); - void Commit(const TPartitionStreamImpl* partitionStream, ui64 startOffset, ui64 endOffset); + void ConfirmPartitionStreamCreate(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset); + void ConfirmPartitionStreamDestroy(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream); + void RequestPartitionStreamStatus(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream); + void Commit(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, ui64 startOffset, ui64 endOffset); void OnCreateNewDecompressionTask(); void OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount); - TReadSessionEventsQueue* GetEventsQueue() { + TReadSessionEventsQueue<UseMigrationProtocol>* GetEventsQueue() { return EventsQueue.get(); } - void OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) override; + void OnUserRetrievedEvent(const typename TAReadSessionEvent<UseMigrationProtocol>::TEvent& event) override; void Abort(); void Close(std::function<void()> callback); @@ -826,13 +955,13 @@ public: } private: - void BreakConnectionAndReconnectImpl(TPlainStatus&& status, TDeferredActions& deferred); + void BreakConnectionAndReconnectImpl(TPlainStatus&& status, TDeferredActions<UseMigrationProtocol>& deferred); - void BreakConnectionAndReconnectImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions& deferred) { + void BreakConnectionAndReconnectImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions<UseMigrationProtocol>& deferred) { BreakConnectionAndReconnectImpl(TPlainStatus(statusCode, std::move(issues)), deferred); } - void BreakConnectionAndReconnectImpl(EStatus statusCode, const TString& message, TDeferredActions& deferred) { + void BreakConnectionAndReconnectImpl(EStatus statusCode, const TString& message, TDeferredActions<UseMigrationProtocol>& deferred) { BreakConnectionAndReconnectImpl(TPlainStatus(statusCode, message), deferred); } @@ -840,27 +969,25 @@ private: void OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext); void OnConnect(TPlainStatus&&, typename IProcessor::TPtr&&, const NGrpc::IQueueClientContextPtr& connectContext); - void DestroyAllPartitionStreamsImpl(TDeferredActions& deferred); // Destroy all streams before setting new connection // Assumes that we're under lock. + void DestroyAllPartitionStreamsImpl(TDeferredActions<UseMigrationProtocol>& deferred); // Destroy all streams before setting new connection // Assumes that we're under lock. // Initing. - void InitImpl(TDeferredActions& deferred); // Assumes that we're under lock. + inline void InitImpl(TDeferredActions<UseMigrationProtocol>& deferred); // Assumes that we're under lock. // Working logic. void ContinueReadingDataImpl(); // Assumes that we're under lock. - bool IsActualPartitionStreamImpl(const TPartitionStreamImpl* partitionStream); // Assumes that we're under lock. + bool IsActualPartitionStreamImpl(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream); // Assumes that we're under lock. // Read/Write. - void ReadFromProcessorImpl(TDeferredActions& deferred); // Assumes that we're under lock. - void WriteToProcessorImpl(Ydb::PersQueue::V1::MigrationStreamingReadClientMessage&& req); // Assumes that we're under lock. + void ReadFromProcessorImpl(TDeferredActions<UseMigrationProtocol>& deferred); // Assumes that we're under lock. + void WriteToProcessorImpl(TClientMessage<UseMigrationProtocol>&& req); // Assumes that we're under lock. void OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration); - void OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::InitResponse&& msg, TDeferredActions& deferred); // Assumes that we're under lock. - void OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch&& msg, TDeferredActions& deferred); // Assumes that we're under lock. - void OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::Assigned&& msg, TDeferredActions& deferred); // Assumes that we're under lock. - void OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::Release&& msg, TDeferredActions& deferred); // Assumes that we're under lock. - void OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::Committed&& msg, TDeferredActions& deferred); // Assumes that we're under lock. - void OnReadDoneImpl(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::PartitionStatus&& msg, TDeferredActions& deferred); // Assumes that we're under lock. - void StartDecompressionTasksImpl(TDeferredActions& deferred); // Assumes that we're under lock. + // Assumes that we're under lock. + template<typename TMessage> + inline void OnReadDoneImpl(TMessage&& msg, TDeferredActions<UseMigrationProtocol>& deferred); + + void StartDecompressionTasksImpl(TDeferredActions<UseMigrationProtocol>& deferred); // Assumes that we're under lock. i64 GetCompressedDataSizeLimit() const { const double overallLimit = static_cast<double>(Settings.MaxMemoryUsageBytes_); @@ -906,7 +1033,7 @@ private: using TPtr = TIntrusivePtr<TCookie>; - explicit TCookie(ui64 cookie, TIntrusivePtr<TPartitionStreamImpl> partitionStream) + explicit TCookie(ui64 cookie, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream) : Cookie(cookie) , PartitionStream(std::move(partitionStream)) { @@ -923,24 +1050,24 @@ private: } ui64 Cookie = 0; - TIntrusivePtr<TPartitionStreamImpl> PartitionStream; + TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> PartitionStream; std::pair<ui64, ui64> OffsetRange; size_t UncommittedMessagesLeft = 0; }; - explicit TPartitionCookieMapping(IErrorHandler::TPtr errorHandler) + explicit TPartitionCookieMapping(typename IErrorHandler<UseMigrationProtocol>::TPtr errorHandler) : ErrorHandler(std::move(errorHandler)) { } - bool AddMapping(const TCookie::TPtr& cookie); + bool AddMapping(const typename TCookie::TPtr& cookie); // Removes (partition stream, offset) from mapping. // Returns cookie ptr if this was the last message, otherwise nullptr. - TCookie::TPtr CommitOffset(ui64 partitionStreamId, ui64 offset); + typename TCookie::TPtr CommitOffset(ui64 partitionStreamId, ui64 offset); // Gets and then removes committed cookie from mapping. - TCookie::TPtr RetrieveCommittedCookie(const Ydb::PersQueue::V1::CommitCookie& cookieProto); + typename TCookie::TPtr RetrieveCommittedCookie(const Ydb::PersQueue::V1::CommitCookie& cookieProto); // Removes mapping on partition stream. void RemoveMapping(ui64 partitionStreamId); @@ -951,43 +1078,43 @@ private: bool HasUnacknowledgedCookies() const; private: - THashMap<TCookie::TKey, TCookie::TPtr, TCookie::TKey::THash> Cookies; - THashMap<std::pair<ui64, ui64>, TCookie::TPtr> UncommittedOffsetToCookie; // (Partition stream id, Offset) -> Cookie. - THashMultiMap<ui64, TCookie::TPtr> PartitionStreamIdToCookie; - IErrorHandler::TPtr ErrorHandler; + THashMap<typename TCookie::TKey, typename TCookie::TPtr, typename TCookie::TKey::THash> Cookies; + THashMap<std::pair<ui64, ui64>, typename TCookie::TPtr> UncommittedOffsetToCookie; // (Partition stream id, Offset) -> Cookie. + THashMultiMap<ui64, typename TCookie::TPtr> PartitionStreamIdToCookie; + typename IErrorHandler<UseMigrationProtocol>::TPtr ErrorHandler; size_t CommitInflight = 0; // Commit inflight to server. }; struct TDecompressionQueueItem { - TDecompressionQueueItem(TDataDecompressionInfo* batchInfo, TIntrusivePtr<TPartitionStreamImpl> partitionStream) + TDecompressionQueueItem(TDataDecompressionInfo<UseMigrationProtocol>* batchInfo, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream) : BatchInfo(batchInfo) , PartitionStream(std::move(partitionStream)) { } - TDataDecompressionInfo* BatchInfo; - TIntrusivePtr<TPartitionStreamImpl> PartitionStream; + TDataDecompressionInfo<UseMigrationProtocol>* BatchInfo; + TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> PartitionStream; }; private: - const TReadSessionSettings Settings; + const TAReadSessionSettings<UseMigrationProtocol> Settings; const TString Database; const TString SessionId; const TString ClusterName; TLog Log; ui64 NextPartitionStreamId; ui64 PartitionStreamIdStep; - std::shared_ptr<IReadSessionConnectionProcessorFactory> ConnectionFactory; - std::shared_ptr<TReadSessionEventsQueue> EventsQueue; - IErrorHandler::TPtr ErrorHandler; + std::shared_ptr<IReadSessionConnectionProcessorFactory<UseMigrationProtocol>> ConnectionFactory; + std::shared_ptr<TReadSessionEventsQueue<UseMigrationProtocol>> EventsQueue; + typename IErrorHandler<UseMigrationProtocol>::TPtr ErrorHandler; NGrpc::IQueueClientContextPtr ClientContext; // Common client context. NGrpc::IQueueClientContextPtr ConnectContext; NGrpc::IQueueClientContextPtr ConnectTimeoutContext; NGrpc::IQueueClientContextPtr ConnectDelayContext; size_t ConnectionGeneration = 0; TAdaptiveLock Lock; - IProcessor::TPtr Processor; - IRetryPolicy::IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting). + typename IProcessor::TPtr Processor; + typename IARetryPolicy<UseMigrationProtocol>::IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting). size_t ConnectionAttemptsDone = 0; // Memory usage. @@ -997,8 +1124,8 @@ private: TInstant UsageStatisticsLastUpdateTime = TInstant::Now(); bool WaitingReadResponse = false; - std::shared_ptr<Ydb::PersQueue::V1::MigrationStreamingReadServerMessage> ServerMessage; // Server message to write server response to. - THashMap<ui64, TIntrusivePtr<TPartitionStreamImpl>> PartitionStreams; // assignId -> Partition stream. + std::shared_ptr<TServerMessage<UseMigrationProtocol>> ServerMessage; // Server message to write server response to. + THashMap<ui64, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>> PartitionStreams; // assignId -> Partition stream. TPartitionCookieMapping CookieMapping; std::deque<TDecompressionQueueItem> DecompressionQueue; bool DataReadingSuspended = false; @@ -1015,7 +1142,7 @@ private: // This class communicates with cluster discovery service and then creates // sessions to each cluster. class TReadSession : public IReadSession, - public IUserRetrievedEventCallback, + public IUserRetrievedEventCallback<true>, public std::enable_shared_from_this<TReadSession> { struct TClusterSessionInfo { TClusterSessionInfo(const TString& cluster) @@ -1024,7 +1151,7 @@ class TReadSession : public IReadSession, } TString ClusterName; // In lower case - TSingleClusterReadSessionImpl::TPtr Session; + TSingleClusterReadSessionImpl<true>::TPtr Session; TVector<TTopicReadSettings> Topics; TString ClusterEndpoint; }; @@ -1091,17 +1218,17 @@ private: void StartClusterDiscovery(); void OnClusterDiscovery(const TStatus& status, const Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult& result); void ProceedWithoutClusterDiscovery(); - void RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions& deferred); - void CreateClusterSessionsImpl(TDeferredActions& deferred); + void RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions<true>& deferred); + void CreateClusterSessionsImpl(TDeferredActions<true>& deferred); // Shutdown. void Abort(EStatus statusCode, NYql::TIssues&& issues); void Abort(EStatus statusCode, const TString& message); - void AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions& deferred); - void AbortImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions& deferred); - void AbortImpl(EStatus statusCode, const TString& message, TDeferredActions& deferred); + void AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions<true>& deferred); + void AbortImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions<true>& deferred); + void AbortImpl(EStatus statusCode, const TString& message, TDeferredActions<true>& deferred); void OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) override; @@ -1116,10 +1243,10 @@ private: TLog Log; std::shared_ptr<TPersQueueClient::TImpl> Client; std::shared_ptr<TGRpcConnectionsImpl> Connections; - IErrorHandler::TPtr ErrorHandler; + typename IErrorHandler<true>::TPtr ErrorHandler; TDbDriverStatePtr DbDriverState; TAdaptiveLock Lock; - std::shared_ptr<TReadSessionEventsQueue> EventsQueue; + std::shared_ptr<TReadSessionEventsQueue<true>> EventsQueue; THashMap<TString, TClusterSessionInfo> ClusterSessions; // Cluster name (in lower case) -> TClusterSessionInfo NGrpc::IQueueClientContextPtr ClusterDiscoveryDelayContext; IRetryPolicy::IRetryState::TPtr ClusterDiscoveryRetryState; @@ -1133,3 +1260,9 @@ private: }; } // namespace NYdb::NPersQueue + +///////////////////////////////////////// +// Templates implementation +#define READ_SESSION_IMPL +#include "read_session.ipp" +#undef READ_SESSION_IMPL diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp new file mode 100644 index 0000000000..17701a52dd --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -0,0 +1,2334 @@ +#ifndef READ_SESSION_IMPL +#error "Do not include this file directly" +#endif +// #include "read_session.h" + +#include "persqueue_impl.h" +#include "common.h" + +#define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h> +#undef INCLUDE_YDB_INTERNAL_H + +#include <google/protobuf/util/time_util.h> + +#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h> +#include <util/generic/guid.h> +#include <util/generic/size_literals.h> +#include <util/generic/utility.h> +#include <util/generic/yexception.h> +#include <util/stream/mem.h> +#include <util/system/env.h> + +#include <variant> + +namespace NYdb::NTopic { + class TReadSession; +} + +namespace NYdb::NPersQueue { + +static const bool RangesMode = !GetEnv("PQ_OFFSET_RANGES_MODE").empty(); + +template <typename TReaderCounters> +void MakeCountersNotNull(TReaderCounters& counters); +template <typename TReaderCounters> +bool HasNullCounters(TReaderCounters& counters); + +template <bool UseMigrationProtocol> +class TErrorHandler : public IErrorHandler<UseMigrationProtocol> { + using TReadSession = typename std::conditional_t<UseMigrationProtocol, + NPersQueue::TReadSession, + NTopic::TReadSession>; +public: + TErrorHandler(std::weak_ptr<TReadSession> session) + : Session(std::move(session)) + { + } + + void AbortSession(TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) override; + +private: + std::weak_ptr<TReadSession> Session; +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TPartitionStreamImpl + +template<bool UseMigrationProtocol> +TLog TPartitionStreamImpl<UseMigrationProtocol>::GetLog() const { + if (auto session = Session.lock()) { + return session->GetLog(); + } + return {}; +} + +template<bool UseMigrationProtocol> +void TPartitionStreamImpl<UseMigrationProtocol>::Commit(ui64 startOffset, ui64 endOffset) { + std::vector<std::pair<ui64, ui64>> toCommit; + if (auto sessionShared = Session.lock()) { + Y_VERIFY(endOffset > startOffset); + with_lock(sessionShared->Lock) { + if (!AddToCommitRanges(startOffset, endOffset, true)) // Add range for real commit always. + return; + + Y_VERIFY(!Commits.Empty()); + for (auto c : Commits) { + if (c.first >= endOffset) break; // Commit only gaps before client range. + toCommit.emplace_back(c); + } + Commits.EraseInterval(0, endOffset); // Drop only committed ranges; + } + for (auto range: toCommit) { + sessionShared->Commit(this, range.first, range.second); + } + } +} + +template<bool UseMigrationProtocol> +void TPartitionStreamImpl<UseMigrationProtocol>::RequestStatus() { + if (auto sessionShared = Session.lock()) { + sessionShared->RequestPartitionStreamStatus(this); + } +} + +template<bool UseMigrationProtocol> +void TPartitionStreamImpl<UseMigrationProtocol>::ConfirmCreate(TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset) { + if (auto sessionShared = Session.lock()) { + sessionShared->ConfirmPartitionStreamCreate(this, readOffset, commitOffset); + } +} + +template<bool UseMigrationProtocol> +void TPartitionStreamImpl<UseMigrationProtocol>::ConfirmDestroy() { + if (auto sessionShared = Session.lock()) { + sessionShared->ConfirmPartitionStreamDestroy(this); + } +} + +template<bool UseMigrationProtocol> +void TPartitionStreamImpl<UseMigrationProtocol>::StopReading() { + Y_FAIL("Not implemented"); // TODO +} + +template<bool UseMigrationProtocol> +void TPartitionStreamImpl<UseMigrationProtocol>::ResumeReading() { + Y_FAIL("Not implemented"); // TODO +} + +template<bool UseMigrationProtocol> +void TPartitionStreamImpl<UseMigrationProtocol>::SignalReadyEvents(TReadSessionEventsQueue<UseMigrationProtocol>* queue, TDeferredActions<UseMigrationProtocol>& deferred) { + for (auto& event : EventsQueue) { + event.Signal(this, queue, deferred); + + if (!event.IsReady()) { + break; + } + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TSingleClusterReadSessionImpl + +template<bool UseMigrationProtocol> +TStringBuilder TSingleClusterReadSessionImpl<UseMigrationProtocol>::GetLogPrefix() const { + return TStringBuilder() << GetDatabaseLogPrefix(Database) << "[" << SessionId << "] [" << ClusterName << "] "; +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Start() { + Settings.DecompressionExecutor_->Start(); + Settings.EventHandlers_.HandlersExecutor_->Start(); + if (!Reconnect(TPlainStatus())) { + ErrorHandler->AbortSession(EStatus::ABORTED, "Driver is stopping"); + } +} + +template<bool UseMigrationProtocol> +bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlainStatus& status) { + TDuration delay = TDuration::Zero(); + NGrpc::IQueueClientContextPtr delayContext = nullptr; + NGrpc::IQueueClientContextPtr connectContext = ClientContext->CreateContext(); + NGrpc::IQueueClientContextPtr connectTimeoutContext = ClientContext->CreateContext(); + if (!connectContext || !connectTimeoutContext) { + return false; + } + + // Previous operations contexts. + NGrpc::IQueueClientContextPtr prevConnectContext; + NGrpc::IQueueClientContextPtr prevConnectTimeoutContext; + NGrpc::IQueueClientContextPtr prevConnectDelayContext; + + if (!status.Ok()) { + Log.Write(TLOG_INFO, GetLogPrefix() << "Got error. Status: " << status.Status + << ". Description: " << IssuesSingleLineString(status.Issues)); + } + + TDeferredActions<UseMigrationProtocol> deferred; + with_lock (Lock) { + if (Aborting) { + Cancel(connectContext); + Cancel(connectTimeoutContext); + return false; + } + Processor = nullptr; + WaitingReadResponse = false; + ServerMessage = std::make_shared<TServerMessage<UseMigrationProtocol>>(); + ++ConnectionGeneration; + if (RetryState) { + TMaybe<TDuration> nextDelay = RetryState->GetNextRetryDelay(status.Status); + if (nextDelay) { + delay = *nextDelay; + delayContext = ClientContext->CreateContext(); + if (!delayContext) { + return false; + } + Log.Write(TLOG_DEBUG, GetLogPrefix() + << "Reconnecting session to cluster " << ClusterName << " in " << delay); + } else { + return false; + } + } else { + RetryState = Settings.RetryPolicy_->CreateRetryState(); + } + ++ConnectionAttemptsDone; + + // Set new context + prevConnectContext = std::exchange(ConnectContext, connectContext); + prevConnectTimeoutContext = std::exchange(ConnectTimeoutContext, connectTimeoutContext); + prevConnectDelayContext = std::exchange(ConnectDelayContext, delayContext); + + Y_ASSERT(ConnectContext); + Y_ASSERT(ConnectTimeoutContext); + Y_ASSERT((delay == TDuration::Zero()) == !ConnectDelayContext); + + // Destroy all partition streams before connecting. + DestroyAllPartitionStreamsImpl(deferred); + } + + // Cancel previous operations. + Cancel(prevConnectContext); + Cancel(prevConnectTimeoutContext); + Cancel(prevConnectDelayContext); + + auto connectCallback = [weakThis = TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) { + if (auto sharedThis = weakThis.lock()) { + sharedThis->OnConnect(std::move(st), std::move(processor), + connectContext); // OnConnect could be called inplace! + } + }; + + auto connectTimeoutCallback = [weakThis = TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + connectTimeoutContext = connectTimeoutContext](bool ok) { + if (ok) { + if (auto sharedThis = weakThis.lock()) { + sharedThis->OnConnectTimeout(connectTimeoutContext); + } + } + }; + + Y_ASSERT(connectContext); + Y_ASSERT(connectTimeoutContext); + Y_ASSERT((delay == TDuration::Zero()) == !delayContext); + ConnectionFactory->CreateProcessor( + std::move(connectCallback), + TRpcRequestSettings::Make(Settings), + std::move(connectContext), + TDuration::Seconds(30) /* connect timeout */, // TODO: make connect timeout setting. + std::move(connectTimeoutContext), + std::move(connectTimeoutCallback), + delay, + std::move(delayContext)); + return true; +} + +template <bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::BreakConnectionAndReconnectImpl( + TPlainStatus&& status, TDeferredActions<UseMigrationProtocol>& deferred) { + Log.Write(TLOG_INFO, + GetLogPrefix() << "Break connection due to unexpected message from server. Status: " << status.Status + << ", Issues: \"" << IssuesSingleLineString(status.Issues) << "\""); + + Processor->Cancel(); + Processor = nullptr; + RetryState = Settings.RetryPolicy_->CreateRetryState(); // Explicitly create retry state to determine whether we should connect to server again. + + deferred.DeferReconnection(TSingleClusterReadSessionImpl<UseMigrationProtocol>::shared_from_this(), ErrorHandler, std::move(status)); +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext) { + with_lock (Lock) { + if (ConnectTimeoutContext == connectTimeoutContext) { + Cancel(ConnectContext); + ConnectContext = nullptr; + ConnectTimeoutContext = nullptr; + ConnectDelayContext = nullptr; + + if (Closing || Aborting) { + CallCloseCallbackImpl(); + return; + } + } else { + return; + } + } + + ++*Settings.Counters_->Errors; + TStringBuilder description; + description << "Failed to establish connection to server. Attempts done: " << ConnectionAttemptsDone; + if (!Reconnect(TPlainStatus(EStatus::TIMEOUT, description))) { + ErrorHandler->AbortSession(EStatus::TIMEOUT, description); + } +} + +template <bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnConnect( + TPlainStatus&& st, typename IProcessor::TPtr&& processor, const NGrpc::IQueueClientContextPtr& connectContext) { + TDeferredActions<UseMigrationProtocol> deferred; + with_lock (Lock) { + if (ConnectContext == connectContext) { + Cancel(ConnectTimeoutContext); + ConnectContext = nullptr; + ConnectTimeoutContext = nullptr; + ConnectDelayContext = nullptr; + + if (Closing || Aborting) { + CallCloseCallbackImpl(); + return; + } + + if (st.Ok()) { + Processor = std::move(processor); + RetryState = nullptr; + ConnectionAttemptsDone = 0; + InitImpl(deferred); + return; + } + } else { + return; + } + } + + if (!st.Ok()) { + ++*Settings.Counters_->Errors; + if (!Reconnect(st)) { + ErrorHandler->AbortSession( + st.Status, MakeIssueWithSubIssues(TStringBuilder() << "Failed to establish connection to server \"" + << st.Endpoint << "\" ( cluster " << ClusterName + << "). Attempts done: " << ConnectionAttemptsDone, + st.Issues)); + } + } +} + +template<> +inline void TSingleClusterReadSessionImpl<true>::InitImpl(TDeferredActions<true>& deferred) { // Assumes that we're under lock. + Log.Write(TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session"); + TClientMessage<true> req; + auto& init = *req.mutable_init_request(); + init.set_ranges_mode(GetRangesMode()); + for (const TTopicReadSettings& topic : Settings.Topics_) { + auto* topicSettings = init.add_topics_read_settings(); + topicSettings->set_topic(topic.Path_); + if (topic.StartingMessageTimestamp_) { + topicSettings->set_start_from_written_at_ms(topic.StartingMessageTimestamp_->MilliSeconds()); + } + for (ui64 groupId : topic.PartitionGroupIds_) { + topicSettings->add_partition_group_ids(groupId); + } + } + init.set_consumer(Settings.ConsumerName_); + init.set_read_only_original(Settings.ReadOnlyOriginal_); + init.mutable_read_params()->set_max_read_size(Settings.MaxMemoryUsageBytes_); + if (Settings.MaxTimeLag_) { + init.set_max_lag_duration_ms(Settings.MaxTimeLag_->MilliSeconds()); + } + if (Settings.StartingMessageTimestamp_) { + init.set_start_from_written_at_ms(Settings.StartingMessageTimestamp_->MilliSeconds()); + } + + WriteToProcessorImpl(std::move(req)); + ReadFromProcessorImpl(deferred); +} + +template<> +inline void TSingleClusterReadSessionImpl<false>::InitImpl(TDeferredActions<false>& deferred) { // Assumes that we're under lock. + Log.Write(TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session"); + TClientMessage<false> req; + auto& init = *req.mutable_init_request(); + + init.set_consumer(Settings.ConsumerName_); + + for (const NTopic::TTopicReadSettings& topic : Settings.Topics_) { + auto* topicSettings = init.add_topics_read_settings(); + topicSettings->set_path(topic.Path_); + for (ui64 partitionId : topic.PartitionIds_) { + topicSettings->add_partition_ids(partitionId); + } + + if (topic.ReadFromTimestamp_) { + *topicSettings->mutable_read_from() = + ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(topic.ReadFromTimestamp_->MilliSeconds()); + } else if (Settings.ReadFromTimestamp_) { + *topicSettings->mutable_read_from() = + ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(Settings.ReadFromTimestamp_->MilliSeconds()); + } + + if (topic.MaxLag_) { + *topicSettings->mutable_max_lag() = + ::google::protobuf::util::TimeUtil::MillisecondsToDuration(topic.MaxLag_->MilliSeconds()); + } else if (Settings.ReadFromTimestamp_) { + *topicSettings->mutable_max_lag() = + ::google::protobuf::util::TimeUtil::MillisecondsToDuration(Settings.MaxLag_->MilliSeconds()); + } + } + + WriteToProcessorImpl(std::move(req)); + ReadFromProcessorImpl(deferred); +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ContinueReadingDataImpl() { // Assumes that we're under lock. + if (!Closing + && !Aborting + && !WaitingReadResponse + && !DataReadingSuspended + && Processor + && CompressedDataSize < GetCompressedDataSizeLimit() + && static_cast<size_t>(CompressedDataSize + DecompressedDataSize) < Settings.MaxMemoryUsageBytes_) + { + TClientMessage<UseMigrationProtocol> req; + if constexpr (UseMigrationProtocol) { + req.mutable_read(); + } else { + req.mutable_read_request()->set_bytes_size(GetCompressedDataSizeLimit() - CompressedDataSize); + // TODO account serverside bytes_size! + } + + WriteToProcessorImpl(std::move(req)); + WaitingReadResponse = true; + } +} + +template<bool UseMigrationProtocol> +ui64 GetPartitionStreamId(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) { + if constexpr (UseMigrationProtocol) { + return partitionStream->GetPartitionStreamId(); + } else { + return partitionStream->GetPartitionSessionId(); + } +} + +template<bool UseMigrationProtocol> +TString GetCluster(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) { + if constexpr (UseMigrationProtocol) { + return partitionStream->GetCluster(); + } else { + return "-"; + } +} + +template<bool UseMigrationProtocol> +bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::IsActualPartitionStreamImpl(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) { // Assumes that we're under lock. + auto actualPartitionStreamIt = PartitionStreams.find(partitionStream->GetAssignId()); + return actualPartitionStreamIt != PartitionStreams.end() + && GetPartitionStreamId(actualPartitionStreamIt->second.Get()) == GetPartitionStreamId(partitionStream); +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStreamCreate(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset) { + TStringBuilder commitOffsetLogStr; + if (commitOffset) { + commitOffsetLogStr << ". Commit offset: " << *commitOffset; + } + Log.Write( + TLOG_INFO, + GetLogPrefix() << "Confirm partition stream create. Partition stream id: " << GetPartitionStreamId(partitionStream) + << ". Cluster: \"" << GetCluster(partitionStream) << "\". Topic: \"" << partitionStream->GetTopicPath() + << "\". Partition: " << partitionStream->GetPartitionId() + << ". Read offset: " << readOffset << commitOffsetLogStr + ); + + with_lock (Lock) { + if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation. + Log.Write( + TLOG_DEBUG, + GetLogPrefix() << "Skip partition stream create confirm. Partition stream id: " + << GetPartitionStreamId(partitionStream) + ); + return; + } + + TClientMessage<UseMigrationProtocol> req; + + if constexpr (UseMigrationProtocol) { + auto& startRead = *req.mutable_start_read(); + startRead.mutable_topic()->set_path(partitionStream->GetTopicPath()); + startRead.set_cluster(partitionStream->GetCluster()); + startRead.set_partition(partitionStream->GetPartitionId()); + startRead.set_assign_id(partitionStream->GetAssignId()); + if (readOffset) { + startRead.set_read_offset(*readOffset); + } + if (commitOffset) { + startRead.set_commit_offset(*commitOffset); + } + } else { + auto& startRead = *req.mutable_start_partition_session_response(); + startRead.set_partition_session_id(GetPartitionStreamId(partitionStream)); + if (readOffset) { + startRead.set_read_offset(*readOffset); + } + if (commitOffset) { + startRead.set_commit_offset(*commitOffset); + } + } + + WriteToProcessorImpl(std::move(req)); + } +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStreamDestroy(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) { + Log.Write( + TLOG_INFO, + GetLogPrefix() << "Confirm partition stream destroy. Partition stream id: " + << GetPartitionStreamId(partitionStream) + << ". Cluster: \"" << GetCluster(partitionStream) << "\". Topic: \"" << partitionStream->GetTopicPath() + << "\". Partition: " << partitionStream->GetPartitionId() + ); + + TDeferredActions<UseMigrationProtocol> deferred; + with_lock (Lock) { + if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation. + Log.Write( + TLOG_DEBUG, + GetLogPrefix() << "Skip partition stream destroy confirm. Partition stream id: " + << GetPartitionStreamId(partitionStream) + ); + return; + } + + using TClosedEvent = typename std::conditional_t<UseMigrationProtocol, NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent, + NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>; + + CookieMapping.RemoveMapping(GetPartitionStreamId(partitionStream)); + PartitionStreams.erase(partitionStream->GetAssignId()); + + if constexpr (UseMigrationProtocol) { + EventsQueue->PushEvent({partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + TClosedEvent(partitionStream, TClosedEvent::EReason::DestroyConfirmedByUser)}, + deferred); + } else { + EventsQueue->PushEvent({partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + TClosedEvent(partitionStream, TClosedEvent::EReason::StopConfirmedByUser)}, + deferred); + } + + TClientMessage<UseMigrationProtocol> req; + + if constexpr (UseMigrationProtocol) { + auto& released = *req.mutable_released(); + released.mutable_topic()->set_path(partitionStream->GetTopicPath()); + released.set_cluster(partitionStream->GetCluster()); + released.set_partition(partitionStream->GetPartitionId()); + released.set_assign_id(partitionStream->GetAssignId()); + } else { + auto& released = *req.mutable_stop_partition_session_response(); + released.set_partition_session_id(partitionStream->GetPartitionSessionId()); + } + + WriteToProcessorImpl(std::move(req)); + } +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Commit(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, ui64 startOffset, ui64 endOffset) { + Log.Write( + TLOG_DEBUG, + GetLogPrefix() << "Commit offsets [" << startOffset << ", " << endOffset + << "). Partition stream id: " << GetPartitionStreamId(partitionStream) + ); + with_lock (Lock) { + if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation. + return; + } + TClientMessage<UseMigrationProtocol> req; + bool hasSomethingToCommit = false; + + if constexpr (UseMigrationProtocol) { + if (GetRangesMode()) { + hasSomethingToCommit = true; + auto* range = req.mutable_commit()->add_offset_ranges(); + range->set_assign_id(partitionStream->GetAssignId()); + range->set_start_offset(startOffset); + range->set_end_offset(endOffset); + } else { + for (ui64 offset = startOffset; offset < endOffset; ++offset) { + typename TPartitionCookieMapping::TCookie::TPtr cookie = CookieMapping.CommitOffset(GetPartitionStreamId(partitionStream), offset); + if (cookie) { + hasSomethingToCommit = true; + auto* cookieInfo = req.mutable_commit()->add_cookies(); + cookieInfo->set_assign_id(partitionStream->GetAssignId()); + cookieInfo->set_partition_cookie(cookie->Cookie); + } + } + } + } else { + hasSomethingToCommit = true; + auto* part_commit = req.mutable_commit_offset_request()->add_commit_offsets(); + part_commit->set_partition_session_id(GetPartitionStreamId(partitionStream)); + auto* range = part_commit->add_offsets(); + range->set_start(startOffset); + range->set_end(endOffset); + } + + if (hasSomethingToCommit) { + WriteToProcessorImpl(std::move(req)); + } + } +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::RequestPartitionStreamStatus(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) { + Log.Write( + TLOG_DEBUG, + GetLogPrefix() << "Requesting status for partition stream id: " << GetPartitionStreamId(partitionStream) + ); + with_lock (Lock) { + if (Aborting || Closing || !IsActualPartitionStreamImpl(partitionStream)) { // Got previous incarnation. + return; + } + + TClientMessage<UseMigrationProtocol> req; + + if constexpr (UseMigrationProtocol) { + auto& status = *req.mutable_status(); + status.mutable_topic()->set_path(partitionStream->GetTopicPath()); + status.set_cluster(partitionStream->GetCluster()); + status.set_partition(partitionStream->GetPartitionId()); + status.set_assign_id(partitionStream->GetAssignId()); + } else { + auto& status = *req.mutable_partition_session_status_request(); + status.set_partition_session_id(GetPartitionStreamId(partitionStream)); + } + + WriteToProcessorImpl(std::move(req)); + } +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnUserRetrievedEvent(const typename TAReadSessionEvent<UseMigrationProtocol>::TEvent& event) { + Log.Write(TLOG_DEBUG, GetLogPrefix() << "Read session event " << DebugString(event)); + const i64 bytesCount = static_cast<i64>(CalcDataSize<TAReadSessionEvent<UseMigrationProtocol>>(event)); + Y_ASSERT(bytesCount >= 0); + + if (!std::get_if<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent>(&event)) { // Event is not data event. + return; + } + + *Settings.Counters_->MessagesInflight -= std::get<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent>(event).GetMessagesCount(); + *Settings.Counters_->BytesInflightTotal -= bytesCount; + *Settings.Counters_->BytesInflightUncompressed -= bytesCount; + + TDeferredActions<UseMigrationProtocol> deferred; + with_lock (Lock) { + UpdateMemoryUsageStatisticsImpl(); + Y_VERIFY(bytesCount <= DecompressedDataSize); + DecompressedDataSize -= bytesCount; + ContinueReadingDataImpl(); + StartDecompressionTasksImpl(deferred); + } +} + +template <bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::WriteToProcessorImpl( + TClientMessage<UseMigrationProtocol>&& req) { // Assumes that we're under lock. + if (Processor) { + Processor->Write(std::move(req)); + } +} + +template<bool UseMigrationProtocol> +bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::HasCommitsInflightImpl() const { + for (const auto& [id, partitionStream] : PartitionStreams) { + if (partitionStream->HasCommitsInflight()) + return true; + } + return false; +} + +template <bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ReadFromProcessorImpl( + TDeferredActions<UseMigrationProtocol>& deferred) { // Assumes that we're under lock. + if (Closing && !HasCommitsInflightImpl()) { + Processor->Cancel(); + CallCloseCallbackImpl(); + return; + } + + if (Processor) { + ServerMessage->Clear(); + + auto callback = [weakThis = TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + connectionGeneration = ConnectionGeneration, + // Capture message & processor not to read in freed memory. + serverMessage = ServerMessage, + processor = Processor](NGrpc::TGrpcStatus&& grpcStatus) { + if (auto sharedThis = weakThis.lock()) { + sharedThis->OnReadDone(std::move(grpcStatus), connectionGeneration); + } + }; + + deferred.DeferReadFromProcessor(Processor, ServerMessage.get(), std::move(callback)); + } +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration) { + TPlainStatus errorStatus; + if (!grpcStatus.Ok()) { + errorStatus = TPlainStatus(std::move(grpcStatus)); + } + + TDeferredActions<UseMigrationProtocol> deferred; + with_lock (Lock) { + if (Aborting) { + return; + } + + if (connectionGeneration != ConnectionGeneration) { + return; // Message from previous connection. Ignore. + } + if (errorStatus.Ok()) { + if (IsErrorMessage(*ServerMessage)) { + errorStatus = MakeErrorFromProto(*ServerMessage); + } else { + + if constexpr (UseMigrationProtocol) { + switch (ServerMessage->response_case()) { + case Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::kInitResponse: + OnReadDoneImpl(std::move(*ServerMessage->mutable_init_response()), deferred); + break; + case Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::kDataBatch: + OnReadDoneImpl(std::move(*ServerMessage->mutable_data_batch()), deferred); + break; + case Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::kAssigned: + OnReadDoneImpl(std::move(*ServerMessage->mutable_assigned()), deferred); + break; + case Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::kRelease: + OnReadDoneImpl(std::move(*ServerMessage->mutable_release()), deferred); + break; + case Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::kCommitted: + OnReadDoneImpl(std::move(*ServerMessage->mutable_committed()), deferred); + break; + case Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::kPartitionStatus: + OnReadDoneImpl(std::move(*ServerMessage->mutable_partition_status()), deferred); + break; + case Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::RESPONSE_NOT_SET: + errorStatus = TPlainStatus::Internal("Unexpected response from server"); + break; + } + } else { + switch (ServerMessage->server_message_case()) { + case TServerMessage<false>::kInitResponse: + OnReadDoneImpl(std::move(*ServerMessage->mutable_init_response()), deferred); + break; + case TServerMessage<false>::kReadResponse: + OnReadDoneImpl(std::move(*ServerMessage->mutable_read_response()), deferred); + break; + case TServerMessage<false>::kStartPartitionSessionRequest: + OnReadDoneImpl(std::move(*ServerMessage->mutable_start_partition_session_request()), deferred); + break; + case TServerMessage<false>::kStopPartitionSessionRequest: + OnReadDoneImpl(std::move(*ServerMessage->mutable_stop_partition_session_request()), deferred); + break; + case TServerMessage<false>::kCommitOffsetResponse: + OnReadDoneImpl(std::move(*ServerMessage->mutable_commit_offset_response()), deferred); + break; + case TServerMessage<false>::kPartitionSessionStatusResponse: + OnReadDoneImpl(std::move(*ServerMessage->mutable_partition_session_status_response()), deferred); + break; + case TServerMessage<false>::kUpdateTokenResponse: + OnReadDoneImpl(std::move(*ServerMessage->mutable_update_token_response()), deferred); + break; + case TServerMessage<false>::SERVER_MESSAGE_NOT_SET: + errorStatus = TPlainStatus::Internal("Unexpected response from server"); + break; + } + } + + if (errorStatus.Ok()) { + ReadFromProcessorImpl(deferred); // Read next. + } + } + } + } + if (!errorStatus.Ok()) { + ++*Settings.Counters_->Errors; + // Explicitly create retry state to determine whether we should connect to server again. + RetryState = Settings.RetryPolicy_->CreateRetryState(); + if (!Reconnect(errorStatus)) { + ErrorHandler->AbortSession(std::move(errorStatus)); + } + } +} + +template <> +template <> +inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( + Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::InitResponse&& msg, + TDeferredActions<true>& deferred) { // Assumes that we're under lock. + Y_UNUSED(deferred); + + Log.Write(TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id()); + + // Successful init. Do nothing. + ContinueReadingDataImpl(); +} + +template <> +template <> +inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( + Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch&& msg, + TDeferredActions<true>& deferred) { // Assumes that we're under lock. + if (Closing || Aborting) { + return; // Don't process new data. + } + UpdateMemoryUsageStatisticsImpl(); + for (TPartitionData<true>& partitionData : *msg.mutable_partition_data()) { + auto partitionStreamIt = PartitionStreams.find(partitionData.cookie().assign_id()); + if (partitionStreamIt == PartitionStreams.end()) { + ++*Settings.Counters_->Errors; + BreakConnectionAndReconnectImpl(EStatus::INTERNAL_ERROR, + TStringBuilder() + << "Got unexpected partition stream data message. Topic: " + << partitionData.topic() << ". Partition: " << partitionData.partition() + << " AssignId: " << partitionData.cookie().assign_id(), + deferred); + return; + } + const TIntrusivePtr<TPartitionStreamImpl<true>>& partitionStream = partitionStreamIt->second; + + typename TPartitionCookieMapping::TCookie::TPtr cookie = MakeIntrusive<typename TPartitionCookieMapping::TCookie>(partitionData.cookie().partition_cookie(), partitionStream); + + ui64 firstOffset = std::numeric_limits<ui64>::max(); + ui64 currentOffset = std::numeric_limits<ui64>::max(); + ui64 desiredOffset = partitionStream->GetFirstNotReadOffset(); + for (const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch& batch : partitionData.batches()) { + // Validate messages. + for (const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& messageData : batch.message_data()) { + // Check offsets continuity. + if (messageData.offset() != desiredOffset) { + bool res = partitionStream->AddToCommitRanges(desiredOffset, messageData.offset(), GetRangesMode()); + Y_VERIFY(res); + } + + if (firstOffset == std::numeric_limits<ui64>::max()) { + firstOffset = messageData.offset(); + } + currentOffset = messageData.offset(); + desiredOffset = currentOffset + 1; + partitionStream->UpdateMaxReadOffset(currentOffset); + const i64 messageSize = static_cast<i64>(messageData.data().size()); + CompressedDataSize += messageSize; + *Settings.Counters_->BytesInflightTotal += messageSize; + *Settings.Counters_->BytesInflightCompressed += messageSize; + ++*Settings.Counters_->MessagesInflight; + } + } + if (firstOffset == std::numeric_limits<ui64>::max()) { + BreakConnectionAndReconnectImpl(EStatus::INTERNAL_ERROR, + TStringBuilder() << "Got empty data message. Topic: " + << partitionData.topic() + << ". Partition: " << partitionData.partition() + << " message: " << msg, + deferred); + return; + } + cookie->SetOffsetRange(std::make_pair(firstOffset, desiredOffset)); + partitionStream->SetFirstNotReadOffset(desiredOffset); + if (!CookieMapping.AddMapping(cookie)) { + BreakConnectionAndReconnectImpl(EStatus::INTERNAL_ERROR, + TStringBuilder() << "Got unexpected data message. Topic: " + << partitionData.topic() + << ". Partition: " << partitionData.partition() + << ". Cookie mapping already has such cookie", + deferred); + return; + } + TDataDecompressionInfo<true>* decompressionInfo = EventsQueue->PushDataEvent(partitionStream, std::move(partitionData)); + Y_VERIFY(decompressionInfo); + if (decompressionInfo) { + DecompressionQueue.emplace_back(decompressionInfo, partitionStream); + StartDecompressionTasksImpl(deferred); + } + } + + WaitingReadResponse = false; + ContinueReadingDataImpl(); +} + +template <> +template <> +inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( + Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::Assigned&& msg, + TDeferredActions<true>& deferred) { // Assumes that we're under lock. + auto partitionStream = MakeIntrusive<TPartitionStreamImpl<true>>( + NextPartitionStreamId, msg.topic().path(), msg.cluster(), + msg.partition() + 1, // Group. + msg.partition(), // Partition. + msg.assign_id(), msg.read_offset(), weak_from_this(), + ErrorHandler); + NextPartitionStreamId += PartitionStreamIdStep; + + // Renew partition stream. + TIntrusivePtr<TPartitionStreamImpl<true>>& currentPartitionStream = + PartitionStreams[partitionStream->GetAssignId()]; + if (currentPartitionStream) { + CookieMapping.RemoveMapping(currentPartitionStream->GetPartitionStreamId()); + EventsQueue->PushEvent( + {currentPartitionStream, weak_from_this(), + TReadSessionEvent::TPartitionStreamClosedEvent( + currentPartitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost)}, + deferred); + } + currentPartitionStream = partitionStream; + + // Send event to user. + EventsQueue->PushEvent( + {partitionStream, weak_from_this(), + TReadSessionEvent::TCreatePartitionStreamEvent(partitionStream, msg.read_offset(), msg.end_offset())}, + deferred); +} + +template <> +template <> +inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( + Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::Release&& msg, + TDeferredActions<true>& deferred) { // Assumes that we're under lock. + auto partitionStreamIt = PartitionStreams.find(msg.assign_id()); + if (partitionStreamIt == PartitionStreams.end()) { + return; + } + TIntrusivePtr<TPartitionStreamImpl<true>> partitionStream = partitionStreamIt->second; + if (msg.forceful_release()) { + PartitionStreams.erase(msg.assign_id()); + CookieMapping.RemoveMapping(partitionStream->GetPartitionStreamId()); + EventsQueue->PushEvent({partitionStream, weak_from_this(), + TReadSessionEvent::TPartitionStreamClosedEvent( + partitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost)}, + deferred); + } else { + EventsQueue->PushEvent( + {partitionStream, weak_from_this(), + TReadSessionEvent::TDestroyPartitionStreamEvent(std::move(partitionStream), msg.commit_offset())}, + deferred); + } +} + +template <> +template <> +inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( + Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::Committed&& msg, + TDeferredActions<true>& deferred) { // Assumes that we're under lock. + + Log.Write(TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg); + + TMap<ui64, TIntrusivePtr<TPartitionStreamImpl<true>>> partitionStreams; + for (const Ydb::PersQueue::V1::CommitCookie& cookieProto : msg.cookies()) { + typename TPartitionCookieMapping::TCookie::TPtr cookie = CookieMapping.RetrieveCommittedCookie(cookieProto); + if (cookie) { + cookie->PartitionStream->UpdateMaxCommittedOffset(cookie->OffsetRange.second); + partitionStreams[cookie->PartitionStream->GetPartitionStreamId()] = cookie->PartitionStream; + } + } + for (auto& [id, partitionStream] : partitionStreams) { + EventsQueue->PushEvent( + {partitionStream, weak_from_this(), + TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, partitionStream->GetMaxCommittedOffset())}, + deferred); + } + + for (const auto& rangeProto : msg.offset_ranges()) { + auto partitionStreamIt = PartitionStreams.find(rangeProto.assign_id()); + if (partitionStreamIt != PartitionStreams.end()) { + auto partitionStream = partitionStreamIt->second; + partitionStream->UpdateMaxCommittedOffset(rangeProto.end_offset()); + EventsQueue->PushEvent( + {partitionStream, weak_from_this(), + TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, rangeProto.end_offset())}, + deferred); + } + } +} + +template <> +template <> +inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( + Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::PartitionStatus&& msg, + TDeferredActions<true>& deferred) { // Assumes that we're under lock. + auto partitionStreamIt = PartitionStreams.find(msg.assign_id()); + if (partitionStreamIt == PartitionStreams.end()) { + return; + } + EventsQueue->PushEvent({partitionStreamIt->second, weak_from_this(), + TReadSessionEvent::TPartitionStreamStatusEvent( + partitionStreamIt->second, msg.committed_offset(), + 0, // TODO: support read offset in status + msg.end_offset(), TInstant::MilliSeconds(msg.write_watermark_ms()))}, + deferred); +} + +////////////// + +template <> +template <> +inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( + Ydb::Topic::StreamReadMessage::InitResponse&& msg, + TDeferredActions<false>& deferred) { // Assumes that we're under lock. + Y_UNUSED(deferred); + + Log.Write(TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id()); + + // Successful init. Do nothing. + ContinueReadingDataImpl(); +} + +template <> +template <> +inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( + Ydb::Topic::StreamReadMessage::ReadResponse&& msg, + TDeferredActions<false>& deferred) { // Assumes that we're under lock. + + // TODO account bytes_size! + + if (Closing || Aborting) { + return; // Don't process new data. + } + UpdateMemoryUsageStatisticsImpl(); + for (TPartitionData<false>& partitionData : *msg.mutable_partition_data()) { + auto partitionStreamIt = PartitionStreams.find(partitionData.partition_session_id()); + if (partitionStreamIt == PartitionStreams.end()) { + ++*Settings.Counters_->Errors; + BreakConnectionAndReconnectImpl(EStatus::INTERNAL_ERROR, + TStringBuilder() << "Got unexpected partition stream data message. " + << "PartitionSessionId: " << partitionData.partition_session_id(), + deferred); + return; + } + const TIntrusivePtr<TPartitionStreamImpl<false>>& partitionStream = partitionStreamIt->second; + + i64 firstOffset = std::numeric_limits<i64>::max(); + i64 currentOffset = std::numeric_limits<i64>::max(); + i64 desiredOffset = partitionStream->GetFirstNotReadOffset(); + for (const auto& batch : partitionData.batches()) { + // Validate messages. + for (const auto& messageData : batch.message_data()) { + // Check offsets continuity. + if (messageData.offset() != desiredOffset) { + bool res = partitionStream->AddToCommitRanges(desiredOffset, messageData.offset(), GetRangesMode()); + Y_VERIFY(res); + } + + if (firstOffset == std::numeric_limits<i64>::max()) { + firstOffset = messageData.offset(); + } + currentOffset = messageData.offset(); + desiredOffset = currentOffset + 1; + partitionStream->UpdateMaxReadOffset(currentOffset); + const i64 messageSize = static_cast<i64>(messageData.data().size()); + CompressedDataSize += messageSize; + *Settings.Counters_->BytesInflightTotal += messageSize; + *Settings.Counters_->BytesInflightCompressed += messageSize; + ++*Settings.Counters_->MessagesInflight; + } + } + if (firstOffset == std::numeric_limits<i64>::max()) { + BreakConnectionAndReconnectImpl(EStatus::INTERNAL_ERROR, + TStringBuilder() << "Got empty data message. " + << "PartitionSessionId: " << partitionData.partition_session_id() + << " message: " << msg, + deferred); + return; + } + partitionStream->SetFirstNotReadOffset(desiredOffset); + TDataDecompressionInfo<false>* decompressionInfo = EventsQueue->PushDataEvent(partitionStream, std::move(partitionData)); + Y_VERIFY(decompressionInfo); + if (decompressionInfo) { + DecompressionQueue.emplace_back(decompressionInfo, partitionStream); + StartDecompressionTasksImpl(deferred); + } + } + + WaitingReadResponse = false; + ContinueReadingDataImpl(); +} + +template <> +template <> +inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( + Ydb::Topic::StreamReadMessage::StartPartitionSessionRequest&& msg, + TDeferredActions<false>& deferred) { // Assumes that we're under lock. + auto partitionStream = MakeIntrusive<TPartitionStreamImpl<false>>( + NextPartitionStreamId, msg.partition_session().path(), msg.partition_session().partition_id(), + msg.partition_session().partition_session_id(), msg.committed_offset(), + weak_from_this(), ErrorHandler); + NextPartitionStreamId += PartitionStreamIdStep; + + // Renew partition stream. + TIntrusivePtr<TPartitionStreamImpl<false>>& currentPartitionStream = PartitionStreams[partitionStream->GetAssignId()]; + if (currentPartitionStream) { + EventsQueue->PushEvent( + {currentPartitionStream, weak_from_this(), + NTopic::TReadSessionEvent::TPartitionSessionClosedEvent( + currentPartitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost)}, + deferred); + } + currentPartitionStream = partitionStream; + + // Send event to user. + EventsQueue->PushEvent({partitionStream, weak_from_this(), + NTopic::TReadSessionEvent::TStartPartitionSessionEvent( + partitionStream, msg.committed_offset(), msg.partition_offsets().end())}, + deferred); +} + +template <> +template <> +inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( + Ydb::Topic::StreamReadMessage::StopPartitionSessionRequest&& msg, + TDeferredActions<false>& deferred) { // Assumes that we're under lock. + auto partitionStreamIt = PartitionStreams.find(msg.partition_session_id()); + if (partitionStreamIt == PartitionStreams.end()) { + return; + } + TIntrusivePtr<TPartitionStreamImpl<false>> partitionStream = partitionStreamIt->second; + if (!msg.graceful()) { + PartitionStreams.erase(msg.partition_session_id()); + EventsQueue->PushEvent({partitionStream, weak_from_this(), + NTopic::TReadSessionEvent::TPartitionSessionClosedEvent( + partitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost)}, + deferred); + } else { + EventsQueue->PushEvent( + {partitionStream, weak_from_this(), + NTopic::TReadSessionEvent::TStopPartitionSessionEvent(std::move(partitionStream), msg.committed_offset())}, + deferred); + } +} + +template <> +template <> +inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( + Ydb::Topic::StreamReadMessage::CommitOffsetResponse&& msg, + TDeferredActions<false>& deferred) { // Assumes that we're under lock. + + Log.Write(TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg); + + for (const auto& rangeProto : msg.partitions_committed_offsets()) { + auto partitionStreamIt = PartitionStreams.find(rangeProto.partition_session_id()); + if (partitionStreamIt != PartitionStreams.end()) { + auto partitionStream = partitionStreamIt->second; + partitionStream->UpdateMaxCommittedOffset(rangeProto.committed_offset()); + EventsQueue->PushEvent({partitionStream, weak_from_this(), + NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent( + partitionStream, rangeProto.committed_offset())}, + deferred); + } + } +} + +template <> +template <> +inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( + Ydb::Topic::StreamReadMessage::PartitionSessionStatusResponse&& msg, + TDeferredActions<false>& deferred) { // Assumes that we're under lock. + auto partitionStreamIt = PartitionStreams.find(msg.partition_session_id()); + if (partitionStreamIt == PartitionStreams.end()) { + return; + } + EventsQueue->PushEvent({partitionStreamIt->second, weak_from_this(), + NTopic::TReadSessionEvent::TPartitionSessionStatusEvent( + partitionStreamIt->second, msg.committed_offset(), + 0, // TODO: support read offset in status + msg.partition_offsets().end(), + TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds( + msg.write_time_high_watermark())))}, + deferred); +} + +template <> +template <> +inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( + Ydb::Topic::UpdateTokenResponse&& msg, + TDeferredActions<false>& deferred) { // Assumes that we're under lock. + // TODO + Y_UNUSED(msg, deferred); +} + +////////////// + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::StartDecompressionTasksImpl(TDeferredActions<UseMigrationProtocol>& deferred) { + UpdateMemoryUsageStatisticsImpl(); + const i64 limit = GetDecompressedDataSizeLimit(); + Y_VERIFY(limit > 0); + while (DecompressedDataSize < limit + && (static_cast<size_t>(CompressedDataSize + DecompressedDataSize) < Settings.MaxMemoryUsageBytes_ + || DecompressedDataSize == 0 /* Allow decompression of at least one message even if memory is full. */) + && !DecompressionQueue.empty()) + { + TDecompressionQueueItem& current = DecompressionQueue.front(); + auto sentToDecompress = current.BatchInfo->StartDecompressionTasks(Settings.DecompressionExecutor_, + Max(limit - DecompressedDataSize, static_cast<i64>(1)), + AverageCompressionRatio, + current.PartitionStream, + deferred); + DecompressedDataSize += sentToDecompress; + if (current.BatchInfo->AllDecompressionTasksStarted()) { + DecompressionQueue.pop_front(); + } else { + break; + } + } +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::DestroyAllPartitionStreamsImpl(TDeferredActions<UseMigrationProtocol>& deferred) { + using TClosedEvent = + typename std::conditional_t<UseMigrationProtocol, NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent, + NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>; + + for (auto&& [key, partitionStream] : PartitionStreams) { + EventsQueue->PushEvent({partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + TClosedEvent(std::move(partitionStream), TClosedEvent::EReason::ConnectionLost)}, + deferred); + } + PartitionStreams.clear(); + CookieMapping.ClearMapping(); +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnCreateNewDecompressionTask() { + ++DecompressionTasksInflight; +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount) { + TDeferredActions<UseMigrationProtocol> deferred; + --DecompressionTasksInflight; + + *Settings.Counters_->BytesRead += decompressedSize; + *Settings.Counters_->BytesReadCompressed += sourceSize; + *Settings.Counters_->MessagesRead += messagesCount; + *Settings.Counters_->BytesInflightUncompressed += decompressedSize; + *Settings.Counters_->BytesInflightCompressed -= sourceSize; + *Settings.Counters_->BytesInflightTotal += (decompressedSize - sourceSize); + + with_lock (Lock) { + UpdateMemoryUsageStatisticsImpl(); + CompressedDataSize -= sourceSize; + DecompressedDataSize += decompressedSize - estimatedDecompressedSize; + constexpr double weight = 0.6; + if (sourceSize > 0) { + AverageCompressionRatio = weight * static_cast<double>(decompressedSize) / static_cast<double>(sourceSize) + (1 - weight) * AverageCompressionRatio; + } + if (Aborting) { + return; + } + ContinueReadingDataImpl(); + StartDecompressionTasksImpl(deferred); + } +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Abort() { + Log.Write(TLOG_DEBUG, GetLogPrefix() << "Abort session to cluster"); + + with_lock (Lock) { + if (!Aborting) { + Aborting = true; + CloseCallback = {}; + + // Cancel(ClientContext); // Don't cancel, because this is used only as factory for other contexts. + Cancel(ConnectContext); + Cancel(ConnectTimeoutContext); + Cancel(ConnectDelayContext); + + if (Processor) { + Processor->Cancel(); + } + } + } +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Close(std::function<void()> callback) { + with_lock (Lock) { + if (Aborting) { + callback(); + } + + if (!Closing) { + Closing = true; + + CloseCallback = std::move(callback); + + Cancel(ConnectContext); + Cancel(ConnectTimeoutContext); + Cancel(ConnectDelayContext); + + if (!Processor) { + CallCloseCallbackImpl(); + } else { + if (!HasCommitsInflightImpl()) { + Processor->Cancel(); + CallCloseCallbackImpl(); + } + } + } + } +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::CallCloseCallbackImpl() { + if (CloseCallback) { + CloseCallback(); + CloseCallback = {}; + } + Aborting = true; // So abort call will have no effect. +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::StopReadingData() { + with_lock (Lock) { + DataReadingSuspended = true; + } +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ResumeReadingData() { + with_lock (Lock) { + if (DataReadingSuspended) { + DataReadingSuspended = false; + ContinueReadingDataImpl(); + } + } +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::WaitAllDecompressionTasks() { + Y_ASSERT(DecompressionTasksInflight >= 0); + while (DecompressionTasksInflight > 0) { + Sleep(TDuration::MilliSeconds(5)); // Perform active wait because this is aborting process and there are no decompression tasks here in normal situation. + } +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::DumpStatisticsToLog(TLogElement& log) { + with_lock (Lock) { + // cluster:topic:partition:stream-id:read-offset:committed-offset + for (auto&& [key, partitionStream] : PartitionStreams) { + if constexpr (UseMigrationProtocol) { + log << " " + << ClusterName + << ':' << partitionStream->GetTopicPath() + << ':' << partitionStream->GetPartitionId() + << ':' << partitionStream->GetPartitionStreamId() + << ':' << partitionStream->GetMaxReadOffset() + << ':' << partitionStream->GetMaxCommittedOffset(); + } else { + log << " " + << "-" + << ':' << partitionStream->GetTopicPath() + << ':' << partitionStream->GetPartitionId() + << ':' << partitionStream->GetPartitionSessionId() + << ':' << partitionStream->GetMaxReadOffset() + << ':' << partitionStream->GetMaxCommittedOffset(); + } + } + } +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::UpdateMemoryUsageStatisticsImpl() { + const TInstant now = TInstant::Now(); + const ui64 delta = (now - UsageStatisticsLastUpdateTime).MilliSeconds(); + UsageStatisticsLastUpdateTime = now; + const double percent = 100.0 / static_cast<double>(Settings.MaxMemoryUsageBytes_); + + Settings.Counters_->TotalBytesInflightUsageByTime->Collect((DecompressedDataSize + CompressedDataSize) * percent, delta); + Settings.Counters_->UncompressedBytesInflightUsageByTime->Collect(DecompressedDataSize * percent, delta); + Settings.Counters_->CompressedBytesInflightUsageByTime->Collect(CompressedDataSize * percent, delta); +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::UpdateMemoryUsageStatistics() { + with_lock (Lock) { + UpdateMemoryUsageStatisticsImpl(); + } +} + +template<bool UseMigrationProtocol> +bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::GetRangesMode() const { + if constexpr (UseMigrationProtocol) { + return Settings.RangesMode_.GetOrElse(RangesMode); + } else { + return true; + } +} + +template<bool UseMigrationProtocol> +bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::TPartitionCookieMapping::AddMapping(const typename TCookie::TPtr& cookie) { + if (!Cookies.emplace(cookie->GetKey(), cookie).second) { + return false; + } + for (ui64 offset = cookie->OffsetRange.first; offset < cookie->OffsetRange.second; ++offset) { + if (!UncommittedOffsetToCookie.emplace(std::make_pair(cookie->PartitionStream->GetPartitionStreamId(), offset), cookie).second) { + return false; + } + } + PartitionStreamIdToCookie.emplace(cookie->PartitionStream->GetPartitionStreamId(), cookie); + return true; +} + +template<bool UseMigrationProtocol> +typename TSingleClusterReadSessionImpl<UseMigrationProtocol>::TPartitionCookieMapping::TCookie::TPtr TSingleClusterReadSessionImpl<UseMigrationProtocol>::TPartitionCookieMapping::CommitOffset(ui64 partitionStreamId, ui64 offset) { + auto cookieIt = UncommittedOffsetToCookie.find(std::make_pair(partitionStreamId, offset)); + if (cookieIt != UncommittedOffsetToCookie.end()) { + typename TCookie::TPtr cookie; + if (!--cookieIt->second->UncommittedMessagesLeft) { + ++CommitInflight; + cookie = cookieIt->second; + } + UncommittedOffsetToCookie.erase(cookieIt); + return cookie; + } else { + ThrowFatalError(TStringBuilder() << "Invalid offset " << offset << ". Partition stream id: " << partitionStreamId << Endl); + } + // If offset wasn't found, there might be already hard released partition. + // This situation is OK. + return nullptr; +} + +template<bool UseMigrationProtocol> +typename TSingleClusterReadSessionImpl<UseMigrationProtocol>::TPartitionCookieMapping::TCookie::TPtr TSingleClusterReadSessionImpl<UseMigrationProtocol>::TPartitionCookieMapping::RetrieveCommittedCookie(const Ydb::PersQueue::V1::CommitCookie& cookieProto) { + typename TCookie::TPtr cookieInfo; + auto cookieIt = Cookies.find(typename TCookie::TKey(cookieProto.assign_id(), cookieProto.partition_cookie())); + if (cookieIt != Cookies.end()) { + --CommitInflight; + cookieInfo = cookieIt->second; + Cookies.erase(cookieIt); + + auto [rangeBegin, rangeEnd] = PartitionStreamIdToCookie.equal_range(cookieInfo->PartitionStream->GetPartitionStreamId()); + for (auto i = rangeBegin; i != rangeEnd; ++i) { + if (i->second == cookieInfo) { + PartitionStreamIdToCookie.erase(i); + break; + } + } + } + return cookieInfo; +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::TPartitionCookieMapping::RemoveMapping(ui64 partitionStreamId) { + auto [rangeBegin, rangeEnd] = PartitionStreamIdToCookie.equal_range(partitionStreamId); + for (auto i = rangeBegin; i != rangeEnd; ++i) { + typename TCookie::TPtr cookie = i->second; + Cookies.erase(cookie->GetKey()); + for (ui64 offset = cookie->OffsetRange.first; offset < cookie->OffsetRange.second; ++offset) { + UncommittedOffsetToCookie.erase(std::make_pair(partitionStreamId, offset)); + } + } + PartitionStreamIdToCookie.erase(rangeBegin, rangeEnd); +} + +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::TPartitionCookieMapping::ClearMapping() { + Cookies.clear(); + UncommittedOffsetToCookie.clear(); + PartitionStreamIdToCookie.clear(); + CommitInflight = 0; +} + +template<bool UseMigrationProtocol> +bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::TPartitionCookieMapping::HasUnacknowledgedCookies() const { + return CommitInflight != 0; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TReadSessionEventInfo + +template<bool UseMigrationProtocol> +TReadSessionEventInfo<UseMigrationProtocol>::TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, TEvent event) + : PartitionStream(std::move(partitionStream)) + , Event(std::move(event)) + , Session(std::move(session)) +{} + +template<bool UseMigrationProtocol> +TReadSessionEventInfo<UseMigrationProtocol>::TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session) + : PartitionStream(std::move(partitionStream)) + , Session(std::move(session)) +{} + +template<bool UseMigrationProtocol> +TReadSessionEventInfo<UseMigrationProtocol>::TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, + std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, + TVector<TMessage> messages, + TVector<TCompressedMessage> compressedMessages) + : PartitionStream(std::move(partitionStream)) + , Event( + NMaybe::TInPlace(), + std::in_place_type_t<TDataReceivedEvent>(), + std::move(messages), + std::move(compressedMessages), + PartitionStream + ) + , Session(std::move(session)) +{ +} + +template<bool UseMigrationProtocol> +void TReadSessionEventInfo<UseMigrationProtocol>::MoveToPartitionStream() { + PartitionStream->InsertEvent(std::move(*Event)); + Event = Nothing(); + Y_ASSERT(PartitionStream->HasEvents()); +} + +template<bool UseMigrationProtocol> +void TReadSessionEventInfo<UseMigrationProtocol>::ExtractFromPartitionStream() { + if (!Event && !IsEmpty()) { + Event = std::move(PartitionStream->TopEvent().GetEvent()); + PartitionStream->PopEvent(); + } +} + +template<bool UseMigrationProtocol> +bool TReadSessionEventInfo<UseMigrationProtocol>::IsEmpty() const { + return !PartitionStream || !PartitionStream->HasEvents(); +} + +template<bool UseMigrationProtocol> +bool TReadSessionEventInfo<UseMigrationProtocol>::IsDataEvent() const { + return !IsEmpty() && PartitionStream->TopEvent().IsDataEvent(); +} + +template<bool UseMigrationProtocol> +bool TReadSessionEventInfo<UseMigrationProtocol>::HasMoreData() const { + return PartitionStream->TopEvent().GetData().HasMoreData(); +} + +template<bool UseMigrationProtocol> +bool TReadSessionEventInfo<UseMigrationProtocol>::HasReadyUnreadData() const { + return PartitionStream->TopEvent().GetData().HasReadyUnreadData(); +} + +template<bool UseMigrationProtocol> +void TReadSessionEventInfo<UseMigrationProtocol>::OnUserRetrievedEvent() { + if (auto session = Session.lock()) { + session->OnUserRetrievedEvent(*Event); + } +} + +template<bool UseMigrationProtocol> +bool TReadSessionEventInfo<UseMigrationProtocol>::TakeData(TVector<TMessage>* messages, + TVector<TCompressedMessage>* compressedMessages, + size_t* maxByteSize) +{ + return PartitionStream->TopEvent().GetData().TakeData(PartitionStream, messages, compressedMessages, maxByteSize); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TReadSessionEventsQueue + +template <bool UseMigrationProtocol> +TReadSessionEventsQueue<UseMigrationProtocol>::TReadSessionEventsQueue( + const TAReadSessionSettings<UseMigrationProtocol>& settings, + std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session) + : TParent(settings) + , Session(std::move(session)) { + const auto& h = TParent::Settings.EventHandlers_; + + if constexpr (UseMigrationProtocol) { + HasEventCallbacks = (h.CommonHandler_ + || h.DataReceivedHandler_ + || h.CommitAcknowledgementHandler_ + || h.CreatePartitionStreamHandler_ + || h.DestroyPartitionStreamHandler_ + || h.PartitionStreamStatusHandler_ + || h.PartitionStreamClosedHandler_ + || h.SessionClosedHandler_); + } else { + HasEventCallbacks = (h.CommonHandler_ + || h.DataReceivedHandler_ + || h.CommitOffsetAcknowledgementHandler_ + || h.StartPartitionSessionHandler_ + || h.StopPartitionSessionHandler_ + || h.PartitionSessionStatusHandler_ + || h.PartitionSessionClosedHandler_ + || h.SessionClosedHandler_); + } +} + +template <bool UseMigrationProtocol> +void TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TReadSessionEventInfo<UseMigrationProtocol> eventInfo, + TDeferredActions<UseMigrationProtocol>& deferred) { + if (TParent::Closed) { + return; + } + + with_lock (TParent::Mutex) { + auto partitionStream = eventInfo.PartitionStream; + eventInfo.MoveToPartitionStream(); + SignalReadyEventsImpl(partitionStream.Get(), deferred); + } +} + +template <bool UseMigrationProtocol> +void TReadSessionEventsQueue<UseMigrationProtocol>::SignalEventImpl( + TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, + TDeferredActions<UseMigrationProtocol>& deferred) { + if (TParent::Closed) { + return; + } + auto session = partitionStream->GetSession(); + TParent::Events.emplace(std::move(partitionStream), std::move(session)); + SignalWaiterImpl(deferred); +} + +template <bool UseMigrationProtocol> +TDataDecompressionInfo<UseMigrationProtocol>* TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent( + TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, + TPartitionData<UseMigrationProtocol>&& msg) { + if (this->Closed) { + return nullptr; + } + + with_lock (this->Mutex) { + return &partitionStream->InsertDataEvent(std::move(msg), this->Settings.Decompress_); + } +} + +template <bool UseMigrationProtocol> +TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl( + TReadSessionEventInfo<UseMigrationProtocol>& srcDataEventInfo, + size_t* maxByteSize) { // Assumes that we're under lock. + + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage> messages; + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage> compressedMessages; + TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream = srcDataEventInfo.PartitionStream; + bool messageExtracted = false; + while (srcDataEventInfo.HasReadyUnreadData() && *maxByteSize > 0) { + const bool hasMoreUnpackedData = srcDataEventInfo.TakeData(&messages, &compressedMessages, maxByteSize); + if (!hasMoreUnpackedData) { + const bool messageIsFullyRead = !srcDataEventInfo.HasMoreData(); + if (messageIsFullyRead) { + partitionStream->PopEvent(); + messageExtracted = true; + break; + } + } + } + if (!messageExtracted) { + partitionStream->TopEvent().Signalled = false; + } + + if (messages.empty() && compressedMessages.empty()) { + return Nothing(); + } + return TReadSessionEventInfo<UseMigrationProtocol>(partitionStream, partitionStream->GetSession(), + std::move(messages), std::move(compressedMessages)); +} + +template <bool UseMigrationProtocol> +void TReadSessionEventsQueue<UseMigrationProtocol>::SignalReadyEvents( + TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) { + Y_ASSERT(partitionStream); + TDeferredActions<UseMigrationProtocol> deferred; + with_lock (TReadSessionEventsQueue<UseMigrationProtocol>::Mutex) { + SignalReadyEventsImpl(partitionStream, deferred); + } +} + +template <bool UseMigrationProtocol> +void TReadSessionEventsQueue<UseMigrationProtocol>::SignalReadyEventsImpl( + TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, TDeferredActions<UseMigrationProtocol>& deferred) { + partitionStream->SignalReadyEvents(this, deferred); + ApplyCallbacksToReadyEventsImpl(deferred); +} + +template <bool UseMigrationProtocol> +bool TReadSessionEventsQueue<UseMigrationProtocol>::ApplyCallbacksToReadyEventsImpl( + TDeferredActions<UseMigrationProtocol>& deferred) { + if (!HasEventCallbacks) { + return false; + } + bool applied = false; + while (HasCallbackForNextEventImpl()) { + size_t maxSize = std::numeric_limits<size_t>::max(); + TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> eventInfo = GetEventImpl(&maxSize); + if (!eventInfo) { + break; + } + const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStreamForSignalling = + eventInfo->IsDataEvent() ? eventInfo->PartitionStream : nullptr; + applied = true; + if (!ApplyHandler(*eventInfo, deferred)) { // Close session event. + break; + } + if (partitionStreamForSignalling) { + SignalReadyEventsImpl(partitionStreamForSignalling.Get(), deferred); + } + } + return applied; +} + +template <bool UseMigrationProtocol> +struct THasCallbackForEventVisitor { + explicit THasCallbackForEventVisitor(const TAReadSessionSettings<UseMigrationProtocol>& settings) + : Settings(settings) { + } + + template <typename TEv> + inline bool operator()(const TEv&); + + const TAReadSessionSettings<UseMigrationProtocol>& Settings; +}; + +#define DEFINE_HANDLER(use_migration_protocol, type, handler) \ + template <> \ + template <> \ + inline bool THasCallbackForEventVisitor<use_migration_protocol>::operator()(const type&) { \ + return bool(Settings.EventHandlers_.handler); \ + } \ + /**/ + +DEFINE_HANDLER(true, TReadSessionEvent::TDataReceivedEvent, DataReceivedHandler_); +DEFINE_HANDLER(true, TReadSessionEvent::TCommitAcknowledgementEvent, CommitAcknowledgementHandler_); +DEFINE_HANDLER(true, TReadSessionEvent::TCreatePartitionStreamEvent, CreatePartitionStreamHandler_); +DEFINE_HANDLER(true, TReadSessionEvent::TDestroyPartitionStreamEvent, DestroyPartitionStreamHandler_); +DEFINE_HANDLER(true, TReadSessionEvent::TPartitionStreamStatusEvent, PartitionStreamStatusHandler_); +DEFINE_HANDLER(true, TReadSessionEvent::TPartitionStreamClosedEvent, PartitionStreamClosedHandler_); +DEFINE_HANDLER(true, TSessionClosedEvent, SessionClosedHandler_); + +DEFINE_HANDLER(false, NTopic::TReadSessionEvent::TDataReceivedEvent, DataReceivedHandler_); +DEFINE_HANDLER(false, NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent, CommitOffsetAcknowledgementHandler_); +DEFINE_HANDLER(false, NTopic::TReadSessionEvent::TStartPartitionSessionEvent, StartPartitionSessionHandler_); +DEFINE_HANDLER(false, NTopic::TReadSessionEvent::TStopPartitionSessionEvent, StopPartitionSessionHandler_); +DEFINE_HANDLER(false, NTopic::TReadSessionEvent::TPartitionSessionStatusEvent, PartitionSessionStatusHandler_); +DEFINE_HANDLER(false, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent, PartitionSessionClosedHandler_); +DEFINE_HANDLER(false, NTopic::TSessionClosedEvent, SessionClosedHandler_); + +#undef DEFINE_HANDLER + + +template<bool UseMigrationProtocol> +bool TReadSessionEventsQueue<UseMigrationProtocol>::HasCallbackForNextEventImpl() const { + if (!TParent::HasEventsImpl()) { + return false; + } + if (TParent::Settings.EventHandlers_.CommonHandler_) { + return true; + } + + if (!TParent::Events.empty()) { + const TReadSessionEventInfo<UseMigrationProtocol>& topEvent = TParent::Events.front(); + const typename TAReadSessionEvent<UseMigrationProtocol>::TEvent* event = nullptr; + if (topEvent.Event) { + event = &*topEvent.Event; + } else if (topEvent.PartitionStream && topEvent.PartitionStream->HasEvents()) { + const TRawPartitionStreamEvent<UseMigrationProtocol>& partitionStreamTopEvent = topEvent.PartitionStream->TopEvent(); + if (partitionStreamTopEvent.IsDataEvent()) { + return bool(TParent::Settings.EventHandlers_.DataReceivedHandler_); + } else { + event = &partitionStreamTopEvent.GetEvent(); + } + } + + if (!event) { + return false; + } + + THasCallbackForEventVisitor<UseMigrationProtocol> visitor(TParent::Settings); + return std::visit(visitor, *event); + } else if (TParent::CloseEvent) { + return bool(TParent::Settings.EventHandlers_.SessionClosedHandler_); + } + Y_ASSERT(false); + return false; +} + +template<bool UseMigrationProtocol> +void TReadSessionEventsQueue<UseMigrationProtocol>::ClearAllEvents() { + TDeferredActions<UseMigrationProtocol> deferred; + with_lock (TParent::Mutex) { + while (!TParent::Events.empty()) { + auto& event = TParent::Events.front(); + if (event.PartitionStream && event.PartitionStream->HasEvents()) { + event.PartitionStream->PopEvent(); + } + TParent::Events.pop(); + } + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TDataDecompressionInfo + +template<bool UseMigrationProtocol> +TDataDecompressionInfo<UseMigrationProtocol>::TDataDecompressionInfo( + TPartitionData<UseMigrationProtocol>&& msg, + std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, + bool doDecompress +) + : ServerMessage(std::move(msg)) + , Session(std::move(session)) + , DoDecompress(doDecompress) +{ + for (const auto& batch : ServerMessage.batches()) { + for (const auto& messageData : batch.message_data()) { + CompressedDataSize += messageData.data().size(); + } + } + SourceDataNotProcessed = CompressedDataSize; + + BuildBatchesMeta(); +} + +template<bool UseMigrationProtocol> +void TDataDecompressionInfo<UseMigrationProtocol>::BuildBatchesMeta() { + BatchesMeta.reserve(ServerMessage.batches_size()); + for (const auto& batch : ServerMessage.batches()) { + // Extra fields. + typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr meta = MakeIntrusive<TAWriteSessionMeta<UseMigrationProtocol>>(); + + if constexpr (UseMigrationProtocol) { + meta->Fields.reserve(batch.extra_fields_size()); + for (const Ydb::PersQueue::V1::KeyValue& kv : batch.extra_fields()) { + meta->Fields.emplace(kv.key(), kv.value()); + } + } else { + meta->Fields.reserve(batch.write_session_meta_size()); + for (const auto& [key, value] : batch.write_session_meta()) { + meta->Fields.emplace(key, value); + } + } + + BatchesMeta.emplace_back(std::move(meta)); + } +} + +template<bool UseMigrationProtocol> +void TDataDecompressionInfo<UseMigrationProtocol>::PutDecompressionError(std::exception_ptr error, size_t batch, size_t message) { + if (!DecompressionErrorsStructCreated) { + with_lock (DecompressionErrorsStructLock) { + DecompressionErrors.resize(ServerMessage.batches_size()); + for (size_t batch = 0; batch < static_cast<size_t>(ServerMessage.batches_size()); ++batch) { + DecompressionErrors[batch].resize(static_cast<size_t>(ServerMessage.batches(batch).message_data_size())); + } + + // Set barrier. + DecompressionErrorsStructCreated = true; + } + } + Y_ASSERT(batch < DecompressionErrors.size()); + Y_ASSERT(message < DecompressionErrors[batch].size()); + DecompressionErrors[batch][message] = std::move(error); +} + +template<bool UseMigrationProtocol> +std::exception_ptr TDataDecompressionInfo<UseMigrationProtocol>::GetDecompressionError(size_t batch, size_t message) { + if (!DecompressionErrorsStructCreated) { + return {}; + } + Y_ASSERT(batch < DecompressionErrors.size()); + Y_ASSERT(message < DecompressionErrors[batch].size()); + return DecompressionErrors[batch][message]; +} + +template <bool UseMigrationProtocol> +i64 TDataDecompressionInfo<UseMigrationProtocol>::StartDecompressionTasks( + const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, i64 availableMemory, + double averageCompressionRatio, const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>& partitionStream, + TDeferredActions<UseMigrationProtocol>& deferred) { + + constexpr size_t TASK_LIMIT = 512_KB; + std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Session.lock(); + Y_ASSERT(session); + ReadyThresholds.emplace_back(); + TDecompressionTask task(this, partitionStream, &ReadyThresholds.back()); + i64 used = 0; + while (availableMemory > 0 && !AllDecompressionTasksStarted()) { + const auto& batch = ServerMessage.batches(CurrentDecompressingMessage.first); + if (CurrentDecompressingMessage.second < static_cast<size_t>(batch.message_data_size())) { + const auto& messageData = batch.message_data(CurrentDecompressingMessage.second); + const i64 size = static_cast<i64>(messageData.data().size()); + const i64 estimatedDecompressedSize = messageData.uncompressed_size() + ? static_cast<i64>(messageData.uncompressed_size()) + : static_cast<i64>(size * averageCompressionRatio); + + Y_VERIFY(estimatedDecompressedSize >= 0); + + task.Add(CurrentDecompressingMessage.first, CurrentDecompressingMessage.second, size, estimatedDecompressedSize); + used += estimatedDecompressedSize; + availableMemory -= estimatedDecompressedSize; + } + ++CurrentDecompressingMessage.second; + if (CurrentDecompressingMessage.second >= static_cast<size_t>(batch.message_data_size())) { // next batch + ++CurrentDecompressingMessage.first; + CurrentDecompressingMessage.second = 0; + } + if (task.AddedDataSize() >= TASK_LIMIT) { + session->OnCreateNewDecompressionTask(); + deferred.DeferStartExecutorTask(executor, std::move(task)); + ReadyThresholds.emplace_back(); + task = TDecompressionTask(this, partitionStream, &ReadyThresholds.back()); + } + } + if (task.AddedMessagesCount() > 0) { + session->OnCreateNewDecompressionTask(); + deferred.DeferStartExecutorTask(executor, std::move(task)); + } else { + ReadyThresholds.pop_back(); // Revert. + } + return used; +} + +template<bool UseMigrationProtocol> +bool TDataDecompressionInfo<UseMigrationProtocol>::TakeData(const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>& partitionStream, + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages, + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages, + size_t* maxByteSize) +{ + TMaybe<std::pair<size_t, size_t>> readyThreshold = GetReadyThreshold(); + Y_ASSERT(readyThreshold); + auto& msg = GetServerMessage(); + i64 minOffset = Max<i64>(); + i64 maxOffset = 0; + const auto prevReadingMessage = CurrentReadingMessage; + while (HasMoreData() && *maxByteSize > 0 && CurrentReadingMessage <= *readyThreshold) { + auto& batch = *msg.mutable_batches(CurrentReadingMessage.first); + if (CurrentReadingMessage.second < static_cast<size_t>(batch.message_data_size())) { + const auto& meta = GetBatchMeta(CurrentReadingMessage.first); + const TInstant batchWriteTimestamp = [&batch](){ + if constexpr (UseMigrationProtocol) { + return TInstant::MilliSeconds(batch.write_timestamp_ms()); + } else { + return TInstant::MilliSeconds( + ::google::protobuf::util::TimeUtil::TimestampToMilliseconds(batch.written_at())); + } + }(); + auto& messageData = *batch.mutable_message_data(CurrentReadingMessage.second); + minOffset = Min(minOffset, static_cast<i64>(messageData.offset())); + maxOffset = Max(maxOffset, static_cast<i64>(messageData.offset())); + + if constexpr (UseMigrationProtocol) { + TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo( + messageData.offset(), + batch.source_id(), + messageData.seq_no(), + TInstant::MilliSeconds(messageData.create_timestamp_ms()), + batchWriteTimestamp, + batch.ip(), + meta, + messageData.uncompressed_size() + ); + if (DoDecompress) { + messages->emplace_back( + messageData.data(), + GetDecompressionError(CurrentReadingMessage.first, CurrentReadingMessage.second), + messageInfo, + partitionStream, + messageData.partition_key(), + messageData.explicit_hash() + ); + } else { + compressedMessages->emplace_back( + static_cast<ECodec>(messageData.codec()), + messageData.data(), + TVector<TReadSessionEvent::TDataReceivedEvent::TMessageInformation>{messageInfo}, + partitionStream, + messageData.partition_key(), + messageData.explicit_hash() + ); + } + } else { + NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo( + messageData.offset(), + batch.producer_id(), + messageData.seq_no(), + TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(messageData.created_at())), + batchWriteTimestamp, + meta, + messageData.uncompressed_size(), + messageData.message_group_id() + ); + if (DoDecompress) { + messages->emplace_back( + messageData.data(), + GetDecompressionError(CurrentReadingMessage.first, CurrentReadingMessage.second), + messageInfo, + partitionStream + ); + } else { + compressedMessages->emplace_back( + static_cast<NTopic::ECodec>(batch.codec()), + messageData.data(), + messageInfo, + partitionStream + ); + } + } + + *maxByteSize -= Min(*maxByteSize, messageData.data().size()); + + // Clear data to free internal session's memory. + messageData.clear_data(); + } + + ++CurrentReadingMessage.second; + if (CurrentReadingMessage.second >= static_cast<size_t>(batch.message_data_size())) { + CurrentReadingMessage.second = 0; + do { + ++CurrentReadingMessage.first; + } while (CurrentReadingMessage.first < static_cast<size_t>(msg.batches_size()) && msg.batches(CurrentReadingMessage.first).message_data_size() == 0); + } + } + partitionStream->GetLog().Write(TLOG_DEBUG, TStringBuilder() << "Take Data. Partition " << partitionStream->GetPartitionId() + << ". Read: {" << prevReadingMessage.first << ", " << prevReadingMessage.second << "} -> {" + << CurrentReadingMessage.first << ", " << CurrentReadingMessage.second << "} (" + << minOffset << "-" << maxOffset << ")"); + return CurrentReadingMessage <= *readyThreshold; +} + +template<bool UseMigrationProtocol> +bool TDataDecompressionInfo<UseMigrationProtocol>::HasReadyUnreadData() const { + TMaybe<std::pair<size_t, size_t>> threshold = GetReadyThreshold(); + if (!threshold) { + return false; + } + return CurrentReadingMessage <= *threshold; +} + +template <bool UseMigrationProtocol> +void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::Add(size_t batch, size_t message, + size_t sourceDataSize, + size_t estimatedDecompressedSize) { + if (Messages.empty() || Messages.back().Batch != batch) { + Messages.push_back({ batch, { message, message + 1 } }); + } + Messages.back().MessageRange.second = message + 1; + SourceDataSize += sourceDataSize; + EstimatedDecompressedSize += estimatedDecompressedSize; + Ready->Batch = batch; + Ready->Message = message; +} + +template <bool UseMigrationProtocol> +TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::TDecompressionTask( + TDataDecompressionInfo* parent, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, + TReadyMessageThreshold* ready) + : Parent(parent) + , PartitionStream(std::move(partitionStream)) + , Ready(ready) { +} + +// Forward delcaration +namespace NCompressionDetails { + extern TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& data); + extern TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec); +} + +template<bool UseMigrationProtocol> +void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator()() { + i64 minOffset = Max<i64>(); + i64 maxOffset = 0; + const i64 partition_id = [this](){ + if constexpr (UseMigrationProtocol) { + return Parent->ServerMessage.partition(); + } else { + return Parent->ServerMessage.partition_session_id(); + } + }(); + i64 dataProcessed = 0; + size_t messagesProcessed = 0; + for (const TMessageRange& messages : Messages) { + auto& batch = *Parent->ServerMessage.mutable_batches(messages.Batch); + for (size_t i = messages.MessageRange.first; i < messages.MessageRange.second; ++i) { + auto& data = *batch.mutable_message_data(i); + + ++messagesProcessed; + dataProcessed += static_cast<i64>(data.data().size()); + minOffset = Min(minOffset, static_cast<i64>(data.offset())); + maxOffset = Max(maxOffset, static_cast<i64>(data.offset())); + + try { + + if constexpr (UseMigrationProtocol) { + if (Parent->DoDecompress + && data.codec() != Ydb::PersQueue::V1::CODEC_RAW + && data.codec() != Ydb::PersQueue::V1::CODEC_UNSPECIFIED + ) { + TString decompressed = NCompressionDetails::Decompress(data); + data.set_data(decompressed); + data.set_codec(Ydb::PersQueue::V1::CODEC_RAW); + } + } else { + if (Parent->DoDecompress + && static_cast<Ydb::Topic::Codec>(batch.codec()) != Ydb::Topic::CODEC_RAW + && static_cast<Ydb::Topic::Codec>(batch.codec()) != Ydb::Topic::CODEC_UNSPECIFIED + ) { + TString decompressed = NCompressionDetails::Decompress(data, static_cast<Ydb::Topic::Codec>(batch.codec())); + data.set_data(decompressed); + } + } + + DecompressedSize += data.data().size(); + } catch (...) { + Parent->PutDecompressionError(std::current_exception(), messages.Batch, i); + data.clear_data(); // Free memory, because we don't count it. + + std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Parent->Session.lock(); + if (session) { + session->GetLog() << TLOG_INFO << "Error decompressing data: " << CurrentExceptionMessage(); + } + } + } + } + if (auto session = Parent->Session.lock()) { + session->GetLog().Write(TLOG_DEBUG, TStringBuilder() << "Decompression task done. Partition/PartitionSessionId: " + << partition_id << " (" << minOffset << "-" + << maxOffset << ")"); + } + Y_ASSERT(dataProcessed == SourceDataSize); + std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Parent->Session.lock(); + + if (session) { + session->OnDataDecompressed(SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed); + } + + Parent->SourceDataNotProcessed -= dataProcessed; + Ready->Ready = true; + + if (session) { + session->GetEventsQueue()->SignalReadyEvents(PartitionStream.Get()); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TRawPartitionStreamEvent + +template <bool UseMigrationProtocol> +void TRawPartitionStreamEvent<UseMigrationProtocol>::Signal(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, + TReadSessionEventsQueue<UseMigrationProtocol>* queue, + TDeferredActions<UseMigrationProtocol>& deferred) { + if (!Signalled) { + Signalled = true; + queue->SignalEventImpl(partitionStream, deferred); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TDeferredActions + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::DeferReadFromProcessor(const typename IProcessor<UseMigrationProtocol>::TPtr& processor, + TServerMessage<UseMigrationProtocol>* dst, + typename IProcessor<UseMigrationProtocol>::TReadCallback callback) +{ + Y_ASSERT(!Processor); + Y_ASSERT(!ReadDst); + Y_ASSERT(!ReadCallback); + Processor = processor; + ReadDst = dst; + ReadCallback = std::move(callback); +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::DeferStartExecutorTask(const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, typename IAExecutor<UseMigrationProtocol>::TFunction task) { + ExecutorsTasks.emplace_back(executor, std::move(task)); +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) { + ErrorHandler = errorHandler; + SessionClosedEvent.ConstructInPlace(std::move(closeEvent)); +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, EStatus statusCode, NYql::TIssues&& issues) { + DeferAbortSession(errorHandler, TASessionClosedEvent<UseMigrationProtocol>(statusCode, std::move(issues))); +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, EStatus statusCode, const TString& message) { + NYql::TIssues issues; + issues.AddIssue(message); + DeferAbortSession(errorHandler, statusCode, std::move(issues)); +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TPlainStatus&& status) { + DeferAbortSession(errorHandler, TASessionClosedEvent<UseMigrationProtocol>(std::move(status))); +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TPlainStatus&& status) { + Session = std::move(session); + ErrorHandler = errorHandler; + ReconnectionStatus = std::move(status); +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session) { + Sessions.push_back(std::move(session)); +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::DeferSignalWaiter(TWaiter&& waiter) { + Waiters.emplace_back(std::move(waiter)); +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::DoActions() { + Read(); + StartExecutorTasks(); + AbortSession(); + Reconnect(); + SignalWaiters(); + StartSessions(); +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::StartSessions() { + for (auto& session : Sessions) { + session->Start(); + } +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::Read() { + if (ReadDst) { + Y_ASSERT(Processor); + Y_ASSERT(ReadCallback); + Processor->Read(ReadDst, std::move(ReadCallback)); + } +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::StartExecutorTasks() { + for (auto&& [executor, task] : ExecutorsTasks) { + executor->Post(std::move(task)); + } +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::AbortSession() { + if (SessionClosedEvent) { + Y_ASSERT(ErrorHandler); + ErrorHandler->AbortSession(std::move(*SessionClosedEvent)); + } +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::Reconnect() { + if (Session) { + Y_ASSERT(ErrorHandler); + if (!Session->Reconnect(ReconnectionStatus)) { + ErrorHandler->AbortSession(std::move(ReconnectionStatus)); + } + } +} + +template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::SignalWaiters() { + for (auto& w : Waiters) { + w.Signal(); + } +} + +template<bool UseMigrationProtocol> +void TErrorHandler<UseMigrationProtocol>::AbortSession(TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) { + if (auto session = Session.lock()) { + session->Abort(std::move(closeEvent)); + } +} + +#define HISTOGRAM_SETUP NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}) + +template <typename TReaderCounters> +void MakeCountersNotNull(TReaderCounters& counters) { + if (!counters.Errors) { + counters.Errors = MakeIntrusive<NMonitoring::TCounterForPtr>(true); + } + + if (!counters.CurrentSessionLifetimeMs) { + counters.CurrentSessionLifetimeMs = MakeIntrusive<NMonitoring::TCounterForPtr>(false); + } + + if (!counters.BytesRead) { + counters.BytesRead = MakeIntrusive<NMonitoring::TCounterForPtr>(true); + } + + if (!counters.MessagesRead) { + counters.MessagesRead = MakeIntrusive<NMonitoring::TCounterForPtr>(true); + } + + if (!counters.BytesReadCompressed) { + counters.BytesReadCompressed = MakeIntrusive<NMonitoring::TCounterForPtr>(true); + } + + if (!counters.BytesInflightUncompressed) { + counters.BytesInflightUncompressed = MakeIntrusive<NMonitoring::TCounterForPtr>(false); + } + + if (!counters.BytesInflightCompressed) { + counters.BytesInflightCompressed = MakeIntrusive<NMonitoring::TCounterForPtr>(false); + } + + if (!counters.BytesInflightTotal) { + counters.BytesInflightTotal = MakeIntrusive<NMonitoring::TCounterForPtr>(false); + } + + if (!counters.MessagesInflight) { + counters.MessagesInflight = MakeIntrusive<NMonitoring::TCounterForPtr>(false); + } + + + if (!counters.TotalBytesInflightUsageByTime) { + counters.TotalBytesInflightUsageByTime = MakeIntrusive<NMonitoring::THistogramCounter>(HISTOGRAM_SETUP); + } + + if (!counters.UncompressedBytesInflightUsageByTime) { + counters.UncompressedBytesInflightUsageByTime = MakeIntrusive<NMonitoring::THistogramCounter>(HISTOGRAM_SETUP); + } + + if (!counters.CompressedBytesInflightUsageByTime) { + counters.CompressedBytesInflightUsageByTime = MakeIntrusive<NMonitoring::THistogramCounter>(HISTOGRAM_SETUP); + } +} + +#undef HISTOGRAM_SETUP + +template <typename TReaderCounters> +bool HasNullCounters(TReaderCounters& counters) { + return !counters.Errors + || !counters.CurrentSessionLifetimeMs + || !counters.BytesRead + || !counters.MessagesRead + || !counters.BytesReadCompressed + || !counters.BytesInflightUncompressed + || !counters.BytesInflightCompressed + || !counters.BytesInflightTotal + || !counters.MessagesInflight + || !counters.TotalBytesInflightUsageByTime + || !counters.UncompressedBytesInflightUsageByTime + || !counters.CompressedBytesInflightUsageByTime; +} + +} // namespace NYdb::NPersQueue diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session_messages.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session_messages.cpp index 2f0fd93a8f..5e216cccc6 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session_messages.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session_messages.cpp @@ -163,7 +163,7 @@ TReadSessionEvent::TDataReceivedEvent::TMessage::TMessage(const TString& data, } void TReadSessionEvent::TDataReceivedEvent::TMessage::Commit() { - static_cast<TPartitionStreamImpl*>(PartitionStream.Get())->Commit(Information.Offset, Information.Offset + 1); + static_cast<TPartitionStreamImpl<true>*>(PartitionStream.Get())->Commit(Information.Offset, Information.Offset + 1); } ui64 TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetBlocksCount() const { @@ -233,7 +233,7 @@ TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::TCompressedMessage(EC {} void TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::Commit() { - static_cast<TPartitionStreamImpl*>(PartitionStream.Get())->Commit( + static_cast<TPartitionStreamImpl<true>*>(PartitionStream.Get())->Commit( Information.front().Offset, Information.back().Offset + 1 ); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h index 05824e111c..89616f2a2e 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h @@ -21,8 +21,9 @@ inline const TString& GetCodecId(const ECodec codec) { return idByCodec[codec]; } -class TWriteSessionEventsQueue : public TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent> { - using TParent = TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent>; +class TWriteSessionEventsQueue: public TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor> { + using TParent = TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor>; + public: TWriteSessionEventsQueue(const TWriteSessionSettings& settings) : TParent(settings) diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/CMakeLists.darwin.txt b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/CMakeLists.darwin.txt index 3c4c7b5d3a..518d94d355 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/CMakeLists.darwin.txt +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/CMakeLists.darwin.txt @@ -44,7 +44,6 @@ target_sources(ydb-public-sdk-cpp-client-ydb_persqueue_core-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/compress_executor_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/compression_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.cpp ) add_test( NAME diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/CMakeLists.linux.txt b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/CMakeLists.linux.txt index 83e39b61d9..f5554fd19f 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/CMakeLists.linux.txt +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/CMakeLists.linux.txt @@ -48,7 +48,6 @@ target_sources(ydb-public-sdk-cpp-client-ydb_persqueue_core-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/compress_executor_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/compression_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.cpp ) add_test( NAME diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp index 2260d1acb7..0619536fb6 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp @@ -1,4 +1,4 @@ -#include "ut_utils.h" +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h> #include <library/cpp/testing/unittest/registar.h> #include <library/cpp/threading/future/future.h> diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/compress_executor_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/compress_executor_ut.cpp index 8a85c85eda..9fc0106996 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/compress_executor_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/compress_executor_ut.cpp @@ -1,4 +1,4 @@ -#include "ut_utils.h" +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h> namespace NYdb::NPersQueue::NTests { @@ -102,4 +102,3 @@ Y_UNIT_TEST_SUITE(CompressExecutor) { } } }; - diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/compression_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/compression_ut.cpp index 0603c709f6..5ebadf19ba 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/compression_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/compression_ut.cpp @@ -1,4 +1,4 @@ -#include "ut_utils.h" +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h> namespace NYdb::NPersQueue::NTests { @@ -48,7 +48,7 @@ Y_UNIT_TEST_SUITE(Compression) { rr->add_supported_codecs(Ydb::PersQueue::V1::CODEC_GZIP); rr->add_supported_codecs(Ydb::PersQueue::V1::CODEC_ZSTD); rr->set_version(1); - + Ydb::PersQueue::V1::AlterTopicResponse response; grpc::ClientContext rcontext; auto status = stub->AlterTopic(&rcontext, request, &response); @@ -102,7 +102,7 @@ Y_UNIT_TEST_SUITE(Compression) { ++totalReceived; } } - Cerr << Endl << "totalReceived: " << totalReceived << " wait: " << messages.size() << Endl << Endl; + Cerr << Endl << "totalReceived: " << totalReceived << " wait: " << messages.size() << Endl << Endl; if (totalReceived == messages.size()) checkedPromise.SetValue(); } @@ -112,7 +112,7 @@ Y_UNIT_TEST_SUITE(Compression) { checkedPromise.GetFuture().GetValueSync(); } - void WriteWithOneCodec(TPersQueueYdbSdkTestSetup& setup, ECodec codec) { + void WriteWithOneCodec(TPersQueueYdbSdkTestSetup& setup, ECodec codec) { AlterTopic(setup); // add zstd support auto messages = GetTestMessages(); @@ -123,17 +123,17 @@ Y_UNIT_TEST_SUITE(Compression) { Y_UNIT_TEST(WriteRAW) { TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME); - WriteWithOneCodec(setup, ECodec::RAW); + WriteWithOneCodec(setup, ECodec::RAW); } Y_UNIT_TEST(WriteGZIP) { TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME); - WriteWithOneCodec(setup, ECodec::GZIP); + WriteWithOneCodec(setup, ECodec::GZIP); } Y_UNIT_TEST(WriteZSTD) { TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME); - WriteWithOneCodec(setup, ECodec::ZSTD); + WriteWithOneCodec(setup, ECodec::ZSTD); } Y_UNIT_TEST(WriteWithMixedCodecs) { diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp index 3583786c63..10c5ec57f1 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp @@ -1,4 +1,4 @@ -#include "ut_utils.h" +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h> #define INCLUDE_YDB_INTERNAL_H #include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h> @@ -450,7 +450,7 @@ public: using IReadSessionConnectionProcessorFactory = ISessionConnectionProcessorFactory<Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>; using TMockProcessorFactory = ::TMockProcessorFactory<Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>; - struct TMockErrorHandler : public IErrorHandler { + struct TMockErrorHandler : public IErrorHandler<true> { MOCK_METHOD(void, AbortSession, (TSessionClosedEvent&& closeEvent), (override)); }; @@ -482,9 +482,9 @@ public: TReadSessionImplTestSetup(); ~TReadSessionImplTestSetup() noexcept(false); // Performs extra validation and UNIT_ASSERTs - TSingleClusterReadSessionImpl* GetSession(); + TSingleClusterReadSessionImpl<true>* GetSession(); - std::shared_ptr<TReadSessionEventsQueue> GetEventsQueue(); + std::shared_ptr<TReadSessionEventsQueue<true>> GetEventsQueue(); ::IExecutor::TPtr GetDefaultExecutor(); void SuccessfulInit(bool flag = true); @@ -498,14 +498,14 @@ public: TReadSessionSettings Settings; TString ClusterName = "cluster"; TLog Log = CreateLogBackend("cerr"); - std::shared_ptr<TReadSessionEventsQueue> EventsQueue; + std::shared_ptr<TReadSessionEventsQueue<true>> EventsQueue; TIntrusivePtr<testing::StrictMock<TMockErrorHandler>> MockErrorHandler = MakeIntrusive<testing::StrictMock<TMockErrorHandler>>(); std::shared_ptr<TFakeContext> FakeContext = std::make_shared<TFakeContext>(); std::shared_ptr<TMockProcessorFactory> MockProcessorFactory = std::make_shared<TMockProcessorFactory>(); TIntrusivePtr<TMockReadSessionProcessor> MockProcessor = MakeIntrusive<TMockReadSessionProcessor>(); ui64 PartitionIdStart = 1; ui64 PartitionIdStep = 1; - TSingleClusterReadSessionImpl::TPtr Session; + typename TSingleClusterReadSessionImpl<true>::TPtr Session; std::shared_ptr<TThreadPool> ThreadPool; ::IExecutor::TPtr DefaultExecutor; }; @@ -609,7 +609,7 @@ TReadSessionImplTestSetup::~TReadSessionImplTestSetup() noexcept(false) { return DefaultExecutor; } -TSingleClusterReadSessionImpl* TReadSessionImplTestSetup::GetSession() { +TSingleClusterReadSessionImpl<true>* TReadSessionImplTestSetup::GetSession() { if (!Session) { if (!Settings.DecompressionExecutor_) { Settings.DecompressionExecutor(GetDefaultExecutor()); @@ -617,7 +617,7 @@ TSingleClusterReadSessionImpl* TReadSessionImplTestSetup::GetSession() { if (!Settings.EventHandlers_.HandlersExecutor_) { Settings.EventHandlers_.HandlersExecutor(GetDefaultExecutor()); } - Session = std::make_shared<TSingleClusterReadSessionImpl>( + Session = std::make_shared<TSingleClusterReadSessionImpl<true>>( Settings, "db", "sessionid", @@ -632,9 +632,9 @@ TSingleClusterReadSessionImpl* TReadSessionImplTestSetup::GetSession() { return Session.get(); } -std::shared_ptr<TReadSessionEventsQueue> TReadSessionImplTestSetup::GetEventsQueue() { +std::shared_ptr<TReadSessionEventsQueue<true>> TReadSessionImplTestSetup::GetEventsQueue() { if (!EventsQueue) { - EventsQueue = std::make_shared<TReadSessionEventsQueue>(Settings, std::weak_ptr<IUserRetrievedEventCallback>()); + EventsQueue = std::make_shared<TReadSessionEventsQueue<true>>(Settings, std::weak_ptr<IUserRetrievedEventCallback<true>>()); } return EventsQueue; } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp index 50d050e973..2e98ba792c 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp @@ -1,4 +1,4 @@ -#include "ut_utils.h" +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h> #include <library/cpp/threading/future/future.h> #include <library/cpp/testing/unittest/registar.h> diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/CMakeLists.txt index 9d80c1d532..95b05a2af5 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/CMakeLists.txt +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/CMakeLists.txt @@ -27,4 +27,5 @@ target_link_libraries(ydb_persqueue_core-ut-ut_utils PUBLIC target_sources(ydb_persqueue_core-ut-ut_utils PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp ) diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp index 6a853b6fc5..6a853b6fc5 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h index 69d62dfa48..69d62dfa48 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/with_offset_ranges_mode_ut/CMakeLists.darwin.txt b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/with_offset_ranges_mode_ut/CMakeLists.darwin.txt index 133670b9ec..099d7fe3b3 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/with_offset_ranges_mode_ut/CMakeLists.darwin.txt +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/with_offset_ranges_mode_ut/CMakeLists.darwin.txt @@ -36,7 +36,6 @@ target_sources(with_offset_ranges_mode_ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/compress_executor_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.cpp ) add_test( NAME diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/with_offset_ranges_mode_ut/CMakeLists.linux.txt b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/with_offset_ranges_mode_ut/CMakeLists.linux.txt index b1134e6c96..f46e606864 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/with_offset_ranges_mode_ut/CMakeLists.linux.txt +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/with_offset_ranges_mode_ut/CMakeLists.linux.txt @@ -40,7 +40,6 @@ target_sources(with_offset_ranges_mode_ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/compress_executor_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.cpp ) add_test( NAME diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/CMakeLists.txt index 00830fd9a3..ace9fc7214 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/CMakeLists.txt +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/CMakeLists.txt @@ -14,6 +14,7 @@ target_link_libraries(client-ydb_persqueue_public-codecs PUBLIC cpp-streams-zstd public-issue-protos api-grpc-draft + api-grpc api-protos ) target_sources(client-ydb_persqueue_public-codecs PRIVATE diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.cpp index 9c3bcbd25c..51c16f84f7 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.cpp @@ -25,6 +25,21 @@ IInputStream* CreateDecompressorStream(TInputStreamVariant& inputStreamStorage, } } +IInputStream* CreateDecompressorStream(TInputStreamVariant& inputStreamStorage, Ydb::Topic::Codec codec, IInputStream* origin) { + switch (codec) { + case Ydb::Topic::CODEC_GZIP: + return &inputStreamStorage.emplace<TZLibDecompress>(origin); + case Ydb::Topic::CODEC_LZOP: + throw yexception() << "LZO codec is disabled"; + case Ydb::Topic::CODEC_ZSTD: + return &inputStreamStorage.emplace<TZstdDecompress>(origin); + default: + //case Ydb::Topic::CODEC_RAW: + //case Ydb::Topic::CODEC_UNSPECIFIED: + throw yexception() << "unsupported codec value : " << ui64(codec); + } +} + TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& data) { TMemoryInput input(data.data().data(), data.data().size()); TString result; @@ -34,6 +49,14 @@ TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage return result; } +TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec) { + TMemoryInput input(data.data().data(), data.data().size()); + TString result; + TStringOutput resultOutput(result); + TInputStreamVariant inputStreamStorage; + TransferData(CreateDecompressorStream(inputStreamStorage, codec, &input), &resultOutput); + return result; +} class TZLibToStringCompressor: private TEmbedPolicy<TBufferOutput>, public TZLibCompress { public: diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.h b/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.h index e3fc717765..b18d14119e 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.h @@ -2,16 +2,17 @@ #include <util/stream/output.h> #include <ydb/public/api/protos/ydb_persqueue_v1.pb.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> namespace NYdb::NPersQueue { namespace NCompressionDetails { extern TString Decompress(const Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::MessageData& data); +extern TString Decompress(const Ydb::Topic::StreamReadMessage::ReadResponse::MessageData& data, Ydb::Topic::Codec codec); THolder<IOutputStream> CreateCoder(ECodec codec, TBuffer& result, int quality); } // namespace NDecompressionDetails } // namespace NYdb::NPersQueue - diff --git a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt index 98ca02cc8d..315ac7e779 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt @@ -12,6 +12,7 @@ target_link_libraries(cpp-client-ydb_topic PUBLIC contrib-libs-cxxsupp yutil tools-enum_parser-enum_serialization_runtime + client-ydb_persqueue_public-codecs library-cpp-retry client-ydb_topic-impl cpp-client-ydb_proto diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt index 5afb6a64ea..8a4e2595b3 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt @@ -23,6 +23,11 @@ target_link_libraries(client-ydb_topic-impl PUBLIC ) target_sources(client-ydb_topic-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/deferred_commit.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/event_handlers.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp ) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/counters.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/counters.cpp new file mode 100644 index 0000000000..da51b921a2 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/counters.cpp @@ -0,0 +1,29 @@ +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +namespace NYdb::NTopic { + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TReaderCounters + +TReaderCounters::TReaderCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) { + Errors = counters->GetCounter("errors", true); + CurrentSessionLifetimeMs = counters->GetCounter("currentSessionLifetimeMs", false); + BytesRead = counters->GetCounter("bytesRead", true); + MessagesRead = counters->GetCounter("messagesRead", true); + BytesReadCompressed = counters->GetCounter("bytesReadCompressed", true); + BytesInflightUncompressed = counters->GetCounter("bytesInflightUncompressed", false); + BytesInflightCompressed = counters->GetCounter("bytesInflightCompressed", false); + BytesInflightTotal = counters->GetCounter("bytesInflightTotal", false); + MessagesInflight = counters->GetCounter("messagesInflight", false); + +#define HISTOGRAM_SETUP NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}) + + TotalBytesInflightUsageByTime = counters->GetHistogram("totalBytesInflightUsageByTime", HISTOGRAM_SETUP); + UncompressedBytesInflightUsageByTime = counters->GetHistogram("uncompressedBytesInflightUsageByTime", HISTOGRAM_SETUP); + CompressedBytesInflightUsageByTime = counters->GetHistogram("compressedBytesInflightUsageByTime", HISTOGRAM_SETUP); + +#undef HISTOGRAM_SETUP + +} + +} diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/deferred_commit.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/deferred_commit.cpp new file mode 100644 index 0000000000..c7c9316f6b --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/deferred_commit.cpp @@ -0,0 +1,131 @@ +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h> + +#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h> + +namespace NYdb::NTopic { + +std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TDeferredCommit + +class TDeferredCommit::TImpl { +public: + + void Add(const TPartitionSession::TPtr& partitionStream, ui64 startOffset, ui64 endOffset); + void Add(const TPartitionSession::TPtr& partitionStream, ui64 offset); + + void Add(const TReadSessionEvent::TDataReceivedEvent::TMessage& message); + void Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent); + + void Commit(); + +private: + static void Add(const TPartitionSession::TPtr& partitionStream, TDisjointIntervalTree<ui64>& offsetSet, ui64 startOffset, ui64 endOffset); + +private: + // Partition stream -> offsets set. + THashMap<TPartitionSession::TPtr, TDisjointIntervalTree<ui64>> Offsets; +}; + +TDeferredCommit::TDeferredCommit() { +} + +TDeferredCommit::TDeferredCommit(TDeferredCommit&&) = default; + +TDeferredCommit& TDeferredCommit::operator=(TDeferredCommit&&) = default; + +TDeferredCommit::~TDeferredCommit() { +} + +#define GET_IMPL() \ + if (!Impl) { \ + Impl = MakeHolder<TImpl>(); \ + } \ + Impl + +void TDeferredCommit::Add(const TPartitionSession::TPtr& partitionStream, ui64 startOffset, ui64 endOffset) { + GET_IMPL()->Add(partitionStream, startOffset, endOffset); +} + +void TDeferredCommit::Add(const TPartitionSession::TPtr& partitionStream, ui64 offset) { + GET_IMPL()->Add(partitionStream, offset); +} + +void TDeferredCommit::Add(const TReadSessionEvent::TDataReceivedEvent::TMessage& message) { + GET_IMPL()->Add(message); +} + +void TDeferredCommit::Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent) { + GET_IMPL()->Add(dataReceivedEvent); +} + +#undef GET_IMPL + +void TDeferredCommit::Commit() { + if (Impl) { + Impl->Commit(); + } +} + +void TDeferredCommit::TImpl::Add(const TReadSessionEvent::TDataReceivedEvent::TMessage& message) { + Y_ASSERT(message.GetPartitionSession()); + Add(message.GetPartitionSession(), message.GetOffset()); +} + +void TDeferredCommit::TImpl::Add(const TPartitionSession::TPtr& partitionStream, TDisjointIntervalTree<ui64>& offsetSet, ui64 startOffset, ui64 endOffset) { + if (offsetSet.Intersects(startOffset, endOffset)) { + ThrowFatalError(TStringBuilder() << "Commit set already has some offsets from half-interval [" + << startOffset << "; " << endOffset + << ") for partition stream with id " << partitionStream->GetPartitionSessionId()); + } else { + offsetSet.InsertInterval(startOffset, endOffset); + } +} + +void TDeferredCommit::TImpl::Add(const TPartitionSession::TPtr& partitionStream, ui64 startOffset, ui64 endOffset) { + Y_ASSERT(partitionStream); + Add(partitionStream, Offsets[partitionStream], startOffset, endOffset); +} + +void TDeferredCommit::TImpl::Add(const TPartitionSession::TPtr& partitionStream, ui64 offset) { + Y_ASSERT(partitionStream); + auto& offsetSet = Offsets[partitionStream]; + if (offsetSet.Has(offset)) { + ThrowFatalError(TStringBuilder() << "Commit set already has offset " << offset + << " for partition stream with id " << partitionStream->GetPartitionSessionId()); + } else { + offsetSet.Insert(offset); + } +} + +void TDeferredCommit::TImpl::Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent) { + const TPartitionSession::TPtr& partitionStream = dataReceivedEvent.GetPartitionSession(); + Y_ASSERT(partitionStream); + auto& offsetSet = Offsets[partitionStream]; + auto [startOffset, endOffset] = GetMessageOffsetRange(dataReceivedEvent, 0); + for (size_t i = 1; i < dataReceivedEvent.GetMessagesCount(); ++i) { + auto msgOffsetRange = GetMessageOffsetRange(dataReceivedEvent, i); + if (msgOffsetRange.first == endOffset) { + endOffset= msgOffsetRange.second; + } else { + Add(partitionStream, offsetSet, startOffset, endOffset); + startOffset = msgOffsetRange.first; + endOffset = msgOffsetRange.second; + } + } + Add(partitionStream, offsetSet, startOffset, endOffset); +} + +void TDeferredCommit::TImpl::Commit() { + for (auto&& [partitionStream, offsetRanges] : Offsets) { + for (auto&& [startOffset, endOffset] : offsetRanges) { + static_cast<NPersQueue::TPartitionStreamImpl<false>*>(partitionStream.Get())->Commit(startOffset, endOffset); + } + } + Offsets.clear(); +} + +} diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/event_handlers.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/event_handlers.cpp new file mode 100644 index 0000000000..60ef8ba4b6 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/event_handlers.cpp @@ -0,0 +1,142 @@ +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h> + +namespace NYdb::NTopic { + +std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TReadSessionEventHandlers + +class TGracefulReleasingSimpleDataHandlers : public TThrRefBase { +public: + explicit TGracefulReleasingSimpleDataHandlers(std::function<void(TReadSessionEvent::TDataReceivedEvent&)> dataHandler, bool commitAfterProcessing) + : DataHandler(std::move(dataHandler)) + , CommitAfterProcessing(commitAfterProcessing) + { + } + + void OnDataReceived(TReadSessionEvent::TDataReceivedEvent& event) { + Y_ASSERT(event.GetMessagesCount()); + TDeferredCommit deferredCommit; + with_lock (Lock) { + auto& offsetSet = PartitionStreamToUncommittedOffsets[event.GetPartitionSession()->GetPartitionSessionId()]; + // Messages could contain holes in offset, but later commit ack will tell us right border. + // So we can easily insert the whole interval with holes included. + // It will be removed from set by specifying proper right border. + auto firstMessageOffsets = GetMessageOffsetRange(event, 0); + auto lastMessageOffsets = GetMessageOffsetRange(event, event.GetMessagesCount() - 1); + + offsetSet.InsertInterval(firstMessageOffsets.first, lastMessageOffsets.second); + + if (CommitAfterProcessing) { + deferredCommit.Add(event); + } + } + DataHandler(event); + deferredCommit.Commit(); + } + + void OnCommitAcknowledgement(TReadSessionEvent::TCommitOffsetAcknowledgementEvent& event) { + with_lock (Lock) { + const ui64 partitionStreamId = event.GetPartitionSession()->GetPartitionSessionId(); + auto& offsetSet = PartitionStreamToUncommittedOffsets[partitionStreamId]; + if (offsetSet.EraseInterval(0, event.GetCommittedOffset() + 1)) { // Remove some offsets. + if (offsetSet.Empty()) { // No offsets left. + auto unconfirmedDestroyIt = UnconfirmedDestroys.find(partitionStreamId); + if (unconfirmedDestroyIt != UnconfirmedDestroys.end()) { + // Confirm and forget about this partition stream. + unconfirmedDestroyIt->second.Confirm(); + UnconfirmedDestroys.erase(unconfirmedDestroyIt); + PartitionStreamToUncommittedOffsets.erase(partitionStreamId); + } + } + } + } + } + + void OnCreatePartitionStream(TReadSessionEvent::TStartPartitionSessionEvent& event) { + with_lock (Lock) { + Y_VERIFY(PartitionStreamToUncommittedOffsets[event.GetPartitionSession()->GetPartitionSessionId()].Empty()); + } + event.Confirm(); + } + + void OnDestroyPartitionStream(TReadSessionEvent::TStopPartitionSessionEvent& event) { + with_lock (Lock) { + const ui64 partitionStreamId = event.GetPartitionSession()->GetPartitionSessionId(); + Y_VERIFY(UnconfirmedDestroys.find(partitionStreamId) == UnconfirmedDestroys.end()); + if (PartitionStreamToUncommittedOffsets[partitionStreamId].Empty()) { + PartitionStreamToUncommittedOffsets.erase(partitionStreamId); + event.Confirm(); + } else { + UnconfirmedDestroys.emplace(partitionStreamId, std::move(event)); + } + } + } + + void OnPartitionStreamClosed(TReadSessionEvent::TPartitionSessionClosedEvent& event) { + with_lock (Lock) { + const ui64 partitionStreamId = event.GetPartitionSession()->GetPartitionSessionId(); + PartitionStreamToUncommittedOffsets.erase(partitionStreamId); + UnconfirmedDestroys.erase(partitionStreamId); + } + } + +private: + TAdaptiveLock Lock; // For the case when user gave us multithreaded executor. + const std::function<void(TReadSessionEvent::TDataReceivedEvent&)> DataHandler; + const bool CommitAfterProcessing; + THashMap<ui64, TDisjointIntervalTree<ui64>> PartitionStreamToUncommittedOffsets; // Partition stream id -> set of offsets. + THashMap<ui64, TReadSessionEvent::TStopPartitionSessionEvent> UnconfirmedDestroys; // Partition stream id -> destroy events. +}; + +TReadSessionSettings::TEventHandlers& TReadSessionSettings::TEventHandlers::SimpleDataHandlers(std::function<void(TReadSessionEvent::TDataReceivedEvent&)> dataHandler, + bool commitDataAfterProcessing, + bool gracefulReleaseAfterCommit) { + Y_ASSERT(dataHandler); + + PartitionSessionStatusHandler([](TReadSessionEvent::TPartitionSessionStatusEvent&){}); + + if (gracefulReleaseAfterCommit) { + auto handlers = MakeIntrusive<TGracefulReleasingSimpleDataHandlers>(std::move(dataHandler), commitDataAfterProcessing); + DataReceivedHandler([handlers](TReadSessionEvent::TDataReceivedEvent& event) { + handlers->OnDataReceived(event); + }); + StartPartitionSessionHandler([handlers](TReadSessionEvent::TStartPartitionSessionEvent& event) { + handlers->OnCreatePartitionStream(event); + }); + StopPartitionSessionHandler([handlers](TReadSessionEvent::TStopPartitionSessionEvent& event) { + handlers->OnDestroyPartitionStream(event); + }); + CommitOffsetAcknowledgementHandler([handlers](TReadSessionEvent::TCommitOffsetAcknowledgementEvent& event) { + handlers->OnCommitAcknowledgement(event); + }); + PartitionSessionClosedHandler([handlers](TReadSessionEvent::TPartitionSessionClosedEvent& event) { + handlers->OnPartitionStreamClosed(event); + }); + } else { + if (commitDataAfterProcessing) { + DataReceivedHandler([dataHandler = std::move(dataHandler)](TReadSessionEvent::TDataReceivedEvent& event) { + TDeferredCommit deferredCommit; + deferredCommit.Add(event); + dataHandler(event); + deferredCommit.Commit(); + }); + } else { + DataReceivedHandler(std::move(dataHandler)); + } + StartPartitionSessionHandler([](TReadSessionEvent::TStartPartitionSessionEvent& event) { + event.Confirm(); + }); + StopPartitionSessionHandler([](TReadSessionEvent::TStopPartitionSessionEvent& event) { + event.Confirm(); + }); + CommitOffsetAcknowledgementHandler([](TReadSessionEvent::TCommitOffsetAcknowledgementEvent&){}); + PartitionSessionClosedHandler([](TReadSessionEvent::TPartitionSessionClosedEvent&){}); + } + return *this; +} + +} diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp new file mode 100644 index 0000000000..f01af91a29 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp @@ -0,0 +1,344 @@ +#include "read_session.h" + +#define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h> +#undef INCLUDE_YDB_INTERNAL_H + +#include <util/generic/guid.h> + +namespace NYdb::NTopic { + +static const TString DRIVER_IS_STOPPING_DESCRIPTION = "Driver is stopping"; + +void MakeCountersNotNull(TReaderCounters& counters); +bool HasNullCounters(TReaderCounters& counters); + +TReadSession::TReadSession(const TReadSessionSettings& settings, + std::shared_ptr<TTopicClient::TImpl> client, + std::shared_ptr<TGRpcConnectionsImpl> connections, + TDbDriverStatePtr dbDriverState) + : Settings(settings) + , SessionId(CreateGuidAsString()) + , Log(settings.Log_.GetOrElse(dbDriverState->Log)) + , Client(std::move(client)) + , Connections(std::move(connections)) + , DbDriverState(std::move(dbDriverState)) +{ + if (!Settings.RetryPolicy_) { + Settings.RetryPolicy_ = IRetryPolicy::GetDefaultPolicy(); + } + + MakeCountersIfNeeded(); +} + +TReadSession::~TReadSession() { + Abort(EStatus::ABORTED, "Aborted"); + WaitAllDecompressionTasks(); + ClearAllEvents(); +} + +void TReadSession::Start() { + ErrorHandler = MakeIntrusive<NPersQueue::TErrorHandler<false>>(weak_from_this()); + EventsQueue = std::make_shared<NPersQueue::TReadSessionEventsQueue<false>>(Settings, weak_from_this()); + + if (!ValidateSettings()) { + return; + } + + Log.Write(TLOG_INFO, GetLogPrefix() << "Starting read session"); + + NPersQueue::TDeferredActions<false> deferred; + with_lock (Lock) { + if (Aborting) { + return; + } + Topics = Settings.Topics_; + CreateClusterSessionsImpl(deferred); + } + ScheduleDumpCountersToLog(); +} + +void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false>& deferred) { + // Create cluster sessions. + Log.Write( + TLOG_DEBUG, + GetLogPrefix() << "Starting single session" + ); + auto context = Client->CreateContext(); + if (!context) { + AbortImpl(EStatus::ABORTED, DRIVER_IS_STOPPING_DESCRIPTION, deferred); + return; + } + Session = std::make_shared<NPersQueue::TSingleClusterReadSessionImpl<false>>( + Settings, + DbDriverState->Database, + SessionId, + "", + Log, + Client->CreateReadSessionConnectionProcessorFactory(), + EventsQueue, + ErrorHandler, + context, + 1, 1); + + deferred.DeferStartSession(Session); +} + +bool TReadSession::ValidateSettings() { + NYql::TIssues issues; + if (Settings.Topics_.empty()) { + issues.AddIssue("Empty topics list."); + } + + if (Settings.ConsumerName_.empty()) { + issues.AddIssue("No consumer specified."); + } + + if (Settings.MaxMemoryUsageBytes_ < 1_MB) { + issues.AddIssue("Too small max memory usage. Valid values start from 1 megabyte."); + } + + if (issues) { + Abort(EStatus::BAD_REQUEST, NPersQueue::MakeIssueWithSubIssues("Invalid read session settings", issues)); + return false; + } else { + return true; + } +} + +NThreading::TFuture<void> TReadSession::WaitEvent() { + return EventsQueue->WaitEvent(); +} + +TVector<TReadSessionEvent::TEvent> TReadSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) { + return EventsQueue->GetEvents(block, maxEventsCount, maxByteSize); +} + +TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxByteSize) { + return EventsQueue->GetEvent(block, maxByteSize); +} + +bool TReadSession::Close(TDuration timeout) { + Log.Write(TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout); + // Log final counters. + DumpCountersToLog(); + with_lock (Lock) { + if (DumpCountersContext) { + DumpCountersContext->Cancel(); + DumpCountersContext.reset(); + } + } + + NPersQueue::TSingleClusterReadSessionImpl<false>::TPtr session; + NThreading::TPromise<bool> promise = NThreading::NewPromise<bool>(); + auto callback = [=]() mutable { + promise.TrySetValue(true); + }; + + NPersQueue::TDeferredActions<false> deferred; + with_lock (Lock) { + if (Closing || Aborting) { + return false; + } + + if (!timeout) { + AbortImpl(EStatus::ABORTED, "Close with zero timeout", deferred); + return false; + } + + Closing = true; + session = Session; + } + session->Close(callback); + + callback(); // For the case when there are no subsessions yet. + + auto timeoutCallback = [=](bool) mutable { + promise.TrySetValue(false); + }; + + auto timeoutContext = Connections->CreateContext(); + if (!timeoutContext) { + AbortImpl(EStatus::ABORTED, DRIVER_IS_STOPPING_DESCRIPTION, deferred); + return false; + } + Connections->ScheduleCallback(timeout, + std::move(timeoutCallback), + timeoutContext); + + // Wait. + NThreading::TFuture<bool> resultFuture = promise.GetFuture(); + const bool result = resultFuture.GetValueSync(); + if (result) { + NPersQueue::Cancel(timeoutContext); + + NYql::TIssues issues; + issues.AddIssue("Session was gracefully closed"); + EventsQueue->Close(TSessionClosedEvent(EStatus::SUCCESS, std::move(issues)), deferred); + } else { + ++*Settings.Counters_->Errors; + session->Abort(); + + NYql::TIssues issues; + issues.AddIssue(TStringBuilder() << "Session was closed after waiting " << timeout); + EventsQueue->Close(TSessionClosedEvent(EStatus::TIMEOUT, std::move(issues)), deferred); + } + + with_lock (Lock) { + Aborting = true; // Set abort flag for doing nothing on destructor. + } + return result; +} + +void TReadSession::WaitAllDecompressionTasks() { + if (Session) { + Session->WaitAllDecompressionTasks(); + } +} + +void TReadSession::ClearAllEvents() { + EventsQueue->ClearAllEvents(); +} + +TStringBuilder TReadSession::GetLogPrefix() const { + return TStringBuilder() << GetDatabaseLogPrefix(DbDriverState->Database) << "[" << SessionId << "] "; +} + +static ELogPriority GetEventLogPriority(const TReadSessionEvent::TEvent& event) { + if (std::holds_alternative<TReadSessionEvent::TStartPartitionSessionEvent>(event) + || std::holds_alternative<TReadSessionEvent::TStopPartitionSessionEvent>(event) + || std::holds_alternative<TReadSessionEvent::TPartitionSessionClosedEvent>(event) + || std::holds_alternative<TSessionClosedEvent>(event)) + { // Control event. + return TLOG_INFO; + } else { + return TLOG_DEBUG; + } +} + +void TReadSession::OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) { + Log.Write(GetEventLogPriority(event), GetLogPrefix() << "Read session event " << DebugString(event)); +} + +void TReadSession::MakeCountersIfNeeded() { + if (!Settings.Counters_ || NPersQueue::HasNullCounters(*Settings.Counters_)) { + TReaderCounters::TPtr counters = MakeIntrusive<TReaderCounters>(); + if (Settings.Counters_) { + *counters = *Settings.Counters_; // Copy all counters that have been set by user. + } + NPersQueue::MakeCountersNotNull(*counters); + Settings.Counters(counters); + } +} + +void TReadSession::DumpCountersToLog(size_t timeNumber) { + const bool logCounters = timeNumber % 60 == 0; // Every 1 minute. + const bool dumpSessionsStatistics = timeNumber % 600 == 0; // Every 10 minutes. + + *Settings.Counters_->CurrentSessionLifetimeMs = (TInstant::Now() - StartSessionTime).MilliSeconds(); + NPersQueue::TSingleClusterReadSessionImpl<false>::TPtr session; + with_lock (Lock) { + if (Closing || Aborting) { + return; + } + + session = Session; + } + + { + TMaybe<TLogElement> log; + if (dumpSessionsStatistics) { + log.ConstructInPlace(&Log, TLOG_INFO); + (*log) << "Read/commit by partition streams (cluster:topic:partition:stream-id:read-offset:committed-offset):"; + } + session->UpdateMemoryUsageStatistics(); + if (dumpSessionsStatistics) { + session->DumpStatisticsToLog(*log); + } + } + +#define C(counter) \ + << " " Y_STRINGIZE(counter) ": " \ + << Settings.Counters_->counter->Val() \ + /**/ + + if (logCounters) { + Log.Write(TLOG_INFO, + GetLogPrefix() << "Counters: {" + C(Errors) + C(CurrentSessionLifetimeMs) + C(BytesRead) + C(MessagesRead) + C(BytesReadCompressed) + C(BytesInflightUncompressed) + C(BytesInflightCompressed) + C(BytesInflightTotal) + C(MessagesInflight) + << " }" + ); + } + +#undef C + + ScheduleDumpCountersToLog(timeNumber + 1); +} + +void TReadSession::ScheduleDumpCountersToLog(size_t timeNumber) { + with_lock(Lock) { + DumpCountersContext = Connections->CreateContext(); + } + if (DumpCountersContext) { + auto callback = [self = weak_from_this(), timeNumber](bool ok) { + if (ok) { + if (auto sharedSelf = self.lock()) { + sharedSelf->DumpCountersToLog(timeNumber); + } + } + }; + Connections->ScheduleCallback(TDuration::Seconds(1), + std::move(callback), + DumpCountersContext); + } +} + +void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, NPersQueue::TDeferredActions<false>& deferred) { + if (!Aborting) { + Aborting = true; + Log.Write(TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString()); + if (DumpCountersContext) { + DumpCountersContext->Cancel(); + DumpCountersContext.reset(); + } + Session->Abort(); + EventsQueue->Close(std::move(closeEvent), deferred); + } +} + +void TReadSession::AbortImpl(EStatus statusCode, NYql::TIssues&& issues, NPersQueue::TDeferredActions<false>& deferred) { + AbortImpl(TSessionClosedEvent(statusCode, std::move(issues)), deferred); +} + +void TReadSession::AbortImpl(EStatus statusCode, const TString& message, NPersQueue::TDeferredActions<false>& deferred) { + NYql::TIssues issues; + issues.AddIssue(message); + AbortImpl(statusCode, std::move(issues), deferred); +} + +void TReadSession::Abort(EStatus statusCode, NYql::TIssues&& issues) { + Abort(TSessionClosedEvent(statusCode, std::move(issues))); +} + +void TReadSession::Abort(EStatus statusCode, const TString& message) { + NYql::TIssues issues; + issues.AddIssue(message); + Abort(statusCode, std::move(issues)); +} + +void TReadSession::Abort(TSessionClosedEvent&& closeEvent) { + NPersQueue::TDeferredActions<false> deferred; + with_lock (Lock) { + AbortImpl(std::move(closeEvent), deferred); + } +} + +} diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h new file mode 100644 index 0000000000..bd7329f276 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h @@ -0,0 +1,136 @@ +#pragma once + +#include "topic_impl.h" + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h> + +namespace NYdb::NTopic { + +class TDummyReadSession: public IReadSession, public std::enable_shared_from_this<TDummyReadSession> { +public: + TDummyReadSession() = default; + + inline TDummyReadSession(const TReadSessionSettings& settings) { + (void)settings; + } + + inline NThreading::TFuture<void> WaitEvent() override { + Y_VERIFY(false); + + NThreading::TPromise<void> promise = NThreading::NewPromise<void>(); + return promise.GetFuture(); + } + + inline TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override { + Y_VERIFY(false); + + (void)block; + (void)maxEventsCount; + (void)maxByteSize; + return {}; + } + + inline TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override { + Y_VERIFY(false); + + (void)block; + (void)maxByteSize; + return {}; + } + + inline bool Close(TDuration timeout) override { + Y_VERIFY(false); + + return !(bool)timeout; + } + + inline TString GetSessionId() const override { + Y_VERIFY(false); + + return "dummy_session_id"; + } + + inline TReaderCounters::TPtr GetCounters() const override { + Y_VERIFY(false); + + return nullptr; + } +}; + +class TReadSession : public IReadSession, + public NPersQueue::IUserRetrievedEventCallback<false>, + public std::enable_shared_from_this<TReadSession> { +public: + TReadSession(const TReadSessionSettings& settings, + std::shared_ptr<TTopicClient::TImpl> client, + std::shared_ptr<TGRpcConnectionsImpl> connections, + TDbDriverStatePtr dbDriverState); + + ~TReadSession(); + + void Start(); + + NThreading::TFuture<void> WaitEvent() override; + TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override; + TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override; + + bool Close(TDuration timeout) override; + + inline TString GetSessionId() const override { + return SessionId; + } + + inline TReaderCounters::TPtr GetCounters() const override { + return Settings.Counters_; // Always not nullptr. + } + + void Abort(TSessionClosedEvent&& closeEvent); + + void WaitAllDecompressionTasks(); + void ClearAllEvents(); + +private: + TStringBuilder GetLogPrefix() const; + + // Start + bool ValidateSettings(); + + void CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false>& deferred); + + void OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) override; + + void MakeCountersIfNeeded(); + void DumpCountersToLog(size_t timeNumber = 0); + void ScheduleDumpCountersToLog(size_t timeNumber = 0); + + // Shutdown. + void Abort(EStatus statusCode, NYql::TIssues&& issues); + void Abort(EStatus statusCode, const TString& message); + + void AbortImpl(TSessionClosedEvent&& closeEvent, NPersQueue::TDeferredActions<false>& deferred); + void AbortImpl(EStatus statusCode, NYql::TIssues&& issues, NPersQueue::TDeferredActions<false>& deferred); + void AbortImpl(EStatus statusCode, const TString& message, NPersQueue::TDeferredActions<false>& deferred); + +private: + TReadSessionSettings Settings; + const TString SessionId; + const TInstant StartSessionTime = TInstant::Now(); + TLog Log; + std::shared_ptr<TTopicClient::TImpl> Client; + std::shared_ptr<TGRpcConnectionsImpl> Connections; + NPersQueue::IErrorHandler<false>::TPtr ErrorHandler; + TDbDriverStatePtr DbDriverState; + TAdaptiveLock Lock; + std::shared_ptr<NPersQueue::TReadSessionEventsQueue<false>> EventsQueue; + + NPersQueue::TSingleClusterReadSessionImpl<false>::TPtr Session; + TVector<TTopicReadSettings> Topics; + + NGrpc::IQueueClientContextPtr DumpCountersContext; + + // Exiting. + bool Aborting = false; + bool Closing = false; +}; + +} diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp new file mode 100644 index 0000000000..adb43a2c0d --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp @@ -0,0 +1,392 @@ +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h> + +namespace NYdb::NTopic { + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Helpers + +std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index) { + if (dataReceivedEvent.HasCompressedMessages()) { + const auto& msg = dataReceivedEvent.GetCompressedMessages()[index]; + return {msg.GetOffset(), msg.GetOffset() + 1}; + } + const auto& msg = dataReceivedEvent.GetMessages()[index]; + return {msg.GetOffset(), msg.GetOffset() + 1}; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation + +TReadSessionEvent::TDataReceivedEvent::TMessageInformation::TMessageInformation( + ui64 offset, + TString producerId, + ui64 seqNo, + TInstant createTime, + TInstant writeTime, + TWriteSessionMeta::TPtr meta, + ui64 uncompressedSize, + TString messageGroupId +) + : Offset(offset) + , ProducerId(producerId) + , SeqNo(seqNo) + , CreateTime(createTime) + , WriteTime(writeTime) + , Meta(meta) + , UncompressedSize(uncompressedSize) + , MessageGroupId(messageGroupId) +{} + +static void DebugStringImpl(const TReadSessionEvent::TDataReceivedEvent::TMessageInformation& info, TStringBuilder& ret) { + ret << " Information: {" + << " Offset: " << info.Offset + << " ProducerId: \"" << info.ProducerId << "\"" + << " SeqNo: " << info.SeqNo + << " CreateTime: " << info.CreateTime + << " WriteTime: " << info.WriteTime + << " UncompressedSize: " << info.UncompressedSize + << " MessageGroupId: \"" << info.MessageGroupId << "\""; + ret << " Meta: {"; + bool firstKey = true; + for (const auto& [k, v] : info.Meta->Fields) { + ret << (firstKey ? " \"" : ", \"") << k << "\": \"" << v << "\""; + firstKey = false; + } + ret << " } }"; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TReadSessionEvent::TDataReceivedEvent::IMessage + +TReadSessionEvent::TDataReceivedEvent::IMessage::IMessage(const TString& data, + TPartitionSession::TPtr partitionSession) + : Data(data) + , PartitionSession(partitionSession) +{} + +const TString& TReadSessionEvent::TDataReceivedEvent::IMessage::GetData() const { + return Data; +} + +const TPartitionSession::TPtr& TReadSessionEvent::TDataReceivedEvent::IMessage::GetPartitionSession() const { + return PartitionSession; +} + +TString TReadSessionEvent::TDataReceivedEvent::IMessage::DebugString(bool printData) const { + TStringBuilder ret; + DebugString(ret, printData); + return std::move(ret); +} + +template <class TSerializeInformationFunc> +static void DebugStringImpl(TStringBuilder& ret, + const TString& name, + const TReadSessionEvent::TDataReceivedEvent::IMessage& msg, + bool printData, + TSerializeInformationFunc serializeInformationFunc, + std::optional<ECodec> codec = std::nullopt) +{ + ret << name << " {"; + try { + const TString& data = msg.GetData(); + if (printData) { + ret << " Data: \"" << data << "\""; + } else { + ret << " Data: .." << data.size() << " bytes.."; + } + } catch (...) { + ret << " DataDecompressionError: \"" << CurrentExceptionMessage() << "\""; + } + auto partitionSession = msg.GetPartitionSession(); + ret << " Partition session id: " << partitionSession->GetPartitionSessionId() + << " Topic: \"" << partitionSession->GetTopicPath() << "\"" + << " Partition: " << partitionSession->GetPartitionId(); + if (codec) { + ret << " Codec: " << codec.value(); + } + serializeInformationFunc(ret); + ret << " }"; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage + +TReadSessionEvent::TDataReceivedEvent::TMessage::TMessage(const TString& data, + std::exception_ptr decompressionException, + const TMessageInformation& information, + TPartitionSession::TPtr partitionSession) + : IMessage(data, partitionSession) + , DecompressionException(std::move(decompressionException)) + , Information(information) +{ +} + +const TString& TReadSessionEvent::TDataReceivedEvent::TMessage::GetData() const { + if (DecompressionException) { + std::rethrow_exception(DecompressionException); + } + return IMessage::GetData(); +} + +bool TReadSessionEvent::TDataReceivedEvent::TMessage::HasException() const { + return DecompressionException != nullptr; +} + +ui64 TReadSessionEvent::TDataReceivedEvent::TMessage::GetOffset() const { + return Information.Offset; +} + +const TString& TReadSessionEvent::TDataReceivedEvent::TMessage::GetProducerId() const { + return Information.ProducerId; +} + +const TString& TReadSessionEvent::TDataReceivedEvent::TMessage::GetMessageGroupId() const { + return Information.MessageGroupId; +} + +ui64 TReadSessionEvent::TDataReceivedEvent::TMessage::GetSeqNo() const { + return Information.SeqNo; +} + +TInstant TReadSessionEvent::TDataReceivedEvent::TMessage::GetCreateTime() const { + return Information.CreateTime; +} + +TInstant TReadSessionEvent::TDataReceivedEvent::TMessage::GetWriteTime() const { + return Information.WriteTime; +} + +const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage::GetMeta() const { + return Information.Meta; +} + +void TReadSessionEvent::TDataReceivedEvent::TMessage::Commit() { + static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get()) + ->Commit(Information.Offset, Information.Offset + 1); +} + +void TReadSessionEvent::TDataReceivedEvent::TMessage::DebugString(TStringBuilder& ret, bool printData) const { + DebugStringImpl(ret, "Message", *this, printData, [this](TStringBuilder& ret) { + DebugStringImpl(this->Information, ret); + }); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage + +TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::TCompressedMessage(ECodec codec, + const TString& data, + const TMessageInformation& information, + TPartitionSession::TPtr partitionSession) + : IMessage(data, partitionSession) + , Codec(codec) + , Information(information) +{} + + +ECodec TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetCodec() const { + return Codec; +} + +ui64 TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetOffset() const { + return Information.Offset; +} + +const TString& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetProducerId() const { + return Information.ProducerId; +} + +const TString& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetMessageGroupId() const { + return Information.MessageGroupId; +} + +ui64 TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetSeqNo() const { + return Information.SeqNo; +} + +TInstant TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetCreateTime() const { + return Information.CreateTime; +} + +TInstant TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetWriteTime() const { + return Information.WriteTime; +} + +const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetMeta() const { + return Information.Meta; +} + +ui64 TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetUncompressedSize() const { + return Information.UncompressedSize; +} + +void TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::Commit() { + static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get()) + ->Commit(Information.Offset, Information.Offset + 1); +} + +void TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::DebugString(TStringBuilder& ret, bool printData) const { + DebugStringImpl( + ret, "CompressedMessage", *this, printData, + [this](TStringBuilder& ret) { DebugStringImpl(this->Information, ret); }, Codec); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TReadSessionEvent::TDataReceivedEvent + +TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(TVector<TMessage> messages, + TVector<TCompressedMessage> compressedMessages, + TPartitionSession::TPtr partitionSession) + : Messages(std::move(messages)) + , CompressedMessages(std::move(compressedMessages)) + , PartitionSession(std::move(partitionSession)) +{ + for (size_t i = 0; i < GetMessagesCount(); ++i) { + auto [from, to] = GetMessageOffsetRange(*this, i); + if (OffsetRanges.empty() || OffsetRanges.back().second != from) { + OffsetRanges.emplace_back(from, to); + } else { + OffsetRanges.back().second = to; + } + } +} + +void TReadSessionEvent::TDataReceivedEvent::Commit() { + for (auto [from, to] : OffsetRanges) { + static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get())->Commit(from, to); + } +} + +TString TReadSessionEvent::TDataReceivedEvent::DebugString(bool printData) const { + TStringBuilder ret; + ret << "DataReceived { PartitionSessionId: " << GetPartitionSession()->GetPartitionSessionId() + << " PartitionId: " << GetPartitionSession()->GetPartitionId(); + for (const auto& message : Messages) { + ret << " "; + message.DebugString(ret, printData); + } + for (const auto& message : CompressedMessages) { + ret << " "; + message.DebugString(ret, printData); + } + ret << " }"; + return std::move(ret); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent + +TReadSessionEvent::TCommitOffsetAcknowledgementEvent::TCommitOffsetAcknowledgementEvent(TPartitionSession::TPtr partitionSession, ui64 committedOffset) + : PartitionSession(std::move(partitionSession)) + , CommittedOffset(committedOffset) +{ +} + + +TString TReadSessionEvent::TCommitOffsetAcknowledgementEvent::DebugString() const { + return TStringBuilder() << "CommitAcknowledgement { PartitionSessionId: " << GetPartitionSession()->GetPartitionSessionId() + << " PartitionId: " << GetPartitionSession()->GetPartitionId() + << " CommittedOffset: " << GetCommittedOffset() + << " }"; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TReadSessionEvent::TStartPartitionSessionEvent + +TReadSessionEvent::TStartPartitionSessionEvent::TStartPartitionSessionEvent(TPartitionSession::TPtr partitionSession, + ui64 committedOffset, ui64 endOffset) + : PartitionSession(std::move(partitionSession)) + , CommittedOffset(committedOffset) + , EndOffset(endOffset) { +} + +void TReadSessionEvent::TStartPartitionSessionEvent::Confirm(TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset) { + if (PartitionSession) { + static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get()) + ->ConfirmCreate(readOffset, commitOffset); + } +} + +TString TReadSessionEvent::TStartPartitionSessionEvent::DebugString() const { + return TStringBuilder() << "CreatePartitionSession { PartitionSessionId: " + << GetPartitionSession()->GetPartitionSessionId() + << " TopicPath: " << GetPartitionSession()->GetTopicPath() + << " PartitionId: " << GetPartitionSession()->GetPartitionId() + << " CommittedOffset: " << GetCommittedOffset() + << " EndOffset: " << GetEndOffset() << " }"; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TReadSessionEvent::TStopPartitionSessionEvent + +TReadSessionEvent::TStopPartitionSessionEvent::TStopPartitionSessionEvent(TPartitionSession::TPtr partitionSession, + bool committedOffset) + : PartitionSession(std::move(partitionSession)) + , CommittedOffset(committedOffset) { +} + +void TReadSessionEvent::TStopPartitionSessionEvent::Confirm() { + if (PartitionSession) { + static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get())->ConfirmDestroy(); + } +} + +TString TReadSessionEvent::TStopPartitionSessionEvent::DebugString() const { + return TStringBuilder() << "DestroyPartitionSession { PartitionSessionId: " + << GetPartitionSession()->GetPartitionSessionId() + << " PartitionId: " << GetPartitionSession()->GetPartitionId() + << " CommittedOffset: " << GetCommittedOffset() << " }"; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TReadSessionEvent::TPartitionSessionStatusEvent + +TReadSessionEvent::TPartitionSessionStatusEvent::TPartitionSessionStatusEvent(TPartitionSession::TPtr partitionSession, + ui64 committedOffset, ui64 readOffset, + ui64 endOffset, + TInstant writeTimeHighWatermark) + : PartitionSession(std::move(partitionSession)) + , CommittedOffset(committedOffset) + , ReadOffset(readOffset) + , EndOffset(endOffset) + , WriteTimeHighWatermark(writeTimeHighWatermark) { +} + +TString TReadSessionEvent::TPartitionSessionStatusEvent::DebugString() const { + return TStringBuilder() << "PartitionSessionStatus { PartitionSessionId: " + << GetPartitionSession()->GetPartitionSessionId() + << " PartitionId: " << GetPartitionSession()->GetPartitionId() + << " CommittedOffset: " << GetCommittedOffset() << " ReadOffset: " << GetReadOffset() + << " EndOffset: " << GetEndOffset() + << " WriteWatermark: " << GetWriteTimeHighWatermark() << " }"; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TReadSessionEvent::TPartitionSessionClosedEvent + +TReadSessionEvent::TPartitionSessionClosedEvent::TPartitionSessionClosedEvent(TPartitionSession::TPtr partitionSession, EReason reason) + : PartitionSession(std::move(partitionSession)) + , Reason(reason) +{ +} + +TString TReadSessionEvent::TPartitionSessionClosedEvent::DebugString() const { + return TStringBuilder() << "PartitionSessionClosed { PartitionSessionId: " + << GetPartitionSession()->GetPartitionSessionId() + << " PartitionId: " << GetPartitionSession()->GetPartitionId() + << " Reason: " << GetReason() << " }"; +} + +TString TSessionClosedEvent::DebugString() const { + return + TStringBuilder() << "SessionClosed { Status: " << GetStatus() + << " Issues: \"" << NPersQueue::IssuesSingleLineString(GetIssues()) + << "\" }"; +} + +TString DebugString(const TReadSessionEvent::TEvent& event) { + return std::visit([](const auto& ev) { return ev.DebugString(); }, event); +} + +} // namespace NYdb::NPersQueue diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp index f7bc2fac56..b18aab2530 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp @@ -1,124 +1,29 @@ #include "topic_impl.h" + #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h> -namespace NYdb::NTopic { - -class TDummyReadSession: public IReadSession, public std::enable_shared_from_this<TDummyReadSession> { -public: - TDummyReadSession() = default; - TDummyReadSession(const TReadSessionSettings& settings) { - (void)settings; - } - NThreading::TFuture<void> WaitEvent() override { - Y_VERIFY(false); - - NThreading::TPromise<void> promise = NThreading::NewPromise<void>(); - return promise.GetFuture(); - } - TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override { - Y_VERIFY(false); - - (void)block; - (void)maxEventsCount; - (void)maxByteSize; - return {}; - } - TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override { - Y_VERIFY(false); - - (void)block; - (void)maxByteSize; - return {}; - } - - bool Close(TDuration timeout) override { - Y_VERIFY(false); - - return !(bool)timeout; - } - - TString GetSessionId() const override { - Y_VERIFY(false); +#include "read_session.h" - return "dummy_session_id"; - } - - TReaderCounters::TPtr GetCounters() const override { - Y_VERIFY(false); - - return nullptr; - } -}; - -class TReadSession : public IReadSession, - public NPersQueue::IUserRetrievedEventCallback, - public std::enable_shared_from_this<TReadSession> { -public: - TReadSession(const TReadSessionSettings& settings, - std::shared_ptr<TTopicClient::TImpl> client, - std::shared_ptr<TGRpcConnectionsImpl> connections, - TDbDriverStatePtr dbDriverState); - - ~TReadSession(); - - void Start(); - - NThreading::TFuture<void> WaitEvent() override; - TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override; - TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override; - - bool Close(TDuration timeout) override; - - TString GetSessionId() const override { - return SessionId; - } - - TReaderCounters::TPtr GetCounters() const override { - return Settings.Counters_; // Always not nullptr. - } - - void Abort(TSessionClosedEvent&& closeEvent); - - void WaitAllDecompressionTasks(); - void ClearAllEvents(); - - - -private: - TReadSessionSettings Settings; - const TString SessionId; - const TInstant StartSessionTime = TInstant::Now(); - TLog Log; - std::shared_ptr<TTopicClient::TImpl> Client; - std::shared_ptr<TGRpcConnectionsImpl> Connections; - NPersQueue::IErrorHandler::TPtr ErrorHandler; - TDbDriverStatePtr DbDriverState; - TAdaptiveLock Lock; - std::shared_ptr<NPersQueue::TReadSessionEventsQueue> EventsQueue; - - NPersQueue::TSingleClusterReadSessionImpl::TPtr Session; - TVector<TTopicReadSettings> Topics; - -}; +namespace NYdb::NTopic { std::shared_ptr<IReadSession> TTopicClient::TImpl::CreateReadSession(const TReadSessionSettings& settings) { - // TMaybe<TReadSessionSettings> maybeSettings; - // if (!settings.DecompressionExecutor_ || !settings.EventHandlers_.HandlersExecutor_) { - // maybeSettings = settings; - // with_lock (Lock) { - // if (!settings.DecompressionExecutor_) { - // maybeSettings->DecompressionExecutor(Settings.DefaultCompressionExecutor_); - // } - // if (!settings.EventHandlers_.HandlersExecutor_) { - // maybeSettings->EventHandlers_.HandlersExecutor(Settings.DefaultHandlersExecutor_); - // } - // } - // } - // auto session = std::make_shared<TReadSession>(maybeSettings.GetOrElse(settings), shared_from_this(), Connections_, DbDriverState_); - // session->Start(); - // return std::move(session); - return std::make_shared<TDummyReadSession>(settings); + TMaybe<TReadSessionSettings> maybeSettings; + if (!settings.DecompressionExecutor_ || !settings.EventHandlers_.HandlersExecutor_) { + maybeSettings = settings; + with_lock (Lock) { + if (!settings.DecompressionExecutor_) { + maybeSettings->DecompressionExecutor(Settings.DefaultCompressionExecutor_); + } + if (!settings.EventHandlers_.HandlersExecutor_) { + maybeSettings->EventHandlers_.HandlersExecutor(Settings.DefaultHandlersExecutor_); + } + } + } + auto session = std::make_shared<TReadSession>(maybeSettings.GetOrElse(settings), shared_from_this(), Connections_, DbDriverState_); + session->Start(); + return std::move(session); + // return std::make_shared<TDummyReadSession>(settings); } std::shared_ptr<TTopicClient::TImpl::IReadSessionConnectionProcessorFactory> TTopicClient::TImpl::CreateReadSessionConnectionProcessorFactory() { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index ace3bafbf7..09e18fda6a 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -471,7 +471,7 @@ public: //! Unique identifier of partition session. //! It is unique within one read session. - ui64 GetPartitionSessionId() const { + i64 GetPartitionSessionId() const { return PartitionSessionId; } @@ -481,14 +481,14 @@ public: } //! Partition id. - ui64 GetPartitionId() const { + i64 GetPartitionId() const { return PartitionId; } protected: - ui64 PartitionSessionId; + i64 PartitionSessionId; TString TopicPath; - ui64 PartitionId; + i64 PartitionId; }; //! Events for read session. @@ -497,8 +497,14 @@ struct TReadSessionEvent { //! Contains batch of messages from single partition session. struct TDataReceivedEvent { struct TMessageInformation { - TMessageInformation(ui64 offset, TString producerId, ui64 seqNo, TInstant createTime, TInstant writeTime, - TWriteSessionMeta::TPtr meta, ui64 uncompressedSize, TString messageGroupId); + TMessageInformation(ui64 offset, + TString producerId, + ui64 seqNo, + TInstant createTime, + TInstant writeTime, + TWriteSessionMeta::TPtr meta, + ui64 uncompressedSize, + TString messageGroupId); ui64 Offset; TString ProducerId; ui64 SeqNo; @@ -575,37 +581,38 @@ struct TReadSessionEvent { }; struct TCompressedMessage: public IMessage { + TCompressedMessage(ECodec codec, const TString& data, const TMessageInformation& information, + TPartitionSession::TPtr partitionSession); + + virtual ~TCompressedMessage() { + } + //! Message codec ECodec GetCodec() const; //! Message offset. - ui64 GetOffset(ui64 index) const; + ui64 GetOffset() const; //! Producer id - const TString& GetProducerId(ui64 index) const; + const TString& GetProducerId() const; //! Message group id. - const TString& GetMessageGroupId(ui64 index) const; + const TString& GetMessageGroupId() const; //! Sequence number. - ui64 GetSeqNo(ui64 index) const; + ui64 GetSeqNo() const; //! Message creation timestamp. - TInstant GetCreateTime(ui64 index) const; + TInstant GetCreateTime() const; //! Message write timestamp. - TInstant GetWriteTime(ui64 index) const; + TInstant GetWriteTime() const; //! Metainfo. - const TWriteSessionMeta::TPtr& GetMeta(ui64 index) const; + const TWriteSessionMeta::TPtr& GetMeta() const; //! Uncompressed size. - ui64 GetUncompressedSize(ui64 index) const; - - virtual ~TCompressedMessage() { - } - TCompressedMessage(ECodec codec, const TString& data, const TVector<TMessageInformation>& information, - TPartitionSession::TPtr partitionSession); + ui64 GetUncompressedSize() const; //! Commits all offsets in compressed message. void Commit() override; @@ -615,9 +622,13 @@ struct TReadSessionEvent { private: ECodec Codec; - TVector<TMessageInformation> Information; + TMessageInformation Information; }; + public: + TDataReceivedEvent(TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages, + TPartitionSession::TPtr partitionSession); + //! Partition session. const TPartitionSession::TPtr& GetPartitionSession() const { return PartitionSession; @@ -658,9 +669,6 @@ struct TReadSessionEvent { TString DebugString(bool printData = false) const; - TDataReceivedEvent(TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages, - TPartitionSession::TPtr partitionSession); - private: void CheckMessagesFilled(bool compressed) const { Y_VERIFY(!Messages.empty() || !CompressedMessages.empty()); @@ -739,6 +747,8 @@ struct TReadSessionEvent { //! Server can destroy partition session gracefully //! for rebalancing among all topic clients. struct TStopPartitionSessionEvent { + TStopPartitionSessionEvent(TPartitionSession::TPtr partitionSession, bool committedOffset); + const TPartitionSession::TPtr& GetPartitionSession() const { return PartitionSession; } @@ -754,8 +764,6 @@ struct TReadSessionEvent { TString DebugString() const; - TStopPartitionSessionEvent(TPartitionSession::TPtr partitionSession, bool committedOffset); - private: TPartitionSession::TPtr PartitionSession; ui64 CommittedOffset; @@ -812,6 +820,9 @@ struct TReadSessionEvent { ConnectionLost, }; + public: + TPartitionSessionClosedEvent(TPartitionSession::TPtr partitionSession, EReason reason); + const TPartitionSession::TPtr& GetPartitionSession() const { return PartitionSession; } @@ -822,15 +833,17 @@ struct TReadSessionEvent { TString DebugString() const; - TPartitionSessionClosedEvent(TPartitionSession::TPtr partitionSession, EReason reason); - private: TPartitionSession::TPtr PartitionSession; EReason Reason; }; - using TEvent = std::variant<TDataReceivedEvent, TCommitOffsetAcknowledgementEvent, TStartPartitionSessionEvent, - TStopPartitionSessionEvent, TPartitionSessionStatusEvent, TPartitionSessionClosedEvent, + using TEvent = std::variant<TDataReceivedEvent, + TCommitOffsetAcknowledgementEvent, + TStartPartitionSessionEvent, + TStopPartitionSessionEvent, + TPartitionSessionStatusEvent, + TPartitionSessionClosedEvent, TSessionClosedEvent>; }; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp new file mode 100644 index 0000000000..990fc5abb6 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp @@ -0,0 +1,86 @@ +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h> + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/testing/unittest/tests_data.h> +#include <library/cpp/threading/future/future.h> +#include <library/cpp/threading/future/async.h> + +namespace NYdb::NTopic::NTests { + +Y_UNIT_TEST_SUITE(BasicUsage) { + + Y_UNIT_TEST(WriteAndReadSomeMessagesWithSyncCompression) { + + auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); + + NPersQueue::TWriteSessionSettings writeSettings; + writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id"); + NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor(); + writeSettings.CompressionExecutor(executor); + + ui64 count = 100u; + TMaybe<bool> shouldCaptureData = {true}; + + auto& client = setup->GetPersQueueClient(); + auto session = client.CreateSimpleBlockingWriteSession(writeSettings); + TString messageBase = "message-"; + TVector<TString> sentMessages; + + for (auto i = 0u; i < count; i++) { + sentMessages.emplace_back(messageBase * (i+1) + ToString(i)); + auto res = session->Write(sentMessages.back()); + UNIT_ASSERT(res); + } + { + auto sessionAdapter = NPersQueue::NTests::TSimpleWriteSessionTestAdapter( + dynamic_cast<NPersQueue::TSimpleBlockingWriteSession *>(session.get())); + if (shouldCaptureData.Defined()) { + TStringBuilder msg; + msg << "Session has captured " << sessionAdapter.GetAcquiredMessagesCount() + << " messages, capturing was expected: " << *shouldCaptureData << Endl; + UNIT_ASSERT_VALUES_EQUAL_C(sessionAdapter.GetAcquiredMessagesCount() > 0, *shouldCaptureData, msg.c_str()); + } + } + session->Close(); + + std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession; + + // Create topic client. + NYdb::NTopic::TTopicClient topicClient(setup->GetDriver()); + + // Create read session. + NYdb::NTopic::TReadSessionSettings readSettings; + readSettings + .ConsumerName(setup->GetTestClient()) + .AppendTopics(setup->GetTestTopic()); + + Cerr << "Session was created" << Endl; + + NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>(); + auto totalReceived = 0u; + readSettings.EventHandlers_.SimpleDataHandlers([&](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) { + auto& messages = ev.GetMessages(); + for (size_t i = 0u; i < messages.size(); ++i) { + auto& message = messages[i]; + UNIT_ASSERT_VALUES_EQUAL(message.GetData(), sentMessages[totalReceived]); + totalReceived++; + } + if (totalReceived == sentMessages.size()) + checkedPromise.SetValue(); + }); + + ReadSession = topicClient.CreateReadSession(readSettings); + + checkedPromise.GetFuture().GetValueSync(); + ReadSession->Close(TDuration::Zero()); + } +} + +} diff --git a/ydb/public/sdk/cpp/examples/topic_reader/eventloop/CMakeLists.darwin.txt b/ydb/public/sdk/cpp/examples/topic_reader/eventloop/CMakeLists.darwin.txt new file mode 100644 index 0000000000..900ee09290 --- /dev/null +++ b/ydb/public/sdk/cpp/examples/topic_reader/eventloop/CMakeLists.darwin.txt @@ -0,0 +1,29 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(persqueue_reader_eventloop) +target_link_libraries(persqueue_reader_eventloop PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-client-ydb_topic + library-cpp-getopt +) +target_link_options(persqueue_reader_eventloop PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(persqueue_reader_eventloop PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/examples/topic_reader/eventloop/main.cpp +) +vcs_info(persqueue_reader_eventloop) diff --git a/ydb/public/sdk/cpp/examples/topic_reader/eventloop/CMakeLists.linux.txt b/ydb/public/sdk/cpp/examples/topic_reader/eventloop/CMakeLists.linux.txt new file mode 100644 index 0000000000..04eb961985 --- /dev/null +++ b/ydb/public/sdk/cpp/examples/topic_reader/eventloop/CMakeLists.linux.txt @@ -0,0 +1,33 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(persqueue_reader_eventloop) +target_link_libraries(persqueue_reader_eventloop PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-client-ydb_topic + library-cpp-getopt +) +target_link_options(persqueue_reader_eventloop PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(persqueue_reader_eventloop PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/examples/topic_reader/eventloop/main.cpp +) +vcs_info(persqueue_reader_eventloop) diff --git a/ydb/public/sdk/cpp/examples/topic_reader/eventloop/CMakeLists.txt b/ydb/public/sdk/cpp/examples/topic_reader/eventloop/CMakeLists.txt new file mode 100644 index 0000000000..fc7b1ee73c --- /dev/null +++ b/ydb/public/sdk/cpp/examples/topic_reader/eventloop/CMakeLists.txt @@ -0,0 +1,13 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (APPLE) + include(CMakeLists.darwin.txt) +elseif (UNIX AND NOT APPLE) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/public/sdk/cpp/examples/topic_reader/eventloop/main.cpp b/ydb/public/sdk/cpp/examples/topic_reader/eventloop/main.cpp new file mode 100644 index 0000000000..3b801149ff --- /dev/null +++ b/ydb/public/sdk/cpp/examples/topic_reader/eventloop/main.cpp @@ -0,0 +1,114 @@ +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +#include <library/cpp/getopt/last_getopt.h> + +#include <util/stream/output.h> +#include <util/system/env.h> + +struct TOptions { + TString Endpoint; + TString Database; + TString TopicPath; + TString ConsumerName; + bool CommitAfterProcessing = false; + bool DisableClusterDiscovery = false; + bool UseSecureConnection = false; + + TOptions(int argc, const char* argv[]) { + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); + opts.AddHelpOption('h'); + opts.AddLongOption('e', "endpoint", "YDB endpoint").Required().RequiredArgument("HOST:PORT") + .StoreResult(&Endpoint); + opts.AddLongOption('d', "database", "YDB database name").DefaultValue("/Root").RequiredArgument("PATH") + .StoreResult(&Database); + opts.AddLongOption('t', "topic-path", "Topic path for reading").Required().RequiredArgument("PATH") + .StoreResult(&TopicPath); + opts.AddLongOption('c', "consumer-name", "Consumer name").Required().RequiredArgument("CONSUMER") + .StoreResult(&ConsumerName); + opts.AddLongOption("commit-after-processing", "Commit data after processing") + .SetFlag(&CommitAfterProcessing).NoArgument(); + opts.AddLongOption("secure-connection", "Use secure connection") + .SetFlag(&UseSecureConnection).NoArgument(); + opts.SetFreeArgsNum(0); + + NLastGetopt::TOptsParseResult res(&opts, argc, argv); + } +}; + +std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession; + +void StopHandler(int) { + Cerr << "Stopping session" << Endl; + if (ReadSession) { + ReadSession->Close(TDuration::Seconds(3)); + } else { + exit(1); + } +} + +int main(int argc, const char* argv[]) { + signal(SIGINT, &StopHandler); + signal(SIGTERM, &StopHandler); + + TOptions opts(argc, argv); + + // Create driver instance. + auto driverConfig = NYdb::TDriverConfig() + .SetNetworkThreadsNum(2) + .SetEndpoint(opts.Endpoint) + .SetDatabase(opts.Database) + .SetAuthToken(GetEnv("YDB_TOKEN")) + .SetLog(CreateLogBackend("cerr")); + + if (opts.UseSecureConnection) { + driverConfig.UseSecureConnection(); + } + + NYdb::TDriver driver(driverConfig); + + // Create topic client. + NYdb::NTopic::TTopicClient topicClient(driver); + + // Create read session. + NYdb::NTopic::TReadSessionSettings settings; + settings + .ConsumerName(opts.ConsumerName) + .AppendTopics(opts.TopicPath); + + ReadSession = topicClient.CreateReadSession(settings); + + Cerr << "Session was created" << Endl; + + // [BEGIN read session process events] + // Event loop + while (true) { + auto future = ReadSession->WaitEvent(); + // Wait for next event or ten seconds + future.Wait(TDuration::Seconds(10)); + // future.Subscribe([](){ + // Cerr << ...; + // }); + // Get event + TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent(true/*block - will block if no event received yet*/); + Cerr << "Got new read session event: " << DebugString(*event) << Endl; + + if (auto* dataEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) { + for (const auto& message : dataEvent->GetMessages()) { + Cerr << "Data message: \"" << message.GetData() << "\"" << Endl; + } + + if (opts.CommitAfterProcessing) { + dataEvent->Commit(); + } + } else if (auto* startPartitionSessionEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) { + startPartitionSessionEvent->Confirm(); + } else if (auto* stopPartitionSessionEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) { + stopPartitionSessionEvent->Confirm(); + } else if (auto* closeSessionEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) { + break; + } + } + // [END read session process events] + // Stop the driver. + driver.Stop(); +} diff --git a/ydb/public/sdk/cpp/examples/topic_reader/simple/CMakeLists.darwin.txt b/ydb/public/sdk/cpp/examples/topic_reader/simple/CMakeLists.darwin.txt new file mode 100644 index 0000000000..9e544c49c2 --- /dev/null +++ b/ydb/public/sdk/cpp/examples/topic_reader/simple/CMakeLists.darwin.txt @@ -0,0 +1,29 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(simple_persqueue_reader) +target_link_libraries(simple_persqueue_reader PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-client-ydb_topic + library-cpp-getopt +) +target_link_options(simple_persqueue_reader PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(simple_persqueue_reader PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/examples/topic_reader/simple/main.cpp +) +vcs_info(simple_persqueue_reader) diff --git a/ydb/public/sdk/cpp/examples/topic_reader/simple/CMakeLists.linux.txt b/ydb/public/sdk/cpp/examples/topic_reader/simple/CMakeLists.linux.txt new file mode 100644 index 0000000000..eef542bff2 --- /dev/null +++ b/ydb/public/sdk/cpp/examples/topic_reader/simple/CMakeLists.linux.txt @@ -0,0 +1,33 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(simple_persqueue_reader) +target_link_libraries(simple_persqueue_reader PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-client-ydb_topic + library-cpp-getopt +) +target_link_options(simple_persqueue_reader PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(simple_persqueue_reader PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/examples/topic_reader/simple/main.cpp +) +vcs_info(simple_persqueue_reader) diff --git a/ydb/public/sdk/cpp/examples/topic_reader/simple/CMakeLists.txt b/ydb/public/sdk/cpp/examples/topic_reader/simple/CMakeLists.txt new file mode 100644 index 0000000000..fc7b1ee73c --- /dev/null +++ b/ydb/public/sdk/cpp/examples/topic_reader/simple/CMakeLists.txt @@ -0,0 +1,13 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (APPLE) + include(CMakeLists.darwin.txt) +elseif (UNIX AND NOT APPLE) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/public/sdk/cpp/examples/topic_reader/simple/main.cpp b/ydb/public/sdk/cpp/examples/topic_reader/simple/main.cpp new file mode 100644 index 0000000000..4e0d07d3c6 --- /dev/null +++ b/ydb/public/sdk/cpp/examples/topic_reader/simple/main.cpp @@ -0,0 +1,99 @@ +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +#include <library/cpp/getopt/last_getopt.h> + +#include <util/stream/output.h> +#include <util/system/env.h> + +struct TOptions { + TString Endpoint; + TString Database; + TString TopicPath; + TString ConsumerName; + bool CommitAfterProcessing = false; + bool DisableClusterDiscovery = false; + bool UseSecureConnection = false; + + TOptions(int argc, const char* argv[]) { + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); + opts.AddHelpOption('h'); + opts.AddLongOption('e', "endpoint", "YDB endpoint").Required().RequiredArgument("HOST:PORT") + .StoreResult(&Endpoint); + opts.AddLongOption('d', "database", "YDB database name").DefaultValue("/Root").RequiredArgument("PATH") + .StoreResult(&Database); + opts.AddLongOption('t', "topic-path", "Topic path for reading").Required().RequiredArgument("PATH") + .StoreResult(&TopicPath); + opts.AddLongOption('c', "consumer-name", "Consumer name").Required().RequiredArgument("CONSUMER") + .StoreResult(&ConsumerName); + opts.AddLongOption("commit-after-processing", "Commit data after processing") + .SetFlag(&CommitAfterProcessing).NoArgument(); + opts.AddLongOption("secure-connection", "Use secure connection") + .SetFlag(&UseSecureConnection).NoArgument(); + opts.SetFreeArgsNum(0); + + NLastGetopt::TOptsParseResult res(&opts, argc, argv); + } +}; + +std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession; + +void StopHandler(int) { + Cerr << "Stopping session" << Endl; + if (ReadSession) { + ReadSession->Close(TDuration::Seconds(3)); + } else { + exit(1); + } +} + +int main(int argc, const char* argv[]) { + signal(SIGINT, &StopHandler); + signal(SIGTERM, &StopHandler); + + TOptions opts(argc, argv); + + // Create driver instance. + auto driverConfig = NYdb::TDriverConfig() + .SetNetworkThreadsNum(2) + .SetEndpoint(opts.Endpoint) + .SetDatabase(opts.Database) + .SetAuthToken(GetEnv("YDB_TOKEN")) + .SetLog(CreateLogBackend("cerr")); + + if (opts.UseSecureConnection) { + driverConfig.UseSecureConnection(); + } + + NYdb::TDriver driver(driverConfig); + + // Create topic client. + NYdb::NTopic::TTopicClient topicClient(driver); + + // [BEGIN Create read session] + // Create read session. + NYdb::NTopic::TReadSessionSettings settings; + settings + .ConsumerName(opts.ConsumerName) + .AppendTopics(opts.TopicPath); + + // auto handlers = TEventHandlers::SimpleDataHandlers(); + + // settings.SetEventHandlers(handlers); + + // settings.SetSimpleDataHandlers( + settings.EventHandlers_.SimpleDataHandlers( + [](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) { + Cerr << "Data event " << DebugString(event); + }, opts.CommitAfterProcessing); + + ReadSession = topicClient.CreateReadSession(settings); + // [END Create read session] + + Cerr << "Session was created" << Endl; + + // Wait SessionClosed event. + ReadSession->GetEvent(/*block = */true); + + // Stop the driver. + driver.Stop(); +} |