aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2022-09-29 09:39:03 +0300
committerabcdef <akotov@ydb.tech>2022-09-29 09:39:03 +0300
commitcb023f74d94535ebb12817c4c1be92c26370bcf7 (patch)
tree8d35e92afd51c37e4303009031298d9f84c72a2a
parent06f2527c743ff97dfaea75157accbb574bc2edc4 (diff)
downloadydb-cb023f74d94535ebb12817c4c1be92c26370bcf7.tar.gz
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp30
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h134
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp344
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp28
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp28
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h3
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp431
-rw-r--r--ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp11
-rw-r--r--ydb/services/persqueue_v1/ut/functions_executor_wrapper.h2
10 files changed, 708 insertions, 305 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
index 69b5c1aa7a4..fcfd457fc5f 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
@@ -334,7 +334,6 @@ protected:
template <class TEventType, class TFunc>
void PushSpecificHandler(TEventInfo&& eventInfo, const TFunc& f) {
Post(Settings.EventHandlers_.HandlersExecutor_, [func = f, event = std::move(eventInfo)]() mutable {
- event.OnUserRetrievedEvent();
func(std::get<TEventType>(event.GetEvent()));
});
}
@@ -342,7 +341,6 @@ protected:
template <class TFunc>
void PushCommonHandler(TEventInfo&& eventInfo, const TFunc& f) {
Post(Settings.EventHandlers_.HandlersExecutor_, [func = f, event = std::move(eventInfo)]() mutable {
- event.OnUserRetrievedEvent();
func(event.GetEvent());
});
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
index 857ea12b177..1cfc8466307 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
@@ -56,7 +56,6 @@ TReadSession::TReadSession(const TReadSessionSettings& settings,
TReadSession::~TReadSession() {
Abort(EStatus::ABORTED, "Aborted");
- WaitAllDecompressionTasks();
ClearAllEvents();
}
@@ -450,14 +449,6 @@ void TReadSession::Abort(TSessionClosedEvent&& closeEvent) {
}
}
-void TReadSession::WaitAllDecompressionTasks() {
- for (auto& [cluster, sessionInfo] : ClusterSessions) {
- if (sessionInfo.Session) {
- sessionInfo.Session->WaitAllDecompressionTasks();
- }
- }
-}
-
void TReadSession::ClearAllEvents() {
EventsQueue->ClearAllEvents();
}
@@ -504,20 +495,13 @@ void TReadSession::ResumeReadingData() {
}
}
-static ELogPriority GetEventLogPriority(const TReadSessionEvent::TEvent& event) {
- if (std::holds_alternative<TReadSessionEvent::TCreatePartitionStreamEvent>(event)
- || std::holds_alternative<TReadSessionEvent::TDestroyPartitionStreamEvent>(event)
- || std::holds_alternative<TReadSessionEvent::TPartitionStreamClosedEvent>(event)
- || std::holds_alternative<TSessionClosedEvent>(event))
- { // Control event.
- return TLOG_INFO;
- } else {
- return TLOG_DEBUG;
- }
-}
-
-void TReadSession::OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) {
- Log.Write(GetEventLogPriority(event), GetLogPrefix() << "Read session event " << DebugString(event));
+void TReadSession::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) {
+ Log.Write(TLOG_DEBUG, GetLogPrefix()
+ << "The application data is transferred to the client. Number of messages "
+ << messagesCount
+ << ", size "
+ << decompressedSize
+ << " bytes");
}
void TReadSession::MakeCountersIfNeeded() {
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index 8425e2ed937..6228462c687 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -78,7 +78,6 @@ using TAReadSessionSettings = std::conditional_t<UseMigrationProtocol,
NYdb::NPersQueue::TReadSessionSettings,
NYdb::NTopic::TReadSessionSettings>;
-
template <bool UseMigrationProtocol>
class TPartitionStreamImpl;
@@ -93,6 +92,12 @@ class TReadSessionEventsQueue;
class TReadSession;
+template <bool UseMigrationProtocol>
+class TDataDecompressionInfo;
+
+template <bool UseMigrationProtocol>
+using TDataDecompressionInfoPtr = typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr;
+
template <bool UseMigrationProtocol>
struct IErrorHandler : public TThrRefBase {
@@ -132,6 +137,7 @@ public:
void DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TPlainStatus&& status);
void DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session);
void DeferSignalWaiter(TWaiter&& waiter);
+ void DeferDestroyDecompressionInfos(std::vector<TDataDecompressionInfoPtr<UseMigrationProtocol>>&& infos);
private:
@@ -166,6 +172,8 @@ private:
// Session to start
std::vector<std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>>> Sessions;
+
+ std::vector<TDataDecompressionInfoPtr<UseMigrationProtocol>> DecompressionInfos;
};
template <bool UseMigrationProtocol>
@@ -181,20 +189,20 @@ public:
bool doDecompress,
i64 serverBytesSize = 0 // to increment read request bytes size
);
+ ~TDataDecompressionInfo();
i64 StartDecompressionTasks(const typename IAExecutor<UseMigrationProtocol>::TPtr& executor,
i64 availableMemory,
- double averageCompressionRatio,
- const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>& partitionStream,
TDeferredActions<UseMigrationProtocol>& deferred);
+ void PlanDecompressionTasks(double averageCompressionRatio,
+ TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream);
bool IsReady() const {
return SourceDataNotProcessed == 0;
}
bool AllDecompressionTasksStarted() const {
- Y_VERIFY(ServerMessage.batches_size() > 0);
- return CurrentDecompressingMessage.first >= static_cast<size_t>(ServerMessage.batches_size());
+ return Tasks.empty();
}
i64 GetCompressedDataSize() const {
@@ -236,12 +244,6 @@ public:
return BatchesMeta[batchIndex];
}
- // Takes data. Returns true if event has more unpacked data.
- bool TakeData(const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>& partitionStream,
- TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages,
- TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages,
- size_t* maxByteSize);
-
bool HasMoreData() const {
return CurrentReadingMessage.first < static_cast<size_t>(GetServerMessage().batches_size());
}
@@ -251,6 +253,9 @@ public:
void PutDecompressionError(std::exception_ptr error, size_t batch, size_t message);
std::exception_ptr GetDecompressionError(size_t batch, size_t message);
+ void OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize = 0);
+ void OnUserRetrievedEvent(i64 decompressedDataSize, size_t messagesCount);
+
private:
// Special struct for marking (batch/message) as ready.
struct TReadyMessageThreshold {
@@ -274,6 +279,10 @@ private:
return Messages.size();
}
+ i64 GetEstimatedDecompressedSize() const {
+ return EstimatedDecompressedSize;
+ }
+
private:
TDataDecompressionInfo::TPtr Parent;
TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> PartitionStream;
@@ -295,7 +304,6 @@ private:
std::vector<typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr> BatchesMeta;
std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session;
bool DoDecompress;
- i64 CompressedDataSize = 0;
i64 ServerBytesSize = 0;
std::atomic<i64> SourceDataNotProcessed = 0;
std::pair<size_t, size_t> CurrentDecompressingMessage = {0, 0}; // (Batch, Message)
@@ -307,12 +315,42 @@ private:
std::atomic<bool> DecompressionErrorsStructCreated = false;
TAdaptiveLock DecompressionErrorsStructLock;
std::vector<std::vector<std::exception_ptr>> DecompressionErrors;
+
+ std::atomic<i64> MessagesInflight = 0;
+ std::atomic<i64> CompressedDataSize = 0;
+ std::atomic<i64> DecompressedDataSize = 0;
+
+ std::deque<TDecompressionTask> Tasks;
+};
+
+template <bool UseMigrationProtocol>
+struct TUserRetrievedEventInfoAccumulator {
+ void OnTakeData(TDataDecompressionInfoPtr<UseMigrationProtocol> info, i64 decompressedSize) {
+ Y_VERIFY(info);
+
+ auto& counter = Counters[info];
+
+ counter.DecompressedSize += decompressedSize;
+ ++counter.MessagesCount;
+ }
+ void OnUserRetrievedEvent() {
+ for (auto& [info, counter] : Counters) {
+ info->OnUserRetrievedEvent(counter.DecompressedSize, counter.MessagesCount);
+ }
+ }
+
+ struct TCounter {
+ i64 DecompressedSize = 0;
+ size_t MessagesCount = 0;
+ };
+
+ TMap<TDataDecompressionInfoPtr<UseMigrationProtocol>, TCounter> Counters;
};
template <bool UseMigrationProtocol>
class TDataDecompressionEvent {
public:
- TDataDecompressionEvent(size_t batch, size_t message, typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent, std::atomic<bool> &ready) :
+ TDataDecompressionEvent(size_t batch, size_t message, TDataDecompressionInfoPtr<UseMigrationProtocol> parent, std::atomic<bool>& ready) :
Batch{batch},
Message{message},
Parent{std::move(parent)},
@@ -324,23 +362,28 @@ public:
return Ready;
}
- bool TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
+ void TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages,
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages,
- size_t* maxByteSize) const;
+ size_t* maxByteSize,
+ TUserRetrievedEventInfoAccumulator<UseMigrationProtocol>* accumulator) const;
+
+ TDataDecompressionInfoPtr<UseMigrationProtocol> GetParent() const {
+ return Parent;
+ }
private:
size_t Batch;
size_t Message;
- typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr Parent;
- std::atomic<bool> &Ready;
+ TDataDecompressionInfoPtr<UseMigrationProtocol> Parent;
+ std::atomic<bool>& Ready;
};
template <bool UseMigrationProtocol>
struct IUserRetrievedEventCallback {
virtual ~IUserRetrievedEventCallback() = default;
- virtual void OnUserRetrievedEvent(const typename TAReadSessionEvent<UseMigrationProtocol>::TEvent& event) = 0;
+ virtual void OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) = 0;
};
template <bool UseMigrationProtocol>
@@ -381,8 +424,6 @@ struct TReadSessionEventInfo {
return *Event;
}
- void OnUserRetrievedEvent();
-
bool IsSessionClosedEvent() const {
return Event && std::holds_alternative<TASessionClosedEvent<UseMigrationProtocol>>(*Event);
}
@@ -400,7 +441,7 @@ struct TRawPartitionStreamEvent {
TRawPartitionStreamEvent(size_t batch,
size_t message,
- typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent,
+ TDataDecompressionInfoPtr<UseMigrationProtocol> parent,
std::atomic<bool> &ready)
: Event(std::in_place_type_t<TDataDecompressionEvent<UseMigrationProtocol>>(),
batch,
@@ -484,7 +525,7 @@ public:
void SignalReadyEvents(TPartitionStreamImpl<UseMigrationProtocol>& stream,
TReadSessionEventsQueue<UseMigrationProtocol>& queue,
TDeferredActions<UseMigrationProtocol>& deferred);
- void DeleteNotReadyTail();
+ void DeleteNotReadyTail(TDeferredActions<UseMigrationProtocol>& deferred);
private:
std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> Ready;
@@ -585,7 +626,7 @@ public:
void InsertDataEvent(size_t batch,
size_t message,
- typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent,
+ TDataDecompressionInfoPtr<UseMigrationProtocol> parent,
std::atomic<bool> &ready)
{
++DataDecompressionEventsCount;
@@ -678,7 +719,7 @@ public:
return true;
}
- void DeleteNotReadyTail();
+ void DeleteNotReadyTail(TDeferredActions<UseMigrationProtocol>& deferred);
private:
const TKey Key;
@@ -711,9 +752,12 @@ public:
explicit TReadSessionEventsQueue(const TAReadSessionSettings<UseMigrationProtocol>& settings, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session);
typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent
- GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, size_t* maxByteSize); // Assumes that we're under lock.
+ GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
+ size_t* maxByteSize,
+ TUserRetrievedEventInfoAccumulator<UseMigrationProtocol>* accumulator); // Assumes that we're under lock.
- TReadSessionEventInfo<UseMigrationProtocol> GetEventImpl(size_t* maxByteSize) { // Assumes that we're under lock.
+ TReadSessionEventInfo<UseMigrationProtocol> GetEventImpl(size_t* maxByteSize,
+ TUserRetrievedEventInfoAccumulator<UseMigrationProtocol>* accumulator = nullptr) { // Assumes that we're under lock.
Y_ASSERT(TParent::HasEventsImpl());
if (!TParent::Events.empty()) {
@@ -727,7 +771,7 @@ public:
TMaybe<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> event;
if (partitionStream->TopEvent().IsDataEvent()) {
- event = GetDataEventImpl(partitionStream, maxByteSize);
+ event = GetDataEventImpl(partitionStream, maxByteSize, accumulator);
} else {
event = std::move(partitionStream->TopEvent().GetEvent());
partitionStream->PopEvent();
@@ -748,6 +792,8 @@ public:
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing(), size_t maxByteSize = std::numeric_limits<size_t>::max()) {
TVector<TReadSessionEventInfo<UseMigrationProtocol>> eventInfos;
const size_t maxCount = maxEventsCount ? *maxEventsCount : std::numeric_limits<size_t>::max();
+ TUserRetrievedEventInfoAccumulator<UseMigrationProtocol> accumulator;
+
TDeferredActions<UseMigrationProtocol> deferred;
with_lock (TParent::Mutex) {
eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.Defined(), maxCount));
@@ -759,7 +805,7 @@ public:
ApplyCallbacksToReadyEventsImpl(deferred);
while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) {
- TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(&maxByteSize);
+ TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(&maxByteSize, &accumulator);
eventInfos.emplace_back(std::move(event));
if (eventInfos.back().IsSessionClosedEvent()) {
break;
@@ -772,14 +818,18 @@ public:
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> result;
result.reserve(eventInfos.size());
for (TReadSessionEventInfo<UseMigrationProtocol>& eventInfo : eventInfos) {
- eventInfo.OnUserRetrievedEvent();
result.emplace_back(std::move(eventInfo.GetEvent()));
}
+
+ accumulator.OnUserRetrievedEvent();
+
return result;
}
TMaybe<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> GetEvent(bool block = false, size_t maxByteSize = std::numeric_limits<size_t>::max()) {
TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> eventInfo;
+ TUserRetrievedEventInfoAccumulator<UseMigrationProtocol> accumulator;
+
TDeferredActions<UseMigrationProtocol> deferred;
with_lock (TParent::Mutex) {
do {
@@ -790,19 +840,21 @@ public:
const bool appliedCallbacks = ApplyCallbacksToReadyEventsImpl(deferred);
if (TParent::HasEventsImpl()) {
- eventInfo = GetEventImpl(&maxByteSize);
+ eventInfo = GetEventImpl(&maxByteSize, &accumulator);
} else if (!appliedCallbacks) {
return Nothing();
}
} while (block && !eventInfo);
ApplyCallbacksToReadyEventsImpl(deferred);
}
+
if (eventInfo) {
- eventInfo->OnUserRetrievedEvent();
+ accumulator.OnUserRetrievedEvent();
+
return std::move(eventInfo->Event);
- } else {
- return Nothing();
}
+
+ return Nothing();
}
void Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) {
@@ -832,7 +884,7 @@ public:
void PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
size_t batch,
size_t message,
- typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent,
+ TDataDecompressionInfoPtr<UseMigrationProtocol> parent,
std::atomic<bool> &ready);
void SignalEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
@@ -943,6 +995,7 @@ public:
using TPtr = std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>>;
using IProcessor = typename IReadSessionConnectionProcessorFactory<UseMigrationProtocol>::IProcessor;
+
friend class TPartitionStreamImpl<UseMigrationProtocol>;
TSingleClusterReadSessionImpl(
@@ -981,13 +1034,15 @@ public:
void Commit(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, ui64 startOffset, ui64 endOffset);
void OnCreateNewDecompressionTask();
+ void OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount);
+
void OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize = 0);
TReadSessionEventsQueue<UseMigrationProtocol>* GetEventsQueue() {
return EventsQueue.get();
}
- void OnUserRetrievedEvent(const typename TAReadSessionEvent<UseMigrationProtocol>::TEvent& event) override;
+ void OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) override;
void Abort();
void Close(std::function<void()> callback);
@@ -997,8 +1052,6 @@ public:
void StopReadingData();
void ResumeReadingData();
- void WaitAllDecompressionTasks();
-
void DumpStatisticsToLog(TLogElement& log);
void UpdateMemoryUsageStatistics();
@@ -1140,14 +1193,14 @@ private:
};
struct TDecompressionQueueItem {
- TDecompressionQueueItem(typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr batchInfo,
+ TDecompressionQueueItem(TDataDecompressionInfoPtr<UseMigrationProtocol> batchInfo,
TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream)
: BatchInfo(std::move(batchInfo))
, PartitionStream(std::move(partitionStream))
{
}
- typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr BatchInfo;
+ TDataDecompressionInfoPtr<UseMigrationProtocol> BatchInfo;
TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> PartitionStream;
};
@@ -1261,7 +1314,6 @@ public:
void Abort(TSessionClosedEvent&& closeEvent);
- void WaitAllDecompressionTasks();
void ClearAllEvents();
private:
@@ -1287,7 +1339,7 @@ private:
void AbortImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions<true>& deferred);
void AbortImpl(EStatus statusCode, const TString& message, TDeferredActions<true>& deferred);
- void OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) override;
+ void OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) override;
void MakeCountersIfNeeded();
void DumpCountersToLog(size_t timeNumber = 0);
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index 01a699a4129..afa8aa982a5 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -133,9 +133,9 @@ void TPartitionStreamImpl<UseMigrationProtocol>::SignalReadyEvents(TReadSessionE
}
template<bool UseMigrationProtocol>
-void TPartitionStreamImpl<UseMigrationProtocol>::DeleteNotReadyTail()
+void TPartitionStreamImpl<UseMigrationProtocol>::DeleteNotReadyTail(TDeferredActions<UseMigrationProtocol>& deferred)
{
- EventsQueue.DeleteNotReadyTail();
+ EventsQueue.DeleteNotReadyTail(deferred);
}
template<bool UseMigrationProtocol>
@@ -154,19 +154,26 @@ void TRawPartitionStreamEventQueue<UseMigrationProtocol>::SignalReadyEvents(TPar
}
template<bool UseMigrationProtocol>
-void TRawPartitionStreamEventQueue<UseMigrationProtocol>::DeleteNotReadyTail()
+void TRawPartitionStreamEventQueue<UseMigrationProtocol>::DeleteNotReadyTail(TDeferredActions<UseMigrationProtocol>& deferred)
{
- std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> head;
+ std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> ready;
- for (auto& event : NotReady) {
- if (!event.IsReady()) {
- break;
- }
+ auto i = NotReady.begin();
+ for (; (i != NotReady.end()) && i->IsReady(); ++i) {
+ ready.push_back(std::move(*i));
+ }
+
+ std::vector<TDataDecompressionInfoPtr<UseMigrationProtocol>> infos;
- head.push_back(std::move(event));
+ for (; i != NotReady.end(); ++i) {
+ if (i->IsDataEvent()) {
+ infos.push_back(i->GetDataEvent().GetParent());
+ }
}
- swap(head, NotReady);
+ deferred.DeferDestroyDecompressionInfos(std::move(infos));
+
+ swap(ready, NotReady);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -668,24 +675,26 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::RequestPartitionStream
}
template<bool UseMigrationProtocol>
-void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnUserRetrievedEvent(const typename TAReadSessionEvent<UseMigrationProtocol>::TEvent& event) {
- Log.Write(TLOG_DEBUG, GetLogPrefix() << "Read session event " << DebugString(event));
- const i64 bytesCount = static_cast<i64>(CalcDataSize<TAReadSessionEvent<UseMigrationProtocol>>(event));
- Y_ASSERT(bytesCount >= 0);
-
- if (!std::get_if<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent>(&event)) { // Event is not data event.
- return;
- }
+void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount)
+{
+ Log.Write(TLOG_DEBUG, GetLogPrefix()
+ << "The application data is transferred to the client. Number of messages "
+ << messagesCount
+ << ", size "
+ << decompressedSize
+ << " bytes");
- *Settings.Counters_->MessagesInflight -= std::get<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent>(event).GetMessagesCount();
- *Settings.Counters_->BytesInflightTotal -= bytesCount;
- *Settings.Counters_->BytesInflightUncompressed -= bytesCount;
+ *Settings.Counters_->MessagesInflight -= messagesCount;
+ *Settings.Counters_->BytesInflightTotal -= decompressedSize;
+ *Settings.Counters_->BytesInflightUncompressed -= decompressedSize;
TDeferredActions<UseMigrationProtocol> deferred;
with_lock (Lock) {
UpdateMemoryUsageStatisticsImpl();
- Y_VERIFY(bytesCount <= DecompressedDataSize);
- DecompressedDataSize -= bytesCount;
+
+ Y_VERIFY(decompressedSize <= DecompressedDataSize);
+ DecompressedDataSize -= decompressedSize;
+
ContinueReadingDataImpl();
StartDecompressionTasksImpl(deferred);
}
@@ -694,6 +703,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnUserRetrievedEvent(c
template <bool UseMigrationProtocol>
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::WriteToProcessorImpl(
TClientMessage<UseMigrationProtocol>&& req) { // Assumes that we're under lock.
+
if (Processor) {
Processor->Write(std::move(req));
}
@@ -913,10 +923,11 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
Settings.Decompress_);
Y_VERIFY(decompressionInfo);
- if (decompressionInfo) {
- DecompressionQueue.emplace_back(decompressionInfo, partitionStream);
- StartDecompressionTasksImpl(deferred);
- }
+ decompressionInfo->PlanDecompressionTasks(AverageCompressionRatio,
+ partitionStream);
+
+ DecompressionQueue.emplace_back(decompressionInfo, partitionStream);
+ StartDecompressionTasksImpl(deferred);
}
WaitingReadResponse = false;
@@ -1054,7 +1065,6 @@ template <>
inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
Ydb::Topic::StreamReadMessage::ReadResponse&& msg,
TDeferredActions<false>& deferred) { // Assumes that we're under lock.
-
if (Closing || Aborting) {
return; // Don't process new data.
}
@@ -1119,10 +1129,10 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
serverBytesSize = 0;
Y_VERIFY(decompressionInfo);
- if (decompressionInfo) {
- DecompressionQueue.emplace_back(decompressionInfo, partitionStream);
- StartDecompressionTasksImpl(deferred);
- }
+ decompressionInfo->PlanDecompressionTasks(AverageCompressionRatio,
+ partitionStream);
+ DecompressionQueue.emplace_back(decompressionInfo, partitionStream);
+ StartDecompressionTasksImpl(deferred);
}
WaitingReadResponse = false;
@@ -1246,8 +1256,6 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::StartDecompressionTask
TDecompressionQueueItem& current = DecompressionQueue.front();
auto sentToDecompress = current.BatchInfo->StartDecompressionTasks(Settings.DecompressionExecutor_,
Max(limit - DecompressedDataSize, static_cast<i64>(1)),
- AverageCompressionRatio,
- current.PartitionStream,
deferred);
DecompressedDataSize += sentToDecompress;
if (current.BatchInfo->AllDecompressionTasksStarted()) {
@@ -1281,8 +1289,22 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnCreateNewDecompressi
}
template<bool UseMigrationProtocol>
+void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount)
+{
+ *Settings.Counters_->MessagesInflight -= messagesCount;
+ *Settings.Counters_->BytesInflightUncompressed -= decompressedSize;
+ *Settings.Counters_->BytesInflightCompressed -= compressedSize;
+ *Settings.Counters_->BytesInflightTotal -= (compressedSize + decompressedSize);
+
+ CompressedDataSize -= compressedSize;
+ DecompressedDataSize -= decompressedSize;
+}
+
+template<bool UseMigrationProtocol>
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize) {
TDeferredActions<UseMigrationProtocol> deferred;
+
+ Y_VERIFY(DecompressionTasksInflight > 0);
--DecompressionTasksInflight;
*Settings.Counters_->BytesRead += decompressedSize;
@@ -1397,14 +1419,6 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ResumeReadingData() {
}
template<bool UseMigrationProtocol>
-void TSingleClusterReadSessionImpl<UseMigrationProtocol>::WaitAllDecompressionTasks() {
- Y_ASSERT(DecompressionTasksInflight >= 0);
- while (DecompressionTasksInflight > 0) {
- Sleep(TDuration::MilliSeconds(5)); // Perform active wait because this is aborting process and there are no decompression tasks here in normal situation.
- }
-}
-
-template<bool UseMigrationProtocol>
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::DumpStatisticsToLog(TLogElement& log) {
with_lock (Lock) {
// cluster:topic:partition:stream-id:read-offset:committed-offset
@@ -1571,13 +1585,6 @@ bool TReadSessionEventInfo<UseMigrationProtocol>::IsDataEvent() const {
return !IsEmpty() && PartitionStream->TopEvent().IsDataEvent();
}
-template<bool UseMigrationProtocol>
-void TReadSessionEventInfo<UseMigrationProtocol>::OnUserRetrievedEvent() {
- if (auto session = Session.lock()) {
- session->OnUserRetrievedEvent(*Event);
- }
-}
-
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TReadSessionEventsQueue
@@ -1613,7 +1620,7 @@ TReadSessionEventsQueue<UseMigrationProtocol>::TReadSessionEventsQueue(
template <bool UseMigrationProtocol>
void TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> /*session*/,
- typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event,
+ typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event,
TDeferredActions<UseMigrationProtocol>& deferred)
{
if (TParent::Closed) {
@@ -1628,7 +1635,7 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPar
>;
if (std::holds_alternative<TClosedEvent>(event)) {
- stream->DeleteNotReadyTail();
+ stream->DeleteNotReadyTail(deferred);
}
stream->InsertEvent(std::move(event));
@@ -1670,14 +1677,14 @@ template <bool UseMigrationProtocol>
void TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
size_t batch,
size_t message,
- typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent,
+ TDataDecompressionInfoPtr<UseMigrationProtocol> parent,
std::atomic<bool>& ready)
{
if (this->Closed) {
return;
}
- with_lock (this->Mutex) {
+ with_lock (TParent::Mutex) {
partitionStream->InsertDataEvent(batch, message, parent, ready);
}
}
@@ -1685,8 +1692,8 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(TIntrusivePtr<
template <bool UseMigrationProtocol>
typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl(
TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
- size_t* maxByteSize) { // Assumes that we're under lock.
-
+ size_t* maxByteSize,
+ TUserRetrievedEventInfoAccumulator<UseMigrationProtocol>* accumulator) { // Assumes that we're under lock.
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage> messages;
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage> compressedMessages;
@@ -1698,7 +1705,7 @@ typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent TReadSessi
Y_VERIFY(event.EventsCount > 0);
for (; (event.EventsCount > 0) && (*maxByteSize > 0); --event.EventsCount) {
- stream->TopEvent().GetDataEvent().TakeData(stream, &messages, &compressedMessages, maxByteSize);
+ stream->TopEvent().GetDataEvent().TakeData(stream, &messages, &compressedMessages, maxByteSize, accumulator);
stream->PopEvent();
}
@@ -1716,7 +1723,7 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::SignalReadyEvents(
TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) {
Y_ASSERT(partitionStream);
TDeferredActions<UseMigrationProtocol> deferred;
- with_lock (TReadSessionEventsQueue<UseMigrationProtocol>::Mutex) {
+ with_lock (TParent::Mutex) {
SignalReadyEventsImpl(partitionStream, deferred);
}
}
@@ -1850,17 +1857,32 @@ TDataDecompressionInfo<UseMigrationProtocol>::TDataDecompressionInfo(
, DoDecompress(doDecompress)
, ServerBytesSize(serverBytesSize)
{
+ i64 compressedSize = 0;
+ i64 messagesCount = 0;
+
for (const auto& batch : ServerMessage.batches()) {
for (const auto& messageData : batch.message_data()) {
- CompressedDataSize += messageData.data().size();
+ compressedSize += messageData.data().size();
+ ++messagesCount;
}
}
- SourceDataNotProcessed = CompressedDataSize;
+
+ MessagesInflight = messagesCount;
+ SourceDataNotProcessed = compressedSize;
+ CompressedDataSize = compressedSize;
BuildBatchesMeta();
}
template<bool UseMigrationProtocol>
+TDataDecompressionInfo<UseMigrationProtocol>::~TDataDecompressionInfo()
+{
+ if (auto session = Session.lock()) {
+ session->OnDecompressionInfoDestroy(CompressedDataSize, DecompressedDataSize, MessagesInflight);
+ }
+}
+
+template<bool UseMigrationProtocol>
void TDataDecompressionInfo<UseMigrationProtocol>::BuildBatchesMeta() {
BatchesMeta.reserve(ServerMessage.batches_size());
for (const auto& batch : ServerMessage.batches()) {
@@ -1914,169 +1936,89 @@ std::exception_ptr TDataDecompressionInfo<UseMigrationProtocol>::GetDecompressio
template <bool UseMigrationProtocol>
i64 TDataDecompressionInfo<UseMigrationProtocol>::StartDecompressionTasks(
const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, i64 availableMemory,
- double averageCompressionRatio, const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>& partitionStream,
- TDeferredActions<UseMigrationProtocol>& deferred) {
+ TDeferredActions<UseMigrationProtocol>& deferred)
+{
+ auto session = Session.lock();
+ Y_ASSERT(session);
+ i64 used = 0;
+
+ while (availableMemory > 0 && !Tasks.empty()) {
+ auto& task = Tasks.front();
+
+ used += task.GetEstimatedDecompressedSize();
+ availableMemory -= task.GetEstimatedDecompressedSize();
+
+ session->OnCreateNewDecompressionTask();
+
+ deferred.DeferStartExecutorTask(executor, std::move(task));
+ Tasks.pop_front();
+ }
+
+ return used;
+}
+
+template<bool UseMigrationProtocol>
+void TDataDecompressionInfo<UseMigrationProtocol>::PlanDecompressionTasks(double averageCompressionRatio,
+ TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream)
+{
constexpr size_t TASK_LIMIT = 512_KB;
- std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Session.lock();
+
+ auto session = Session.lock();
Y_ASSERT(session);
+
ReadyThresholds.emplace_back();
+
TDecompressionTask task(TDataDecompressionInfo::shared_from_this(), partitionStream, &ReadyThresholds.back());
- i64 used = 0;
- while (availableMemory > 0 && !AllDecompressionTasksStarted()) {
+
+ while (CurrentDecompressingMessage.first < static_cast<size_t>(ServerMessage.batches_size())) {
const auto& batch = ServerMessage.batches(CurrentDecompressingMessage.first);
+
if (CurrentDecompressingMessage.second < static_cast<size_t>(batch.message_data_size())) {
const auto& messageData = batch.message_data(CurrentDecompressingMessage.second);
const i64 size = static_cast<i64>(messageData.data().size());
const i64 estimatedDecompressedSize = messageData.uncompressed_size()
? static_cast<i64>(messageData.uncompressed_size())
: static_cast<i64>(size * averageCompressionRatio);
-
Y_VERIFY(estimatedDecompressedSize >= 0);
task.Add(CurrentDecompressingMessage.first, CurrentDecompressingMessage.second, size, estimatedDecompressedSize);
+
session->GetEventsQueue()->PushDataEvent(partitionStream,
CurrentDecompressingMessage.first,
CurrentDecompressingMessage.second,
TDataDecompressionInfo::shared_from_this(),
ReadyThresholds.back().Ready);
-
- used += estimatedDecompressedSize;
- availableMemory -= estimatedDecompressedSize;
}
+
++CurrentDecompressingMessage.second;
+
if (CurrentDecompressingMessage.second >= static_cast<size_t>(batch.message_data_size())) { // next batch
++CurrentDecompressingMessage.first;
CurrentDecompressingMessage.second = 0;
}
+
if (task.AddedDataSize() >= TASK_LIMIT) {
- session->OnCreateNewDecompressionTask();
- deferred.DeferStartExecutorTask(executor, std::move(task));
+ Tasks.push_back(std::move(task));
+
ReadyThresholds.emplace_back();
task = TDecompressionTask(TDataDecompressionInfo::shared_from_this(), partitionStream, &ReadyThresholds.back());
}
}
+
if (task.AddedMessagesCount() > 0) {
- session->OnCreateNewDecompressionTask();
- deferred.DeferStartExecutorTask(executor, std::move(task));
+ Tasks.push_back(std::move(task));
} else {
ReadyThresholds.pop_back(); // Revert.
}
- return used;
}
template<bool UseMigrationProtocol>
-bool TDataDecompressionInfo<UseMigrationProtocol>::TakeData(const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>& partitionStream,
- TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages,
- TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages,
- size_t* maxByteSize)
-{
- TMaybe<std::pair<size_t, size_t>> readyThreshold = GetReadyThreshold();
- Y_ASSERT(readyThreshold);
- auto& msg = GetServerMessage();
- i64 minOffset = Max<i64>();
- i64 maxOffset = 0;
- const auto prevReadingMessage = CurrentReadingMessage;
- while (HasMoreData() && *maxByteSize > 0 && CurrentReadingMessage <= *readyThreshold) {
- auto& batch = *msg.mutable_batches(CurrentReadingMessage.first);
- if (CurrentReadingMessage.second < static_cast<size_t>(batch.message_data_size())) {
- const auto& meta = GetBatchMeta(CurrentReadingMessage.first);
- const TInstant batchWriteTimestamp = [&batch](){
- if constexpr (UseMigrationProtocol) {
- return TInstant::MilliSeconds(batch.write_timestamp_ms());
- } else {
- return TInstant::MilliSeconds(
- ::google::protobuf::util::TimeUtil::TimestampToMilliseconds(batch.written_at()));
- }
- }();
- auto& messageData = *batch.mutable_message_data(CurrentReadingMessage.second);
- minOffset = Min(minOffset, static_cast<i64>(messageData.offset()));
- maxOffset = Max(maxOffset, static_cast<i64>(messageData.offset()));
-
- if constexpr (UseMigrationProtocol) {
- TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo(
- messageData.offset(),
- batch.source_id(),
- messageData.seq_no(),
- TInstant::MilliSeconds(messageData.create_timestamp_ms()),
- batchWriteTimestamp,
- batch.ip(),
- meta,
- messageData.uncompressed_size()
- );
- if (DoDecompress) {
- messages->emplace_back(
- messageData.data(),
- GetDecompressionError(CurrentReadingMessage.first, CurrentReadingMessage.second),
- messageInfo,
- partitionStream,
- messageData.partition_key(),
- messageData.explicit_hash()
- );
- } else {
- compressedMessages->emplace_back(
- static_cast<ECodec>(messageData.codec()),
- messageData.data(),
- TVector<TReadSessionEvent::TDataReceivedEvent::TMessageInformation>{messageInfo},
- partitionStream,
- messageData.partition_key(),
- messageData.explicit_hash()
- );
- }
- } else {
- NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo(
- messageData.offset(),
- batch.producer_id(),
- messageData.seq_no(),
- TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(messageData.created_at())),
- batchWriteTimestamp,
- meta,
- messageData.uncompressed_size(),
- messageData.message_group_id()
- );
- if (DoDecompress) {
- messages->emplace_back(
- messageData.data(),
- GetDecompressionError(CurrentReadingMessage.first, CurrentReadingMessage.second),
- messageInfo,
- partitionStream
- );
- } else {
- compressedMessages->emplace_back(
- static_cast<NTopic::ECodec>(batch.codec()),
- messageData.data(),
- messageInfo,
- partitionStream
- );
- }
- }
-
- *maxByteSize -= Min(*maxByteSize, messageData.data().size());
-
- // Clear data to free internal session's memory.
- messageData.clear_data();
- }
-
- ++CurrentReadingMessage.second;
- if (CurrentReadingMessage.second >= static_cast<size_t>(batch.message_data_size())) {
- CurrentReadingMessage.second = 0;
- do {
- ++CurrentReadingMessage.first;
- } while (CurrentReadingMessage.first < static_cast<size_t>(msg.batches_size()) && msg.batches(CurrentReadingMessage.first).message_data_size() == 0);
- }
- }
- partitionStream->GetLog().Write(TLOG_DEBUG, TStringBuilder() << "Take Data. Partition " << partitionStream->GetPartitionId()
- << ". Read: {" << prevReadingMessage.first << ", " << prevReadingMessage.second << "} -> {"
- << CurrentReadingMessage.first << ", " << CurrentReadingMessage.second << "} ("
- << minOffset << "-" << maxOffset << ")");
- return CurrentReadingMessage <= *readyThreshold;
-}
-
-template<bool UseMigrationProtocol>
-bool TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
+void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages,
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages,
- size_t* maxByteSize) const
+ size_t* maxByteSize,
+ TUserRetrievedEventInfoAccumulator<UseMigrationProtocol>* accumulator) const
{
auto& msg = Parent->GetServerMessage();
i64 minOffset = Max<i64>();
@@ -2146,6 +2088,10 @@ bool TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart
*maxByteSize -= Min(*maxByteSize, messageData.data().size());
+ if (accumulator) {
+ accumulator->OnTakeData(Parent, messageData.data().size());
+ }
+
// Clear data to free internal session's memory.
messageData.clear_data();
@@ -2153,8 +2099,6 @@ bool TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart
<< "Take Data. Partition " << partitionStream->GetPartitionId()
<< ". Read: {" << Batch << ", " << Message << "} ("
<< minOffset << "-" << maxOffset << ")");
-
- return false;
}
template<bool UseMigrationProtocol>
@@ -2166,6 +2110,28 @@ bool TDataDecompressionInfo<UseMigrationProtocol>::HasReadyUnreadData() const {
return CurrentReadingMessage <= *threshold;
}
+template<bool UseMigrationProtocol>
+void TDataDecompressionInfo<UseMigrationProtocol>::OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize)
+{
+ CompressedDataSize -= sourceSize;
+ DecompressedDataSize += decompressedSize;
+
+ if (auto session = Session.lock()) {
+ session->OnDataDecompressed(sourceSize, estimatedDecompressedSize, decompressedSize, messagesCount, serverBytesSize);
+ }
+}
+
+template<bool UseMigrationProtocol>
+void TDataDecompressionInfo<UseMigrationProtocol>::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount)
+{
+ MessagesInflight -= messagesCount;
+ DecompressedDataSize -= decompressedSize;
+
+ if (auto session = Session.lock()) {
+ session->OnUserRetrievedEvent(decompressedSize, messagesCount);
+ }
+}
+
template <bool UseMigrationProtocol>
void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::Add(size_t batch, size_t message,
size_t sourceDataSize,
@@ -2253,9 +2219,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
Y_ASSERT(dataProcessed == SourceDataSize);
std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Parent->Session.lock();
- if (session) {
- session->OnDataDecompressed(SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed, Parent->ServerBytesSize);
- }
+ Parent->OnDataDecompressed(SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed, Parent->ServerBytesSize);
Parent->SourceDataNotProcessed -= dataProcessed;
Ready->Ready = true;
@@ -2327,6 +2291,12 @@ void TDeferredActions<UseMigrationProtocol>::DeferSignalWaiter(TWaiter&& waiter)
}
template<bool UseMigrationProtocol>
+void TDeferredActions<UseMigrationProtocol>::DeferDestroyDecompressionInfos(std::vector<TDataDecompressionInfoPtr<UseMigrationProtocol>>&& infos)
+{
+ DecompressionInfos = std::move(infos);
+}
+
+template<bool UseMigrationProtocol>
void TDeferredActions<UseMigrationProtocol>::DoActions() {
Read();
StartExecutorTasks();
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
index 0437f64720f..4caf3ee778c 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
@@ -1444,9 +1444,9 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
const size_t messagesInServerBatchCount = 10; // Many messages in order to fit in two 512 KB-tasks packs.
const size_t messageSize = 1000000;
const size_t batchLimit = std::numeric_limits<size_t>::max();
- const size_t batches = messagesInServerBatchCount;
- const size_t messagesInBatch = 1;
- const size_t expectedDecompressionTasksCount = messagesInServerBatchCount;
+ const size_t batches = 1;
+ const size_t messagesInBatch = 10;
+ const size_t expectedDecompressionTasksCount = 1;
const size_t reorderedCycleSize = 1;
const size_t memoryLimit = 10;
PacksBatchesImpl(serverBatchesCount, messagesInServerBatchCount, messageSize, batchLimit, batches, messagesInBatch, expectedDecompressionTasksCount, reorderedCycleSize, memoryLimit);
@@ -1485,16 +1485,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
UNIT_ASSERT_VALUES_EQUAL(events.size(), 1);
UNIT_ASSERT_EVENT_TYPE(events[0], TReadSessionEvent::TDataReceivedEvent);
TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get<TReadSessionEvent::TDataReceivedEvent>(events[0]);
- UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages().size(), 28);
- UNIT_ASSERT_EQUAL(dataEvent.GetPartitionStream(), stream1);
- }
-
- {
- TVector<TReadSessionEvent::TEvent> events = setup.EventsQueue->GetEvents(true);
- UNIT_ASSERT_VALUES_EQUAL(events.size(), 1);
- UNIT_ASSERT_EVENT_TYPE(events[0], TReadSessionEvent::TDataReceivedEvent);
- TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get<TReadSessionEvent::TDataReceivedEvent>(events[0]);
- UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages().size(), 22);
+ UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages().size(), 50);
UNIT_ASSERT_EQUAL(dataEvent.GetPartitionStream(), stream1);
}
@@ -1503,16 +1494,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
UNIT_ASSERT_VALUES_EQUAL(events.size(), 1);
UNIT_ASSERT_EVENT_TYPE(events[0], TReadSessionEvent::TDataReceivedEvent);
TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get<TReadSessionEvent::TDataReceivedEvent>(events[0]);
- UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages().size(), 27);
- UNIT_ASSERT_EQUAL(dataEvent.GetPartitionStream(), stream2);
- }
-
- {
- TVector<TReadSessionEvent::TEvent> events = setup.EventsQueue->GetEvents(true);
- UNIT_ASSERT_VALUES_EQUAL(events.size(), 1);
- UNIT_ASSERT_EVENT_TYPE(events[0], TReadSessionEvent::TDataReceivedEvent);
- TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get<TReadSessionEvent::TDataReceivedEvent>(events[0]);
- UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages().size(), 23);
+ UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages().size(), 50);
UNIT_ASSERT_EQUAL(dataEvent.GetPartitionStream(), stream2);
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
index f01af91a29c..8e7d930effa 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
@@ -33,7 +33,6 @@ TReadSession::TReadSession(const TReadSessionSettings& settings,
TReadSession::~TReadSession() {
Abort(EStatus::ABORTED, "Aborted");
- WaitAllDecompressionTasks();
ClearAllEvents();
}
@@ -190,12 +189,6 @@ bool TReadSession::Close(TDuration timeout) {
return result;
}
-void TReadSession::WaitAllDecompressionTasks() {
- if (Session) {
- Session->WaitAllDecompressionTasks();
- }
-}
-
void TReadSession::ClearAllEvents() {
EventsQueue->ClearAllEvents();
}
@@ -204,20 +197,13 @@ TStringBuilder TReadSession::GetLogPrefix() const {
return TStringBuilder() << GetDatabaseLogPrefix(DbDriverState->Database) << "[" << SessionId << "] ";
}
-static ELogPriority GetEventLogPriority(const TReadSessionEvent::TEvent& event) {
- if (std::holds_alternative<TReadSessionEvent::TStartPartitionSessionEvent>(event)
- || std::holds_alternative<TReadSessionEvent::TStopPartitionSessionEvent>(event)
- || std::holds_alternative<TReadSessionEvent::TPartitionSessionClosedEvent>(event)
- || std::holds_alternative<TSessionClosedEvent>(event))
- { // Control event.
- return TLOG_INFO;
- } else {
- return TLOG_DEBUG;
- }
-}
-
-void TReadSession::OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) {
- Log.Write(GetEventLogPriority(event), GetLogPrefix() << "Read session event " << DebugString(event));
+void TReadSession::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) {
+ Log.Write(TLOG_DEBUG, GetLogPrefix()
+ << "The application data is transferred to the client. Number of messages "
+ << messagesCount
+ << ", size "
+ << decompressedSize
+ << " bytes");
}
void TReadSession::MakeCountersIfNeeded() {
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
index bd7329f2760..1ac885691be 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
@@ -86,7 +86,6 @@ public:
void Abort(TSessionClosedEvent&& closeEvent);
- void WaitAllDecompressionTasks();
void ClearAllEvents();
private:
@@ -97,7 +96,7 @@ private:
void CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false>& deferred);
- void OnUserRetrievedEvent(const TReadSessionEvent::TEvent& event) override;
+ void OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) override;
void MakeCountersIfNeeded();
void DumpCountersToLog(size_t timeNumber = 0);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 64f58ded093..e0c5dc6564c 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -2125,7 +2125,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Y_UNIT_TEST(CheckKillBalancer) {
NPersQueue::TTestServer server;
server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_READ_PROXY});
- PrepareForGrpc(server);
+ server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 2);
auto driver = server.AnnoyingClient->GetDriver();
auto decompressor = CreateThreadPoolExecutorWrapper(2);
@@ -2135,6 +2135,20 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
settings.DecompressionExecutor(decompressor);
auto reader = CreateReader(*driver, settings);
+ auto counters = reader->GetCounters();
+
+ auto DumpCounters = [&](const char *message) {
+ Cerr << "===== " << message << " =====" << Endl;
+ Cerr << "MessagesInflight: " << counters->MessagesInflight->Val() << Endl;
+ Cerr << "BytesInflightUncompressed: " << counters->BytesInflightUncompressed->Val() << Endl;
+ Cerr << "BytesInflightCompressed: " << counters->BytesInflightCompressed->Val() << Endl;
+ Cerr << "BytesInflightTotal: " << counters->BytesInflightTotal->Val() << Endl;
+ Cerr << "============" << Endl;
+ };
+
+ DumpCounters("CreateReader");
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0);
for (ui32 i = 0; i < 2; ++i) {
auto msg = reader->GetEvent(true, 1);
@@ -2158,7 +2172,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(res);
}
+ DumpCounters("Write");
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 10);
ui32 createEv = 0, destroyEv = 0, dataEv = 0;
std::vector<ui32> gotDestroy{0, 0};
@@ -2196,37 +2212,440 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
decompressor->StartFuncs({0, 1, 2, 3, 4});
+ DumpCounters("StartFuncs-1");
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 10);
+
for (ui32 i = 0; i < 5; ++i) {
doRead();
}
UNIT_ASSERT_VALUES_EQUAL(dataEv, 5);
- server.AnnoyingClient->RestartBalancerTablet(server.CleverServer->GetRuntime(), "rt3.dc1--topic1");
- Cerr << "Balancer killed\n";
+ DumpCounters("doRead(5)");
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 5);
+
+ Cerr << ">>>> Restart balancer" << Endl;
+ server.AnnoyingClient->RestartBalancerTablet(server.CleverServer->GetRuntime(), DEFAULT_TOPIC_NAME);
+ Cerr << ">>>> Balancer restarted" << Endl;
Sleep(TDuration::Seconds(5));
+ decompressor->StartFuncs({5, 6, 7, 8, 9});
+
+ DumpCounters("StartFuncs-2");
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 5);
+
for (ui32 i = 0; i < 4; ++i) {
doRead();
}
UNIT_ASSERT_VALUES_EQUAL(createEv, 2);
UNIT_ASSERT_VALUES_EQUAL(destroyEv, 2);
-
UNIT_ASSERT_VALUES_EQUAL(dataEv, 5);
- decompressor->StartFuncs({5, 6, 7, 8, 9});
+ DumpCounters("doRead(4)");
Sleep(TDuration::Seconds(5));
auto msg = reader->GetEvent(false, 1);
-
UNIT_ASSERT(!msg);
UNIT_ASSERT(!reader->WaitEvent().Wait(TDuration::Seconds(1)));
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), 0);
+
+ DumpCounters("End");
+ }
+
+ Y_UNIT_TEST(CheckDeleteTopic) {
+ NPersQueue::TTestServer server;
+ server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_READ_PROXY});
+ server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 2);
+
+ auto driver = server.AnnoyingClient->GetDriver();
+ auto decompressor = CreateThreadPoolExecutorWrapper(2);
+
+ NYdb::NPersQueue::TReadSessionSettings settings;
+ settings.ConsumerName("shared/user").AppendTopics(SHORT_TOPIC_NAME).ReadOriginal({"dc1"});
+ settings.DecompressionExecutor(decompressor);
+ auto reader = CreateReader(*driver, settings);
+
+ auto counters = reader->GetCounters();
+
+ auto DumpCounters = [&](const char *message) {
+ Cerr << "===== " << message << " =====" << Endl;
+ Cerr << "MessagesInflight: " << counters->MessagesInflight->Val() << Endl;
+ Cerr << "BytesInflightUncompressed: " << counters->BytesInflightUncompressed->Val() << Endl;
+ Cerr << "BytesInflightCompressed: " << counters->BytesInflightCompressed->Val() << Endl;
+ Cerr << "BytesInflightTotal: " << counters->BytesInflightTotal->Val() << Endl;
+ Cerr << "============" << Endl;
+ };
+
+ DumpCounters("CreateReader");
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0);
+
+ for (ui32 i = 0; i < 2; ++i) {
+ auto msg = reader->GetEvent(true, 1);
+ UNIT_ASSERT(msg);
+
+ Cerr << NYdb::NPersQueue::DebugString(*msg) << "\n";
+
+ auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg);
+
+ UNIT_ASSERT(ev);
+
+ ev->Confirm();
+ }
+
+
+ for (ui32 i = 0; i < 10; ++i) {
+ auto writer = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, TStringBuilder() << "source" << i);
+ bool res = writer->Write("valuevaluevalue", 1);
+ UNIT_ASSERT(res);
+ res = writer->Close(TDuration::Seconds(10));
+ UNIT_ASSERT(res);
+ }
+
+ DumpCounters("Write");
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 10);
+
+ ui32 createEv = 0, destroyEv = 0, dataEv = 0;
+ std::vector<ui32> gotDestroy{0, 0};
+
+ auto doRead = [&]() {
+ auto msg = reader->GetEvent(true, 1);
+ UNIT_ASSERT(msg);
+
+ Cerr << "Got message: " << NYdb::NPersQueue::DebugString(*msg) << "\n";
+
+
+ if (std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&*msg)) {
+ ++dataEv;
+ return;
+ }
+
+ auto ev1 = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent>(&*msg);
+ auto ev2 = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg);
+
+ UNIT_ASSERT(ev1 || ev2);
+
+ if (ev1) {
+ ++destroyEv;
+ UNIT_ASSERT(ev1->GetPartitionStream()->GetPartitionId() < 2);
+ gotDestroy[ev1->GetPartitionStream()->GetPartitionId()]++;
+ }
+ if (ev2) {
+ ev2->Confirm(ev2->GetEndOffset());
+ ++createEv;
+ UNIT_ASSERT(ev2->GetPartitionStream()->GetPartitionId() < 2);
+ UNIT_ASSERT_VALUES_EQUAL(gotDestroy[ev2->GetPartitionStream()->GetPartitionId()], 1);
+
+ }
+ };
+
+ decompressor->StartFuncs({0, 1, 2, 3, 4});
+
+ DumpCounters("StartFuncs-1");
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 10);
+
+ for (ui32 i = 0; i < 5; ++i) {
+ doRead();
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(dataEv, 5);
+
+ DumpCounters("doRead(5)");
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 5);
+
+ Cerr << ">>>> Delete topic" << Endl;
+ server.AnnoyingClient->DeleteTopic2(DEFAULT_TOPIC_NAME);
+ Cerr << ">>>> Topic deleted" << Endl;
+
+ Sleep(TDuration::Seconds(5));
+
+ doRead();
+ doRead();
+
+ //
+ // there should be 2 TPartitionStreamClosedEvent events in the queue
+ //
+ UNIT_ASSERT_VALUES_EQUAL(createEv, 0);
+ UNIT_ASSERT_VALUES_EQUAL(destroyEv, 2);
+ UNIT_ASSERT_VALUES_EQUAL(dataEv, 5);
+
+ decompressor->StartFuncs({5, 6, 7, 8, 9});
+
+ DumpCounters("StartFuncs-2");
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 5);
+
+ Sleep(TDuration::Seconds(5));
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), 0);
+
+ DumpCounters("End");
+ }
+
+ enum WhenTheTopicIsDeletedMode {
+ AFTER_WRITES,
+ AFTER_START_TASKS,
+ AFTER_DOREAD
+ };
+
+ void WhenTheTopicIsDeletedImpl(WhenTheTopicIsDeletedMode mode, i64 maxMemoryUsageSize, bool decompress, i64 decompressedSize, i64 compressedSize) {
+ NPersQueue::TTestServer server;
+ server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_READ_PROXY});
+ server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1);
+
+ auto driver = server.AnnoyingClient->GetDriver();
+ auto decompressor = CreateThreadPoolExecutorWrapper(2);
+
+ NYdb::NPersQueue::TReadSessionSettings settings;
+ settings.ConsumerName("shared/user").AppendTopics(SHORT_TOPIC_NAME).ReadOriginal({"dc1"});
+ settings.DecompressionExecutor(decompressor);
+ settings.MaxMemoryUsageBytes(maxMemoryUsageSize);
+ settings.Decompress(decompress);
+
+ auto reader = CreateReader(*driver, settings);
+ auto counters = reader->GetCounters();
+
+ auto DumpCounters = [&](const char *message) {
+ Cerr << "===== " << message << " =====" << Endl;
+ Cerr << "MessagesInflight: " << counters->MessagesInflight->Val() << Endl;
+ Cerr << "BytesInflightUncompressed: " << counters->BytesInflightUncompressed->Val() << Endl;
+ Cerr << "BytesInflightCompressed: " << counters->BytesInflightCompressed->Val() << Endl;
+ Cerr << "BytesInflightTotal: " << counters->BytesInflightTotal->Val() << Endl;
+ Cerr << "============" << Endl;
+ };
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0);
+
+ //
+ // there should be 1 TCreatePartitionStreamEvent events in the queue
+ //
+ {
+ auto msg = reader->GetEvent(true, 1);
+ UNIT_ASSERT(msg);
+
+ Cerr << ">>>> message: " << NYdb::NPersQueue::DebugString(*msg) << Endl;
+
+ auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg);
+ UNIT_ASSERT(ev);
+
+ ev->Confirm();
+ }
+
+ for (ui32 i = 0; i < 2; ++i) {
+ std::optional<TString> codec;
+ if (!decompress) {
+ codec = "raw";
+ }
+
+ auto writer = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, TStringBuilder() << "source" << i, {}, codec);
+
+ std::string message(decompressedSize, 'x');
+
+ bool res = writer->Write(message, 1);
+ UNIT_ASSERT(res);
+
+ res = writer->Close(TDuration::Seconds(10));
+ UNIT_ASSERT(res);
+ }
+
+ Sleep(TDuration::Seconds(1));
+
+ DumpCounters("write");
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), compressedSize);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), compressedSize);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), 0);
+
+ if (mode == AFTER_WRITES) {
+ Cerr << ">>>> Delete topic" << Endl;
+ server.AnnoyingClient->DeleteTopic2(DEFAULT_TOPIC_NAME);
+ Cerr << ">>>> Topic deleted" << Endl;
+
+ auto msg = reader->GetEvent(true, 1);
+ UNIT_ASSERT(msg);
+ UNIT_ASSERT(std::get_if<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent>(&*msg));
+
+ decompressor->RunAllTasks();
+ Sleep(TDuration::Seconds(1));
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), 0);
+
+ return;
+ }
+
+ ui32 dataEv = 0;
+
+ auto doRead = [&]() {
+ auto msg = reader->GetEvent(true, 1);
+ UNIT_ASSERT(msg);
+
+ if (!std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&*msg)) {
+ UNIT_FAIL("a TDataReceivedEvent event is expected");
+ }
+
+ ++dataEv;
+ };
+
+ decompressor->StartFuncs({0});
+ Sleep(TDuration::Seconds(1));
+
+ DumpCounters("task #0");
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), compressedSize + decompressedSize);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), compressedSize);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), decompressedSize);
+
+ if (mode == AFTER_START_TASKS) {
+ Cerr << ">>>> Delete topic" << Endl;
+ server.AnnoyingClient->DeleteTopic2(DEFAULT_TOPIC_NAME);
+ Cerr << ">>>> Topic deleted" << Endl;
+
+ auto msg = reader->GetEvent(true, 1);
+ UNIT_ASSERT(msg);
+ UNIT_ASSERT(std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&*msg));
+
+ msg = reader->GetEvent(true, 1);
+ UNIT_ASSERT(msg);
+ UNIT_ASSERT(std::get_if<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent>(&*msg));
+
+ decompressor->RunAllTasks();
+ Sleep(TDuration::Seconds(1));
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), 0);
+
+ return;
+ }
+
+ doRead();
+ Sleep(TDuration::Seconds(1));
+
+ UNIT_ASSERT_VALUES_EQUAL(dataEv, 1);
+
+ DumpCounters("read");
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), compressedSize);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), compressedSize);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), 0);
+
+ if (mode == AFTER_DOREAD) {
+ Cerr << ">>>> Delete topic" << Endl;
+ server.AnnoyingClient->DeleteTopic2(DEFAULT_TOPIC_NAME);
+ Cerr << ">>>> Topic deleted" << Endl;
+
+ auto msg = reader->GetEvent(true, 1);
+ UNIT_ASSERT(msg);
+ UNIT_ASSERT(std::get_if<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent>(&*msg));
+
+ decompressor->RunAllTasks();
+ Sleep(TDuration::Seconds(1));
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightUncompressed->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), 0);
+
+ return;
+ }
+
+ UNIT_FAIL("incorrect mode");
+ }
+
+ Y_UNIT_TEST(WhenTheTopicIsDeletedBeforeDataIsDecompressed_Compressed) {
+ WhenTheTopicIsDeletedImpl(AFTER_WRITES, 1_MB + 1, true, 1_MB - 1_KB, 1050);
+ }
+
+ Y_UNIT_TEST(WhenTheTopicIsDeletedAfterDecompressingTheData_Compressed) {
+ WhenTheTopicIsDeletedImpl(AFTER_START_TASKS, 1_MB + 1, true, 1_MB - 1_KB, 1050);
+ }
+
+ Y_UNIT_TEST(WhenTheTopicIsDeletedAfterReadingTheData_Compressed) {
+ WhenTheTopicIsDeletedImpl(AFTER_DOREAD, 1_MB + 1, true, 1_MB - 1_KB, 1050);
}
+
+ Y_UNIT_TEST(WhenTheTopicIsDeletedBeforeDataIsDecompressed_Uncompressed) {
+ WhenTheTopicIsDeletedImpl(AFTER_WRITES, 1_MB + 1, false, 1_MB - 1_KB, 1_MB - 1_KB);
+ }
+
+ Y_UNIT_TEST(WhenTheTopicIsDeletedAfterDecompressingTheData_Uncompressed) {
+ WhenTheTopicIsDeletedImpl(AFTER_START_TASKS, 1_MB + 1, false, 1_MB - 1_KB, 1_MB - 1_KB);
+ }
+
+ Y_UNIT_TEST(WhenTheTopicIsDeletedAfterReadingTheData_Uncompressed) {
+ WhenTheTopicIsDeletedImpl(AFTER_DOREAD, 1_MB + 1, false, 1_MB - 1_KB, 1_MB - 1_KB);
+ }
+
+ Y_UNIT_TEST(CheckDecompressionTasksWithoutSession) {
+ NPersQueue::TTestServer server;
+ server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_READ_PROXY});
+ server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1);
+
+ auto driver = server.AnnoyingClient->GetDriver();
+ auto decompressor = CreateThreadPoolExecutorWrapper(2);
+
+ NYdb::NPersQueue::TReadSessionSettings settings;
+ settings.ConsumerName("shared/user").AppendTopics(SHORT_TOPIC_NAME).ReadOriginal({"dc1"});
+ settings.DecompressionExecutor(decompressor);
+
+ auto reader = CreateReader(*driver, settings);
+ auto counters = reader->GetCounters();
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 0);
+ {
+ auto msg = reader->GetEvent(true, 1);
+ UNIT_ASSERT(msg);
+
+ auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg);
+ UNIT_ASSERT(ev);
+
+ ev->Confirm();
+ }
+
+ for (ui32 i = 0; i < 2; ++i) {
+ auto writer = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, TStringBuilder() << "source" << i);
+
+ bool res = writer->Write("abracadabra", 1);
+ UNIT_ASSERT(res);
+
+ res = writer->Close(TDuration::Seconds(10));
+ UNIT_ASSERT(res);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 2);
+
+ reader = nullptr;
+
+ decompressor->RunAllTasks();
+ Sleep(TDuration::Seconds(1));
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->MessagesInflight->Val(), 2);
+ }
Y_UNIT_TEST(TestWriteStat) {
auto testWriteStat = [](const TString& originallyProvidedConsumerName,
diff --git a/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
index f8177513507..336168c3690 100644
--- a/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
+++ b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
@@ -43,6 +43,17 @@ size_t FunctionExecutorWrapper::GetFuncsCount() const
}
}
+void FunctionExecutorWrapper::RunAllTasks()
+{
+ with_lock (Mutex) {
+ for (auto& func : Funcs) {
+ if (func) {
+ Executor->Post(std::move(func));
+ }
+ }
+ }
+}
+
TIntrusivePtr<FunctionExecutorWrapper> CreateThreadPoolExecutorWrapper(size_t threads)
{
return MakeIntrusive<FunctionExecutorWrapper>(NYdb::NPersQueue::CreateThreadPoolExecutor(threads));
diff --git a/ydb/services/persqueue_v1/ut/functions_executor_wrapper.h b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.h
index c895ee15d53..665d2d154f9 100644
--- a/ydb/services/persqueue_v1/ut/functions_executor_wrapper.h
+++ b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.h
@@ -20,6 +20,8 @@ public:
size_t GetFuncsCount() const;
+ void RunAllTasks();
+
private:
void DoStart() override;