diff options
author | abcdef <akotov@ydb.tech> | 2022-09-29 09:39:03 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2022-09-29 09:39:03 +0300 |
commit | cb023f74d94535ebb12817c4c1be92c26370bcf7 (patch) | |
tree | 8d35e92afd51c37e4303009031298d9f84c72a2a | |
parent | 06f2527c743ff97dfaea75157accbb574bc2edc4 (diff) | |
download | ydb-cb023f74d94535ebb12817c4c1be92c26370bcf7.tar.gz |
10 files changed, 708 insertions, 305 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h index 69b5c1aa7a4..fcfd457fc5f 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h @@ -334,7 +334,6 @@ protected: template <class TEventType, class TFunc> void PushSpecificHandler(TEventInfo&& eventInfo, const TFunc& f) { Post(Settings.EventHandlers_.HandlersExecutor_, [func = f, event = std::move(eventInfo)]() mutable { - event.OnUserRetrievedEvent(); func(std::get<TEventType>(event.GetEvent())); }); } @@ -342,7 +341,6 @@ protected: template <class TFunc> void PushCommonHandler(TEventInfo&& eventInfo, const TFunc& f) { Post(Settings.EventHandlers_.HandlersExecutor_, [func = f, event = std::move(eventInfo)]() mutable { - event.OnUserRetrievedEvent(); func(event.GetEvent()); }); } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp index 857ea12b177..1cfc8466307 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp @@ -56,7 +56,6 @@ TReadSession::TReadSession(const TReadSessionSettings& settings, TReadSession::~TReadSession() { Abort(EStatus::ABORTED, "Aborted"); - WaitAllDecompressionTasks(); ClearAllEvents(); } @@ -450,14 +449,6 @@ void TReadSession::Abort(TSessionClosedEvent&& closeEvent) { } } -void TReadSession::WaitAllDecompressionTasks() { - for (auto& [cluster, sessionInfo] : ClusterSessions) { - if (sessionInfo.Session) { - sessionInfo.Session->WaitAllDecompressionTasks(); - } - } -} - void TReadSession::ClearAllEvents() { EventsQueue->ClearAllEvents(); } @@ -504,20 +495,13 @@ void TReadSession::ResumeReadingData() { } } -static ELogPriority GetEventLogPriority(const TReadSessionEvent::TEvent& event) { - if (std::holds_alternative<TReadSessionEvent::TCreatePartitionStreamEvent>(event) - || std::holds_alternative<TReadSessionEvent::TDestroyPartitionStreamEvent>(event) - || std::holds_alternative<TReadSessionEvent::TPartitionStreamClosedEvent>(event) - || std::holds_alternative<TSessionClosedEvent>(event)) - { // Control event. - return TLOG_INFO; - } else { - return TLOG_DEBUG; - } -} - -void TReadSession::OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) { - Log.Write(GetEventLogPriority(event), GetLogPrefix() << "Read session event " << DebugString(event)); +void TReadSession::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) { + Log.Write(TLOG_DEBUG, GetLogPrefix() + << "The application data is transferred to the client. Number of messages " + << messagesCount + << ", size " + << decompressedSize + << " bytes"); } void TReadSession::MakeCountersIfNeeded() { diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index 8425e2ed937..6228462c687 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -78,7 +78,6 @@ using TAReadSessionSettings = std::conditional_t<UseMigrationProtocol, NYdb::NPersQueue::TReadSessionSettings, NYdb::NTopic::TReadSessionSettings>; - template <bool UseMigrationProtocol> class TPartitionStreamImpl; @@ -93,6 +92,12 @@ class TReadSessionEventsQueue; class TReadSession; +template <bool UseMigrationProtocol> +class TDataDecompressionInfo; + +template <bool UseMigrationProtocol> +using TDataDecompressionInfoPtr = typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr; + template <bool UseMigrationProtocol> struct IErrorHandler : public TThrRefBase { @@ -132,6 +137,7 @@ public: void DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TPlainStatus&& status); void DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session); void DeferSignalWaiter(TWaiter&& waiter); + void DeferDestroyDecompressionInfos(std::vector<TDataDecompressionInfoPtr<UseMigrationProtocol>>&& infos); private: @@ -166,6 +172,8 @@ private: // Session to start std::vector<std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>>> Sessions; + + std::vector<TDataDecompressionInfoPtr<UseMigrationProtocol>> DecompressionInfos; }; template <bool UseMigrationProtocol> @@ -181,20 +189,20 @@ public: bool doDecompress, i64 serverBytesSize = 0 // to increment read request bytes size ); + ~TDataDecompressionInfo(); i64 StartDecompressionTasks(const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, i64 availableMemory, - double averageCompressionRatio, - const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>& partitionStream, TDeferredActions<UseMigrationProtocol>& deferred); + void PlanDecompressionTasks(double averageCompressionRatio, + TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream); bool IsReady() const { return SourceDataNotProcessed == 0; } bool AllDecompressionTasksStarted() const { - Y_VERIFY(ServerMessage.batches_size() > 0); - return CurrentDecompressingMessage.first >= static_cast<size_t>(ServerMessage.batches_size()); + return Tasks.empty(); } i64 GetCompressedDataSize() const { @@ -236,12 +244,6 @@ public: return BatchesMeta[batchIndex]; } - // Takes data. Returns true if event has more unpacked data. - bool TakeData(const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>& partitionStream, - TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages, - TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages, - size_t* maxByteSize); - bool HasMoreData() const { return CurrentReadingMessage.first < static_cast<size_t>(GetServerMessage().batches_size()); } @@ -251,6 +253,9 @@ public: void PutDecompressionError(std::exception_ptr error, size_t batch, size_t message); std::exception_ptr GetDecompressionError(size_t batch, size_t message); + void OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize = 0); + void OnUserRetrievedEvent(i64 decompressedDataSize, size_t messagesCount); + private: // Special struct for marking (batch/message) as ready. struct TReadyMessageThreshold { @@ -274,6 +279,10 @@ private: return Messages.size(); } + i64 GetEstimatedDecompressedSize() const { + return EstimatedDecompressedSize; + } + private: TDataDecompressionInfo::TPtr Parent; TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> PartitionStream; @@ -295,7 +304,6 @@ private: std::vector<typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr> BatchesMeta; std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session; bool DoDecompress; - i64 CompressedDataSize = 0; i64 ServerBytesSize = 0; std::atomic<i64> SourceDataNotProcessed = 0; std::pair<size_t, size_t> CurrentDecompressingMessage = {0, 0}; // (Batch, Message) @@ -307,12 +315,42 @@ private: std::atomic<bool> DecompressionErrorsStructCreated = false; TAdaptiveLock DecompressionErrorsStructLock; std::vector<std::vector<std::exception_ptr>> DecompressionErrors; + + std::atomic<i64> MessagesInflight = 0; + std::atomic<i64> CompressedDataSize = 0; + std::atomic<i64> DecompressedDataSize = 0; + + std::deque<TDecompressionTask> Tasks; +}; + +template <bool UseMigrationProtocol> +struct TUserRetrievedEventInfoAccumulator { + void OnTakeData(TDataDecompressionInfoPtr<UseMigrationProtocol> info, i64 decompressedSize) { + Y_VERIFY(info); + + auto& counter = Counters[info]; + + counter.DecompressedSize += decompressedSize; + ++counter.MessagesCount; + } + void OnUserRetrievedEvent() { + for (auto& [info, counter] : Counters) { + info->OnUserRetrievedEvent(counter.DecompressedSize, counter.MessagesCount); + } + } + + struct TCounter { + i64 DecompressedSize = 0; + size_t MessagesCount = 0; + }; + + TMap<TDataDecompressionInfoPtr<UseMigrationProtocol>, TCounter> Counters; }; template <bool UseMigrationProtocol> class TDataDecompressionEvent { public: - TDataDecompressionEvent(size_t batch, size_t message, typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent, std::atomic<bool> &ready) : + TDataDecompressionEvent(size_t batch, size_t message, TDataDecompressionInfoPtr<UseMigrationProtocol> parent, std::atomic<bool>& ready) : Batch{batch}, Message{message}, Parent{std::move(parent)}, @@ -324,23 +362,28 @@ public: return Ready; } - bool TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, + void TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages, TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages, - size_t* maxByteSize) const; + size_t* maxByteSize, + TUserRetrievedEventInfoAccumulator<UseMigrationProtocol>* accumulator) const; + + TDataDecompressionInfoPtr<UseMigrationProtocol> GetParent() const { + return Parent; + } private: size_t Batch; size_t Message; - typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr Parent; - std::atomic<bool> &Ready; + TDataDecompressionInfoPtr<UseMigrationProtocol> Parent; + std::atomic<bool>& Ready; }; template <bool UseMigrationProtocol> struct IUserRetrievedEventCallback { virtual ~IUserRetrievedEventCallback() = default; - virtual void OnUserRetrievedEvent(const typename TAReadSessionEvent<UseMigrationProtocol>::TEvent& event) = 0; + virtual void OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) = 0; }; template <bool UseMigrationProtocol> @@ -381,8 +424,6 @@ struct TReadSessionEventInfo { return *Event; } - void OnUserRetrievedEvent(); - bool IsSessionClosedEvent() const { return Event && std::holds_alternative<TASessionClosedEvent<UseMigrationProtocol>>(*Event); } @@ -400,7 +441,7 @@ struct TRawPartitionStreamEvent { TRawPartitionStreamEvent(size_t batch, size_t message, - typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent, + TDataDecompressionInfoPtr<UseMigrationProtocol> parent, std::atomic<bool> &ready) : Event(std::in_place_type_t<TDataDecompressionEvent<UseMigrationProtocol>>(), batch, @@ -484,7 +525,7 @@ public: void SignalReadyEvents(TPartitionStreamImpl<UseMigrationProtocol>& stream, TReadSessionEventsQueue<UseMigrationProtocol>& queue, TDeferredActions<UseMigrationProtocol>& deferred); - void DeleteNotReadyTail(); + void DeleteNotReadyTail(TDeferredActions<UseMigrationProtocol>& deferred); private: std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> Ready; @@ -585,7 +626,7 @@ public: void InsertDataEvent(size_t batch, size_t message, - typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent, + TDataDecompressionInfoPtr<UseMigrationProtocol> parent, std::atomic<bool> &ready) { ++DataDecompressionEventsCount; @@ -678,7 +719,7 @@ public: return true; } - void DeleteNotReadyTail(); + void DeleteNotReadyTail(TDeferredActions<UseMigrationProtocol>& deferred); private: const TKey Key; @@ -711,9 +752,12 @@ public: explicit TReadSessionEventsQueue(const TAReadSessionSettings<UseMigrationProtocol>& settings, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session); typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent - GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, size_t* maxByteSize); // Assumes that we're under lock. + GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, + size_t* maxByteSize, + TUserRetrievedEventInfoAccumulator<UseMigrationProtocol>* accumulator); // Assumes that we're under lock. - TReadSessionEventInfo<UseMigrationProtocol> GetEventImpl(size_t* maxByteSize) { // Assumes that we're under lock. + TReadSessionEventInfo<UseMigrationProtocol> GetEventImpl(size_t* maxByteSize, + TUserRetrievedEventInfoAccumulator<UseMigrationProtocol>* accumulator = nullptr) { // Assumes that we're under lock. Y_ASSERT(TParent::HasEventsImpl()); if (!TParent::Events.empty()) { @@ -727,7 +771,7 @@ public: TMaybe<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> event; if (partitionStream->TopEvent().IsDataEvent()) { - event = GetDataEventImpl(partitionStream, maxByteSize); + event = GetDataEventImpl(partitionStream, maxByteSize, accumulator); } else { event = std::move(partitionStream->TopEvent().GetEvent()); partitionStream->PopEvent(); @@ -748,6 +792,8 @@ public: TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing(), size_t maxByteSize = std::numeric_limits<size_t>::max()) { TVector<TReadSessionEventInfo<UseMigrationProtocol>> eventInfos; const size_t maxCount = maxEventsCount ? *maxEventsCount : std::numeric_limits<size_t>::max(); + TUserRetrievedEventInfoAccumulator<UseMigrationProtocol> accumulator; + TDeferredActions<UseMigrationProtocol> deferred; with_lock (TParent::Mutex) { eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.Defined(), maxCount)); @@ -759,7 +805,7 @@ public: ApplyCallbacksToReadyEventsImpl(deferred); while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) { - TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(&maxByteSize); + TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(&maxByteSize, &accumulator); eventInfos.emplace_back(std::move(event)); if (eventInfos.back().IsSessionClosedEvent()) { break; @@ -772,14 +818,18 @@ public: TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> result; result.reserve(eventInfos.size()); for (TReadSessionEventInfo<UseMigrationProtocol>& eventInfo : eventInfos) { - eventInfo.OnUserRetrievedEvent(); result.emplace_back(std::move(eventInfo.GetEvent())); } + + accumulator.OnUserRetrievedEvent(); + return result; } TMaybe<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> GetEvent(bool block = false, size_t maxByteSize = std::numeric_limits<size_t>::max()) { TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> eventInfo; + TUserRetrievedEventInfoAccumulator<UseMigrationProtocol> accumulator; + TDeferredActions<UseMigrationProtocol> deferred; with_lock (TParent::Mutex) { do { @@ -790,19 +840,21 @@ public: const bool appliedCallbacks = ApplyCallbacksToReadyEventsImpl(deferred); if (TParent::HasEventsImpl()) { - eventInfo = GetEventImpl(&maxByteSize); + eventInfo = GetEventImpl(&maxByteSize, &accumulator); } else if (!appliedCallbacks) { return Nothing(); } } while (block && !eventInfo); ApplyCallbacksToReadyEventsImpl(deferred); } + if (eventInfo) { - eventInfo->OnUserRetrievedEvent(); + accumulator.OnUserRetrievedEvent(); + return std::move(eventInfo->Event); - } else { - return Nothing(); } + + return Nothing(); } void Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) { @@ -832,7 +884,7 @@ public: void PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, size_t batch, size_t message, - typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent, + TDataDecompressionInfoPtr<UseMigrationProtocol> parent, std::atomic<bool> &ready); void SignalEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, @@ -943,6 +995,7 @@ public: using TPtr = std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>>; using IProcessor = typename IReadSessionConnectionProcessorFactory<UseMigrationProtocol>::IProcessor; + friend class TPartitionStreamImpl<UseMigrationProtocol>; TSingleClusterReadSessionImpl( @@ -981,13 +1034,15 @@ public: void Commit(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, ui64 startOffset, ui64 endOffset); void OnCreateNewDecompressionTask(); + void OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount); + void OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize = 0); TReadSessionEventsQueue<UseMigrationProtocol>* GetEventsQueue() { return EventsQueue.get(); } - void OnUserRetrievedEvent(const typename TAReadSessionEvent<UseMigrationProtocol>::TEvent& event) override; + void OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) override; void Abort(); void Close(std::function<void()> callback); @@ -997,8 +1052,6 @@ public: void StopReadingData(); void ResumeReadingData(); - void WaitAllDecompressionTasks(); - void DumpStatisticsToLog(TLogElement& log); void UpdateMemoryUsageStatistics(); @@ -1140,14 +1193,14 @@ private: }; struct TDecompressionQueueItem { - TDecompressionQueueItem(typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr batchInfo, + TDecompressionQueueItem(TDataDecompressionInfoPtr<UseMigrationProtocol> batchInfo, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream) : BatchInfo(std::move(batchInfo)) , PartitionStream(std::move(partitionStream)) { } - typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr BatchInfo; + TDataDecompressionInfoPtr<UseMigrationProtocol> BatchInfo; TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> PartitionStream; }; @@ -1261,7 +1314,6 @@ public: void Abort(TSessionClosedEvent&& closeEvent); - void WaitAllDecompressionTasks(); void ClearAllEvents(); private: @@ -1287,7 +1339,7 @@ private: void AbortImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions<true>& deferred); void AbortImpl(EStatus statusCode, const TString& message, TDeferredActions<true>& deferred); - void OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) override; + void OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) override; void MakeCountersIfNeeded(); void DumpCountersToLog(size_t timeNumber = 0); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index 01a699a4129..afa8aa982a5 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -133,9 +133,9 @@ void TPartitionStreamImpl<UseMigrationProtocol>::SignalReadyEvents(TReadSessionE } template<bool UseMigrationProtocol> -void TPartitionStreamImpl<UseMigrationProtocol>::DeleteNotReadyTail() +void TPartitionStreamImpl<UseMigrationProtocol>::DeleteNotReadyTail(TDeferredActions<UseMigrationProtocol>& deferred) { - EventsQueue.DeleteNotReadyTail(); + EventsQueue.DeleteNotReadyTail(deferred); } template<bool UseMigrationProtocol> @@ -154,19 +154,26 @@ void TRawPartitionStreamEventQueue<UseMigrationProtocol>::SignalReadyEvents(TPar } template<bool UseMigrationProtocol> -void TRawPartitionStreamEventQueue<UseMigrationProtocol>::DeleteNotReadyTail() +void TRawPartitionStreamEventQueue<UseMigrationProtocol>::DeleteNotReadyTail(TDeferredActions<UseMigrationProtocol>& deferred) { - std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> head; + std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> ready; - for (auto& event : NotReady) { - if (!event.IsReady()) { - break; - } + auto i = NotReady.begin(); + for (; (i != NotReady.end()) && i->IsReady(); ++i) { + ready.push_back(std::move(*i)); + } + + std::vector<TDataDecompressionInfoPtr<UseMigrationProtocol>> infos; - head.push_back(std::move(event)); + for (; i != NotReady.end(); ++i) { + if (i->IsDataEvent()) { + infos.push_back(i->GetDataEvent().GetParent()); + } } - swap(head, NotReady); + deferred.DeferDestroyDecompressionInfos(std::move(infos)); + + swap(ready, NotReady); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -668,24 +675,26 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::RequestPartitionStream } template<bool UseMigrationProtocol> -void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnUserRetrievedEvent(const typename TAReadSessionEvent<UseMigrationProtocol>::TEvent& event) { - Log.Write(TLOG_DEBUG, GetLogPrefix() << "Read session event " << DebugString(event)); - const i64 bytesCount = static_cast<i64>(CalcDataSize<TAReadSessionEvent<UseMigrationProtocol>>(event)); - Y_ASSERT(bytesCount >= 0); - - if (!std::get_if<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent>(&event)) { // Event is not data event. - return; - } +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) +{ + Log.Write(TLOG_DEBUG, GetLogPrefix() + << "The application data is transferred to the client. Number of messages " + << messagesCount + << ", size " + << decompressedSize + << " bytes"); - *Settings.Counters_->MessagesInflight -= std::get<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent>(event).GetMessagesCount(); - *Settings.Counters_->BytesInflightTotal -= bytesCount; - *Settings.Counters_->BytesInflightUncompressed -= bytesCount; + *Settings.Counters_->MessagesInflight -= messagesCount; + *Settings.Counters_->BytesInflightTotal -= decompressedSize; + *Settings.Counters_->BytesInflightUncompressed -= decompressedSize; TDeferredActions<UseMigrationProtocol> deferred; with_lock (Lock) { UpdateMemoryUsageStatisticsImpl(); - Y_VERIFY(bytesCount <= DecompressedDataSize); - DecompressedDataSize -= bytesCount; + + Y_VERIFY(decompressedSize <= DecompressedDataSize); + DecompressedDataSize -= decompressedSize; + ContinueReadingDataImpl(); StartDecompressionTasksImpl(deferred); } @@ -694,6 +703,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnUserRetrievedEvent(c template <bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::WriteToProcessorImpl( TClientMessage<UseMigrationProtocol>&& req) { // Assumes that we're under lock. + if (Processor) { Processor->Write(std::move(req)); } @@ -913,10 +923,11 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( Settings.Decompress_); Y_VERIFY(decompressionInfo); - if (decompressionInfo) { - DecompressionQueue.emplace_back(decompressionInfo, partitionStream); - StartDecompressionTasksImpl(deferred); - } + decompressionInfo->PlanDecompressionTasks(AverageCompressionRatio, + partitionStream); + + DecompressionQueue.emplace_back(decompressionInfo, partitionStream); + StartDecompressionTasksImpl(deferred); } WaitingReadResponse = false; @@ -1054,7 +1065,6 @@ template <> inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( Ydb::Topic::StreamReadMessage::ReadResponse&& msg, TDeferredActions<false>& deferred) { // Assumes that we're under lock. - if (Closing || Aborting) { return; // Don't process new data. } @@ -1119,10 +1129,10 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( serverBytesSize = 0; Y_VERIFY(decompressionInfo); - if (decompressionInfo) { - DecompressionQueue.emplace_back(decompressionInfo, partitionStream); - StartDecompressionTasksImpl(deferred); - } + decompressionInfo->PlanDecompressionTasks(AverageCompressionRatio, + partitionStream); + DecompressionQueue.emplace_back(decompressionInfo, partitionStream); + StartDecompressionTasksImpl(deferred); } WaitingReadResponse = false; @@ -1246,8 +1256,6 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::StartDecompressionTask TDecompressionQueueItem& current = DecompressionQueue.front(); auto sentToDecompress = current.BatchInfo->StartDecompressionTasks(Settings.DecompressionExecutor_, Max(limit - DecompressedDataSize, static_cast<i64>(1)), - AverageCompressionRatio, - current.PartitionStream, deferred); DecompressedDataSize += sentToDecompress; if (current.BatchInfo->AllDecompressionTasksStarted()) { @@ -1281,8 +1289,22 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnCreateNewDecompressi } template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount) +{ + *Settings.Counters_->MessagesInflight -= messagesCount; + *Settings.Counters_->BytesInflightUncompressed -= decompressedSize; + *Settings.Counters_->BytesInflightCompressed -= compressedSize; + *Settings.Counters_->BytesInflightTotal -= (compressedSize + decompressedSize); + + CompressedDataSize -= compressedSize; + DecompressedDataSize -= decompressedSize; +} + +template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize) { TDeferredActions<UseMigrationProtocol> deferred; + + Y_VERIFY(DecompressionTasksInflight > 0); --DecompressionTasksInflight; *Settings.Counters_->BytesRead += decompressedSize; @@ -1397,14 +1419,6 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ResumeReadingData() { } template<bool UseMigrationProtocol> -void TSingleClusterReadSessionImpl<UseMigrationProtocol>::WaitAllDecompressionTasks() { - Y_ASSERT(DecompressionTasksInflight >= 0); - while (DecompressionTasksInflight > 0) { - Sleep(TDuration::MilliSeconds(5)); // Perform active wait because this is aborting process and there are no decompression tasks here in normal situation. - } -} - -template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::DumpStatisticsToLog(TLogElement& log) { with_lock (Lock) { // cluster:topic:partition:stream-id:read-offset:committed-offset @@ -1571,13 +1585,6 @@ bool TReadSessionEventInfo<UseMigrationProtocol>::IsDataEvent() const { return !IsEmpty() && PartitionStream->TopEvent().IsDataEvent(); } -template<bool UseMigrationProtocol> -void TReadSessionEventInfo<UseMigrationProtocol>::OnUserRetrievedEvent() { - if (auto session = Session.lock()) { - session->OnUserRetrievedEvent(*Event); - } -} - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TReadSessionEventsQueue @@ -1613,7 +1620,7 @@ TReadSessionEventsQueue<UseMigrationProtocol>::TReadSessionEventsQueue( template <bool UseMigrationProtocol> void TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> /*session*/, - typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event, + typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event, TDeferredActions<UseMigrationProtocol>& deferred) { if (TParent::Closed) { @@ -1628,7 +1635,7 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPar >; if (std::holds_alternative<TClosedEvent>(event)) { - stream->DeleteNotReadyTail(); + stream->DeleteNotReadyTail(deferred); } stream->InsertEvent(std::move(event)); @@ -1670,14 +1677,14 @@ template <bool UseMigrationProtocol> void TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, size_t batch, size_t message, - typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent, + TDataDecompressionInfoPtr<UseMigrationProtocol> parent, std::atomic<bool>& ready) { if (this->Closed) { return; } - with_lock (this->Mutex) { + with_lock (TParent::Mutex) { partitionStream->InsertDataEvent(batch, message, parent, ready); } } @@ -1685,8 +1692,8 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(TIntrusivePtr< template <bool UseMigrationProtocol> typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl( TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, - size_t* maxByteSize) { // Assumes that we're under lock. - + size_t* maxByteSize, + TUserRetrievedEventInfoAccumulator<UseMigrationProtocol>* accumulator) { // Assumes that we're under lock. TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage> messages; TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage> compressedMessages; @@ -1698,7 +1705,7 @@ typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent TReadSessi Y_VERIFY(event.EventsCount > 0); for (; (event.EventsCount > 0) && (*maxByteSize > 0); --event.EventsCount) { - stream->TopEvent().GetDataEvent().TakeData(stream, &messages, &compressedMessages, maxByteSize); + stream->TopEvent().GetDataEvent().TakeData(stream, &messages, &compressedMessages, maxByteSize, accumulator); stream->PopEvent(); } @@ -1716,7 +1723,7 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::SignalReadyEvents( TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) { Y_ASSERT(partitionStream); TDeferredActions<UseMigrationProtocol> deferred; - with_lock (TReadSessionEventsQueue<UseMigrationProtocol>::Mutex) { + with_lock (TParent::Mutex) { SignalReadyEventsImpl(partitionStream, deferred); } } @@ -1850,17 +1857,32 @@ TDataDecompressionInfo<UseMigrationProtocol>::TDataDecompressionInfo( , DoDecompress(doDecompress) , ServerBytesSize(serverBytesSize) { + i64 compressedSize = 0; + i64 messagesCount = 0; + for (const auto& batch : ServerMessage.batches()) { for (const auto& messageData : batch.message_data()) { - CompressedDataSize += messageData.data().size(); + compressedSize += messageData.data().size(); + ++messagesCount; } } - SourceDataNotProcessed = CompressedDataSize; + + MessagesInflight = messagesCount; + SourceDataNotProcessed = compressedSize; + CompressedDataSize = compressedSize; BuildBatchesMeta(); } template<bool UseMigrationProtocol> +TDataDecompressionInfo<UseMigrationProtocol>::~TDataDecompressionInfo() +{ + if (auto session = Session.lock()) { + session->OnDecompressionInfoDestroy(CompressedDataSize, DecompressedDataSize, MessagesInflight); + } +} + +template<bool UseMigrationProtocol> void TDataDecompressionInfo<UseMigrationProtocol>::BuildBatchesMeta() { BatchesMeta.reserve(ServerMessage.batches_size()); for (const auto& batch : ServerMessage.batches()) { @@ -1914,169 +1936,89 @@ std::exception_ptr TDataDecompressionInfo<UseMigrationProtocol>::GetDecompressio template <bool UseMigrationProtocol> i64 TDataDecompressionInfo<UseMigrationProtocol>::StartDecompressionTasks( const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, i64 availableMemory, - double averageCompressionRatio, const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>& partitionStream, - TDeferredActions<UseMigrationProtocol>& deferred) { + TDeferredActions<UseMigrationProtocol>& deferred) +{ + auto session = Session.lock(); + Y_ASSERT(session); + i64 used = 0; + + while (availableMemory > 0 && !Tasks.empty()) { + auto& task = Tasks.front(); + + used += task.GetEstimatedDecompressedSize(); + availableMemory -= task.GetEstimatedDecompressedSize(); + + session->OnCreateNewDecompressionTask(); + + deferred.DeferStartExecutorTask(executor, std::move(task)); + Tasks.pop_front(); + } + + return used; +} + +template<bool UseMigrationProtocol> +void TDataDecompressionInfo<UseMigrationProtocol>::PlanDecompressionTasks(double averageCompressionRatio, + TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream) +{ constexpr size_t TASK_LIMIT = 512_KB; - std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Session.lock(); + + auto session = Session.lock(); Y_ASSERT(session); + ReadyThresholds.emplace_back(); + TDecompressionTask task(TDataDecompressionInfo::shared_from_this(), partitionStream, &ReadyThresholds.back()); - i64 used = 0; - while (availableMemory > 0 && !AllDecompressionTasksStarted()) { + + while (CurrentDecompressingMessage.first < static_cast<size_t>(ServerMessage.batches_size())) { const auto& batch = ServerMessage.batches(CurrentDecompressingMessage.first); + if (CurrentDecompressingMessage.second < static_cast<size_t>(batch.message_data_size())) { const auto& messageData = batch.message_data(CurrentDecompressingMessage.second); const i64 size = static_cast<i64>(messageData.data().size()); const i64 estimatedDecompressedSize = messageData.uncompressed_size() ? static_cast<i64>(messageData.uncompressed_size()) : static_cast<i64>(size * averageCompressionRatio); - Y_VERIFY(estimatedDecompressedSize >= 0); task.Add(CurrentDecompressingMessage.first, CurrentDecompressingMessage.second, size, estimatedDecompressedSize); + session->GetEventsQueue()->PushDataEvent(partitionStream, CurrentDecompressingMessage.first, CurrentDecompressingMessage.second, TDataDecompressionInfo::shared_from_this(), ReadyThresholds.back().Ready); - - used += estimatedDecompressedSize; - availableMemory -= estimatedDecompressedSize; } + ++CurrentDecompressingMessage.second; + if (CurrentDecompressingMessage.second >= static_cast<size_t>(batch.message_data_size())) { // next batch ++CurrentDecompressingMessage.first; CurrentDecompressingMessage.second = 0; } + if (task.AddedDataSize() >= TASK_LIMIT) { - session->OnCreateNewDecompressionTask(); - deferred.DeferStartExecutorTask(executor, std::move(task)); + Tasks.push_back(std::move(task)); + ReadyThresholds.emplace_back(); task = TDecompressionTask(TDataDecompressionInfo::shared_from_this(), partitionStream, &ReadyThresholds.back()); } } + if (task.AddedMessagesCount() > 0) { - session->OnCreateNewDecompressionTask(); - deferred.DeferStartExecutorTask(executor, std::move(task)); + Tasks.push_back(std::move(task)); } else { ReadyThresholds.pop_back(); // Revert. } - return used; } template<bool UseMigrationProtocol> -bool TDataDecompressionInfo<UseMigrationProtocol>::TakeData(const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>& partitionStream, - TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages, - TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages, - size_t* maxByteSize) -{ - TMaybe<std::pair<size_t, size_t>> readyThreshold = GetReadyThreshold(); - Y_ASSERT(readyThreshold); - auto& msg = GetServerMessage(); - i64 minOffset = Max<i64>(); - i64 maxOffset = 0; - const auto prevReadingMessage = CurrentReadingMessage; - while (HasMoreData() && *maxByteSize > 0 && CurrentReadingMessage <= *readyThreshold) { - auto& batch = *msg.mutable_batches(CurrentReadingMessage.first); - if (CurrentReadingMessage.second < static_cast<size_t>(batch.message_data_size())) { - const auto& meta = GetBatchMeta(CurrentReadingMessage.first); - const TInstant batchWriteTimestamp = [&batch](){ - if constexpr (UseMigrationProtocol) { - return TInstant::MilliSeconds(batch.write_timestamp_ms()); - } else { - return TInstant::MilliSeconds( - ::google::protobuf::util::TimeUtil::TimestampToMilliseconds(batch.written_at())); - } - }(); - auto& messageData = *batch.mutable_message_data(CurrentReadingMessage.second); - minOffset = Min(minOffset, static_cast<i64>(messageData.offset())); - maxOffset = Max(maxOffset, static_cast<i64>(messageData.offset())); - - if constexpr (UseMigrationProtocol) { - TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo( - messageData.offset(), - batch.source_id(), - messageData.seq_no(), - TInstant::MilliSeconds(messageData.create_timestamp_ms()), - batchWriteTimestamp, - batch.ip(), - meta, - messageData.uncompressed_size() - ); - if (DoDecompress) { - messages->emplace_back( - messageData.data(), - GetDecompressionError(CurrentReadingMessage.first, CurrentReadingMessage.second), - messageInfo, - partitionStream, - messageData.partition_key(), - messageData.explicit_hash() - ); - } else { - compressedMessages->emplace_back( - static_cast<ECodec>(messageData.codec()), - messageData.data(), - TVector<TReadSessionEvent::TDataReceivedEvent::TMessageInformation>{messageInfo}, - partitionStream, - messageData.partition_key(), - messageData.explicit_hash() - ); - } - } else { - NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo( - messageData.offset(), - batch.producer_id(), - messageData.seq_no(), - TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(messageData.created_at())), - batchWriteTimestamp, - meta, - messageData.uncompressed_size(), - messageData.message_group_id() - ); - if (DoDecompress) { - messages->emplace_back( - messageData.data(), - GetDecompressionError(CurrentReadingMessage.first, CurrentReadingMessage.second), - messageInfo, - partitionStream - ); - } else { - compressedMessages->emplace_back( - static_cast<NTopic::ECodec>(batch.codec()), - messageData.data(), - messageInfo, - partitionStream - ); - } - } - - *maxByteSize -= Min(*maxByteSize, messageData.data().size()); - - // Clear data to free internal session's memory. - messageData.clear_data(); - } - - ++CurrentReadingMessage.second; - if (CurrentReadingMessage.second >= static_cast<size_t>(batch.message_data_size())) { - CurrentReadingMessage.second = 0; - do { - ++CurrentReadingMessage.first; - } while (CurrentReadingMessage.first < static_cast<size_t>(msg.batches_size()) && msg.batches(CurrentReadingMessage.first).message_data_size() == 0); - } - } - partitionStream->GetLog().Write(TLOG_DEBUG, TStringBuilder() << "Take Data. Partition " << partitionStream->GetPartitionId() - << ". Read: {" << prevReadingMessage.first << ", " << prevReadingMessage.second << "} -> {" - << CurrentReadingMessage.first << ", " << CurrentReadingMessage.second << "} (" - << minOffset << "-" << maxOffset << ")"); - return CurrentReadingMessage <= *readyThreshold; -} - -template<bool UseMigrationProtocol> -bool TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, +void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages, TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages, - size_t* maxByteSize) const + size_t* maxByteSize, + TUserRetrievedEventInfoAccumulator<UseMigrationProtocol>* accumulator) const { auto& msg = Parent->GetServerMessage(); i64 minOffset = Max<i64>(); @@ -2146,6 +2088,10 @@ bool TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart *maxByteSize -= Min(*maxByteSize, messageData.data().size()); + if (accumulator) { + accumulator->OnTakeData(Parent, messageData.data().size()); + } + // Clear data to free internal session's memory. messageData.clear_data(); @@ -2153,8 +2099,6 @@ bool TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart << "Take Data. Partition " << partitionStream->GetPartitionId() << ". Read: {" << Batch << ", " << Message << "} (" << minOffset << "-" << maxOffset << ")"); - - return false; } template<bool UseMigrationProtocol> @@ -2166,6 +2110,28 @@ bool TDataDecompressionInfo<UseMigrationProtocol>::HasReadyUnreadData() const { return CurrentReadingMessage <= *threshold; } +template<bool UseMigrationProtocol> +void TDataDecompressionInfo<UseMigrationProtocol>::OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize) +{ + CompressedDataSize -= sourceSize; + DecompressedDataSize += decompressedSize; + + if (auto session = Session.lock()) { + session->OnDataDecompressed(sourceSize, estimatedDecompressedSize, decompressedSize, messagesCount, serverBytesSize); + } +} + +template<bool UseMigrationProtocol> +void TDataDecompressionInfo<UseMigrationProtocol>::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) +{ + MessagesInflight -= messagesCount; + DecompressedDataSize -= decompressedSize; + + if (auto session = Session.lock()) { + session->OnUserRetrievedEvent(decompressedSize, messagesCount); + } +} + template <bool UseMigrationProtocol> void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::Add(size_t batch, size_t message, size_t sourceDataSize, @@ -2253,9 +2219,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator( Y_ASSERT(dataProcessed == SourceDataSize); std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Parent->Session.lock(); - if (session) { - session->OnDataDecompressed(SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed, Parent->ServerBytesSize); - } + Parent->OnDataDecompressed(SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed, Parent->ServerBytesSize); Parent->SourceDataNotProcessed -= dataProcessed; Ready->Ready = true; @@ -2327,6 +2291,12 @@ void TDeferredActions<UseMigrationProtocol>::DeferSignalWaiter(TWaiter&& waiter) } template<bool UseMigrationProtocol> +void TDeferredActions<UseMigrationProtocol>::DeferDestroyDecompressionInfos(std::vector<TDataDecompressionInfoPtr<UseMigrationProtocol>>&& infos) +{ + DecompressionInfos = std::move(infos); +} + +template<bool UseMigrationProtocol> void TDeferredActions<UseMigrationProtocol>::DoActions() { Read(); StartExecutorTasks(); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp index 0437f64720f..4caf3ee778c 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp @@ -1444,9 +1444,9 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { const size_t messagesInServerBatchCount = 10; // Many messages in order to fit in two 512 KB-tasks packs. const size_t messageSize = 1000000; const size_t batchLimit = std::numeric_limits<size_t>::max(); - const size_t batches = messagesInServerBatchCount; - const size_t messagesInBatch = 1; - const size_t expectedDecompressionTasksCount = messagesInServerBatchCount; + const size_t batches = 1; + const size_t messagesInBatch = 10; + const size_t expectedDecompressionTasksCount = 1; const size_t reorderedCycleSize = 1; const size_t memoryLimit = 10; PacksBatchesImpl(serverBatchesCount, messagesInServerBatchCount, messageSize, batchLimit, batches, messagesInBatch, expectedDecompressionTasksCount, reorderedCycleSize, memoryLimit); @@ -1485,16 +1485,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { UNIT_ASSERT_VALUES_EQUAL(events.size(), 1); UNIT_ASSERT_EVENT_TYPE(events[0], TReadSessionEvent::TDataReceivedEvent); TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get<TReadSessionEvent::TDataReceivedEvent>(events[0]); - UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages().size(), 28); - UNIT_ASSERT_EQUAL(dataEvent.GetPartitionStream(), stream1); - } - - { - TVector<TReadSessionEvent::TEvent> events = setup.EventsQueue->GetEvents(true); - UNIT_ASSERT_VALUES_EQUAL(events.size(), 1); - UNIT_ASSERT_EVENT_TYPE(events[0], TReadSessionEvent::TDataReceivedEvent); - TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get<TReadSessionEvent::TDataReceivedEvent>(events[0]); - UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages().size(), 22); + UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages().size(), 50); UNIT_ASSERT_EQUAL(dataEvent.GetPartitionStream(), stream1); } @@ -1503,16 +1494,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { UNIT_ASSERT_VALUES_EQUAL(events.size(), 1); UNIT_ASSERT_EVENT_TYPE(events[0], TReadSessionEvent::TDataReceivedEvent); TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get<TReadSessionEvent::TDataReceivedEvent>(events[0]); - UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages().size(), 27); - UNIT_ASSERT_EQUAL(dataEvent.GetPartitionStream(), stream2); - } - - { - TVector<TReadSessionEvent::TEvent> events = setup.EventsQueue->GetEvents(true); - UNIT_ASSERT_VALUES_EQUAL(events.size(), 1); - UNIT_ASSERT_EVENT_TYPE(events[0], TReadSessionEvent::TDataReceivedEvent); - TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get<TReadSessionEvent::TDataReceivedEvent>(events[0]); - UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages().size(), 23); + UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages().size(), 50); UNIT_ASSERT_EQUAL(dataEvent.GetPartitionStream(), stream2); } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp index f01af91a29c..8e7d930effa 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp @@ -33,7 +33,6 @@ TReadSession::TReadSession(const TReadSessionSettings& settings, TReadSession::~TReadSession() { Abort(EStatus::ABORTED, "Aborted"); - WaitAllDecompressionTasks(); ClearAllEvents(); } @@ -190,12 +189,6 @@ bool TReadSession::Close(TDuration timeout) { return result; } -void TReadSession::WaitAllDecompressionTasks() { - if (Session) { - Session->WaitAllDecompressionTasks(); - } -} - void TReadSession::ClearAllEvents() { EventsQueue->ClearAllEvents(); } @@ -204,20 +197,13 @@ TStringBuilder TReadSession::GetLogPrefix() const { return TStringBuilder() << GetDatabaseLogPrefix(DbDriverState->Database) << "[" << SessionId << "] "; } -static ELogPriority GetEventLogPriority(const TReadSessionEvent::TEvent& event) { - if (std::holds_alternative<TReadSessionEvent::TStartPartitionSessionEvent>(event) - || std::holds_alternative<TReadSessionEvent::TStopPartitionSessionEvent>(event) - || std::holds_alternative<TReadSessionEvent::TPartitionSessionClosedEvent>(event) - || std::holds_alternative<TSessionClosedEvent>(event)) - { // Control event. - return TLOG_INFO; - } else { - return TLOG_DEBUG; - } -} - -void TReadSession::OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) { - Log.Write(GetEventLogPriority(event), GetLogPrefix() << "Read session event " << DebugString(event)); +void TReadSession::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) { + Log.Write(TLOG_DEBUG, GetLogPrefix() + << "The application data is transferred to the client. Number of messages " + << messagesCount + << ", size " + << decompressedSize + << " bytes"); } void TReadSession::MakeCountersIfNeeded() { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h index bd7329f2760..1ac885691be 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h @@ -86,7 +86,6 @@ public: void Abort(TSessionClosedEvent&& closeEvent); - void WaitAllDecompressionTasks(); void ClearAllEvents(); private: @@ -97,7 +96,7 @@ private: void CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false>& deferred); - void OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) override; + void OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) override; void MakeCountersIfNeeded(); void DumpCountersToLog(size_t timeNumber = 0); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 64f58ded093..e0c5dc6564c 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -2125,7 +2125,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Y_UNIT_TEST(CheckKillBalancer) { NPersQueue::TTestServer server; server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_READ_PROXY}); - PrepareForGrpc(server); + server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 2); auto driver = server.AnnoyingClient->GetDriver(); auto decompressor = CreateThreadPoolExecutorWrapper(2); @@ -2135,6 +2135,20 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { settings.DecompressionExecutor(decompressor); auto reader = CreateReader(*driver, settings); + auto counters = reader->GetCounters(); + + auto DumpCounters = [&](const char *message) { + Cerr << "===== " << message << " =====" << Endl; + Cerr << "MessagesInflight: " << counters->MessagesInflight->Val() << Endl; + Cerr << "BytesInflightUncompressed: " << counters->BytesInflightUncompressed->Val() << Endl; + Cerr << "BytesInflightCompressed: " << counters->BytesInflightCompressed->Val() << Endl; + Cerr << "BytesInflightTotal: " << counters->BytesInflightTotal->Val() << Endl; + Cerr << "============" << Endl; + }; + + DumpCounters("CreateReader"); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0); for (ui32 i = 0; i < 2; ++i) { auto msg = reader->GetEvent(true, 1); @@ -2158,7 +2172,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(res); } + DumpCounters("Write"); + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 10); ui32 createEv = 0, destroyEv = 0, dataEv = 0; std::vector<ui32> gotDestroy{0, 0}; @@ -2196,37 +2212,440 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { decompressor->StartFuncs({0, 1, 2, 3, 4}); + DumpCounters("StartFuncs-1"); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 10); + for (ui32 i = 0; i < 5; ++i) { doRead(); } UNIT_ASSERT_VALUES_EQUAL(dataEv, 5); - server.AnnoyingClient->RestartBalancerTablet(server.CleverServer->GetRuntime(), "rt3.dc1--topic1"); - Cerr << "Balancer killed\n"; + DumpCounters("doRead(5)"); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 5); + + Cerr << ">>>> Restart balancer" << Endl; + server.AnnoyingClient->RestartBalancerTablet(server.CleverServer->GetRuntime(), DEFAULT_TOPIC_NAME); + Cerr << ">>>> Balancer restarted" << Endl; Sleep(TDuration::Seconds(5)); + decompressor->StartFuncs({5, 6, 7, 8, 9}); + + DumpCounters("StartFuncs-2"); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 5); + for (ui32 i = 0; i < 4; ++i) { doRead(); } UNIT_ASSERT_VALUES_EQUAL(createEv, 2); UNIT_ASSERT_VALUES_EQUAL(destroyEv, 2); - UNIT_ASSERT_VALUES_EQUAL(dataEv, 5); - decompressor->StartFuncs({5, 6, 7, 8, 9}); + DumpCounters("doRead(4)"); Sleep(TDuration::Seconds(5)); auto msg = reader->GetEvent(false, 1); - UNIT_ASSERT(!msg); UNIT_ASSERT(!reader->WaitEvent().Wait(TDuration::Seconds(1))); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), 0); + + DumpCounters("End"); + } + + Y_UNIT_TEST(CheckDeleteTopic) { + NPersQueue::TTestServer server; + server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_READ_PROXY}); + server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 2); + + auto driver = server.AnnoyingClient->GetDriver(); + auto decompressor = CreateThreadPoolExecutorWrapper(2); + + NYdb::NPersQueue::TReadSessionSettings settings; + settings.ConsumerName("shared/user").AppendTopics(SHORT_TOPIC_NAME).ReadOriginal({"dc1"}); + settings.DecompressionExecutor(decompressor); + auto reader = CreateReader(*driver, settings); + + auto counters = reader->GetCounters(); + + auto DumpCounters = [&](const char *message) { + Cerr << "===== " << message << " =====" << Endl; + Cerr << "MessagesInflight: " << counters->MessagesInflight->Val() << Endl; + Cerr << "BytesInflightUncompressed: " << counters->BytesInflightUncompressed->Val() << Endl; + Cerr << "BytesInflightCompressed: " << counters->BytesInflightCompressed->Val() << Endl; + Cerr << "BytesInflightTotal: " << counters->BytesInflightTotal->Val() << Endl; + Cerr << "============" << Endl; + }; + + DumpCounters("CreateReader"); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0); + + for (ui32 i = 0; i < 2; ++i) { + auto msg = reader->GetEvent(true, 1); + UNIT_ASSERT(msg); + + Cerr << NYdb::NPersQueue::DebugString(*msg) << "\n"; + + auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg); + + UNIT_ASSERT(ev); + + ev->Confirm(); + } + + + for (ui32 i = 0; i < 10; ++i) { + auto writer = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, TStringBuilder() << "source" << i); + bool res = writer->Write("valuevaluevalue", 1); + UNIT_ASSERT(res); + res = writer->Close(TDuration::Seconds(10)); + UNIT_ASSERT(res); + } + + DumpCounters("Write"); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 10); + + ui32 createEv = 0, destroyEv = 0, dataEv = 0; + std::vector<ui32> gotDestroy{0, 0}; + + auto doRead = [&]() { + auto msg = reader->GetEvent(true, 1); + UNIT_ASSERT(msg); + + Cerr << "Got message: " << NYdb::NPersQueue::DebugString(*msg) << "\n"; + + + if (std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&*msg)) { + ++dataEv; + return; + } + + auto ev1 = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent>(&*msg); + auto ev2 = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg); + + UNIT_ASSERT(ev1 || ev2); + + if (ev1) { + ++destroyEv; + UNIT_ASSERT(ev1->GetPartitionStream()->GetPartitionId() < 2); + gotDestroy[ev1->GetPartitionStream()->GetPartitionId()]++; + } + if (ev2) { + ev2->Confirm(ev2->GetEndOffset()); + ++createEv; + UNIT_ASSERT(ev2->GetPartitionStream()->GetPartitionId() < 2); + UNIT_ASSERT_VALUES_EQUAL(gotDestroy[ev2->GetPartitionStream()->GetPartitionId()], 1); + + } + }; + + decompressor->StartFuncs({0, 1, 2, 3, 4}); + + DumpCounters("StartFuncs-1"); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 10); + + for (ui32 i = 0; i < 5; ++i) { + doRead(); + } + + UNIT_ASSERT_VALUES_EQUAL(dataEv, 5); + + DumpCounters("doRead(5)"); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 5); + + Cerr << ">>>> Delete topic" << Endl; + server.AnnoyingClient->DeleteTopic2(DEFAULT_TOPIC_NAME); + Cerr << ">>>> Topic deleted" << Endl; + + Sleep(TDuration::Seconds(5)); + + doRead(); + doRead(); + + // + // there should be 2 TPartitionStreamClosedEvent events in the queue + // + UNIT_ASSERT_VALUES_EQUAL(createEv, 0); + UNIT_ASSERT_VALUES_EQUAL(destroyEv, 2); + UNIT_ASSERT_VALUES_EQUAL(dataEv, 5); + + decompressor->StartFuncs({5, 6, 7, 8, 9}); + + DumpCounters("StartFuncs-2"); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 5); + + Sleep(TDuration::Seconds(5)); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), 0); + + DumpCounters("End"); + } + + enum WhenTheTopicIsDeletedMode { + AFTER_WRITES, + AFTER_START_TASKS, + AFTER_DOREAD + }; + + void WhenTheTopicIsDeletedImpl(WhenTheTopicIsDeletedMode mode, i64 maxMemoryUsageSize, bool decompress, i64 decompressedSize, i64 compressedSize) { + NPersQueue::TTestServer server; + server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_READ_PROXY}); + server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1); + + auto driver = server.AnnoyingClient->GetDriver(); + auto decompressor = CreateThreadPoolExecutorWrapper(2); + + NYdb::NPersQueue::TReadSessionSettings settings; + settings.ConsumerName("shared/user").AppendTopics(SHORT_TOPIC_NAME).ReadOriginal({"dc1"}); + settings.DecompressionExecutor(decompressor); + settings.MaxMemoryUsageBytes(maxMemoryUsageSize); + settings.Decompress(decompress); + + auto reader = CreateReader(*driver, settings); + auto counters = reader->GetCounters(); + + auto DumpCounters = [&](const char *message) { + Cerr << "===== " << message << " =====" << Endl; + Cerr << "MessagesInflight: " << counters->MessagesInflight->Val() << Endl; + Cerr << "BytesInflightUncompressed: " << counters->BytesInflightUncompressed->Val() << Endl; + Cerr << "BytesInflightCompressed: " << counters->BytesInflightCompressed->Val() << Endl; + Cerr << "BytesInflightTotal: " << counters->BytesInflightTotal->Val() << Endl; + Cerr << "============" << Endl; + }; + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0); + + // + // there should be 1 TCreatePartitionStreamEvent events in the queue + // + { + auto msg = reader->GetEvent(true, 1); + UNIT_ASSERT(msg); + + Cerr << ">>>> message: " << NYdb::NPersQueue::DebugString(*msg) << Endl; + + auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg); + UNIT_ASSERT(ev); + + ev->Confirm(); + } + + for (ui32 i = 0; i < 2; ++i) { + std::optional<TString> codec; + if (!decompress) { + codec = "raw"; + } + + auto writer = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, TStringBuilder() << "source" << i, {}, codec); + + std::string message(decompressedSize, 'x'); + + bool res = writer->Write(message, 1); + UNIT_ASSERT(res); + + res = writer->Close(TDuration::Seconds(10)); + UNIT_ASSERT(res); + } + + Sleep(TDuration::Seconds(1)); + + DumpCounters("write"); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 1); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), compressedSize); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), compressedSize); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), 0); + + if (mode == AFTER_WRITES) { + Cerr << ">>>> Delete topic" << Endl; + server.AnnoyingClient->DeleteTopic2(DEFAULT_TOPIC_NAME); + Cerr << ">>>> Topic deleted" << Endl; + + auto msg = reader->GetEvent(true, 1); + UNIT_ASSERT(msg); + UNIT_ASSERT(std::get_if<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent>(&*msg)); + + decompressor->RunAllTasks(); + Sleep(TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), 0); + + return; + } + + ui32 dataEv = 0; + + auto doRead = [&]() { + auto msg = reader->GetEvent(true, 1); + UNIT_ASSERT(msg); + + if (!std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&*msg)) { + UNIT_FAIL("a TDataReceivedEvent event is expected"); + } + + ++dataEv; + }; + + decompressor->StartFuncs({0}); + Sleep(TDuration::Seconds(1)); + + DumpCounters("task #0"); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 2); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), compressedSize + decompressedSize); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), compressedSize); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), decompressedSize); + + if (mode == AFTER_START_TASKS) { + Cerr << ">>>> Delete topic" << Endl; + server.AnnoyingClient->DeleteTopic2(DEFAULT_TOPIC_NAME); + Cerr << ">>>> Topic deleted" << Endl; + + auto msg = reader->GetEvent(true, 1); + UNIT_ASSERT(msg); + UNIT_ASSERT(std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&*msg)); + + msg = reader->GetEvent(true, 1); + UNIT_ASSERT(msg); + UNIT_ASSERT(std::get_if<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent>(&*msg)); + + decompressor->RunAllTasks(); + Sleep(TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), 0); + + return; + } + + doRead(); + Sleep(TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(dataEv, 1); + + DumpCounters("read"); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 1); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), compressedSize); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), compressedSize); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), 0); + + if (mode == AFTER_DOREAD) { + Cerr << ">>>> Delete topic" << Endl; + server.AnnoyingClient->DeleteTopic2(DEFAULT_TOPIC_NAME); + Cerr << ">>>> Topic deleted" << Endl; + + auto msg = reader->GetEvent(true, 1); + UNIT_ASSERT(msg); + UNIT_ASSERT(std::get_if<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent>(&*msg)); + + decompressor->RunAllTasks(); + Sleep(TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), 0); + + return; + } + + UNIT_FAIL("incorrect mode"); + } + + Y_UNIT_TEST(WhenTheTopicIsDeletedBeforeDataIsDecompressed_Compressed) { + WhenTheTopicIsDeletedImpl(AFTER_WRITES, 1_MB + 1, true, 1_MB - 1_KB, 1050); + } + + Y_UNIT_TEST(WhenTheTopicIsDeletedAfterDecompressingTheData_Compressed) { + WhenTheTopicIsDeletedImpl(AFTER_START_TASKS, 1_MB + 1, true, 1_MB - 1_KB, 1050); + } + + Y_UNIT_TEST(WhenTheTopicIsDeletedAfterReadingTheData_Compressed) { + WhenTheTopicIsDeletedImpl(AFTER_DOREAD, 1_MB + 1, true, 1_MB - 1_KB, 1050); } + + Y_UNIT_TEST(WhenTheTopicIsDeletedBeforeDataIsDecompressed_Uncompressed) { + WhenTheTopicIsDeletedImpl(AFTER_WRITES, 1_MB + 1, false, 1_MB - 1_KB, 1_MB - 1_KB); + } + + Y_UNIT_TEST(WhenTheTopicIsDeletedAfterDecompressingTheData_Uncompressed) { + WhenTheTopicIsDeletedImpl(AFTER_START_TASKS, 1_MB + 1, false, 1_MB - 1_KB, 1_MB - 1_KB); + } + + Y_UNIT_TEST(WhenTheTopicIsDeletedAfterReadingTheData_Uncompressed) { + WhenTheTopicIsDeletedImpl(AFTER_DOREAD, 1_MB + 1, false, 1_MB - 1_KB, 1_MB - 1_KB); + } + + Y_UNIT_TEST(CheckDecompressionTasksWithoutSession) { + NPersQueue::TTestServer server; + server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_READ_PROXY}); + server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1); + + auto driver = server.AnnoyingClient->GetDriver(); + auto decompressor = CreateThreadPoolExecutorWrapper(2); + + NYdb::NPersQueue::TReadSessionSettings settings; + settings.ConsumerName("shared/user").AppendTopics(SHORT_TOPIC_NAME).ReadOriginal({"dc1"}); + settings.DecompressionExecutor(decompressor); + + auto reader = CreateReader(*driver, settings); + auto counters = reader->GetCounters(); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0); + { + auto msg = reader->GetEvent(true, 1); + UNIT_ASSERT(msg); + + auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg); + UNIT_ASSERT(ev); + + ev->Confirm(); + } + + for (ui32 i = 0; i < 2; ++i) { + auto writer = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, TStringBuilder() << "source" << i); + + bool res = writer->Write("abracadabra", 1); + UNIT_ASSERT(res); + + res = writer->Close(TDuration::Seconds(10)); + UNIT_ASSERT(res); + } + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 2); + + reader = nullptr; + + decompressor->RunAllTasks(); + Sleep(TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 2); + } Y_UNIT_TEST(TestWriteStat) { auto testWriteStat = [](const TString& originallyProvidedConsumerName, diff --git a/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp index f8177513507..336168c3690 100644 --- a/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp +++ b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp @@ -43,6 +43,17 @@ size_t FunctionExecutorWrapper::GetFuncsCount() const } } +void FunctionExecutorWrapper::RunAllTasks() +{ + with_lock (Mutex) { + for (auto& func : Funcs) { + if (func) { + Executor->Post(std::move(func)); + } + } + } +} + TIntrusivePtr<FunctionExecutorWrapper> CreateThreadPoolExecutorWrapper(size_t threads) { return MakeIntrusive<FunctionExecutorWrapper>(NYdb::NPersQueue::CreateThreadPoolExecutor(threads)); diff --git a/ydb/services/persqueue_v1/ut/functions_executor_wrapper.h b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.h index c895ee15d53..665d2d154f9 100644 --- a/ydb/services/persqueue_v1/ut/functions_executor_wrapper.h +++ b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.h @@ -20,6 +20,8 @@ public: size_t GetFuncsCount() const; + void RunAllTasks(); + private: void DoStart() override; |