aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-06-20 12:07:52 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-06-20 12:07:52 +0300
commit8cff0d14b260196f30f555b3a713cc691244ad1c (patch)
tree5ba14bc42b8b382626e4ace0165dc51adf64760a
parent260dfc6bf9796ddc322e117e730a624f2418a767 (diff)
downloadydb-8cff0d14b260196f30f555b3a713cc691244ad1c.tar.gz
Don't read from PQ when we have zero free space
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp29
1 files changed, 19 insertions, 10 deletions
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
index e415a2f75c..5785aa909a 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
@@ -288,17 +288,22 @@ private:
return usedSpace;
}
- auto events = GetReadSession().GetEvents(false, TMaybe<size_t>(), static_cast<size_t>(Max<i64>(freeSpace, 0)));
+ bool recheckBatch = false;
- ui32 batchItemsEstimatedCount = 0;
- for (auto& event : events) {
- if (const auto* val = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&event)) {
- batchItemsEstimatedCount += val->GetMessages().size();
+ if (freeSpace > 0) {
+ auto events = GetReadSession().GetEvents(false, TMaybe<size_t>(), static_cast<size_t>(freeSpace));
+ recheckBatch = !events.empty();
+
+ ui32 batchItemsEstimatedCount = 0;
+ for (auto& event : events) {
+ if (const auto* val = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&event)) {
+ batchItemsEstimatedCount += val->GetMessages().size();
+ }
}
- }
- for (auto& event : events) {
- std::visit(TPQEventProcessor{*this, batchItemsEstimatedCount, LogPrefix}, event);
+ for (auto& event : events) {
+ std::visit(TPQEventProcessor{*this, batchItemsEstimatedCount, LogPrefix}, event);
+ }
}
if (WatermarkTracker) {
@@ -308,11 +313,15 @@ private:
const auto t = watermark;
SRC_LOG_T("Fake watermark " << t << " was produced");
PushWatermarkToReady(*watermark);
+ recheckBatch = true;
}
}
- if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) {
- return usedSpace;
+ if (recheckBatch) {
+ usedSpace = 0;
+ if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) {
+ return usedSpace;
+ }
}
watermark = Nothing();