diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-06-20 12:07:52 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-06-20 12:07:52 +0300 |
commit | 8cff0d14b260196f30f555b3a713cc691244ad1c (patch) | |
tree | 5ba14bc42b8b382626e4ace0165dc51adf64760a | |
parent | 260dfc6bf9796ddc322e117e730a624f2418a767 (diff) | |
download | ydb-8cff0d14b260196f30f555b3a713cc691244ad1c.tar.gz |
Don't read from PQ when we have zero free space
-rw-r--r-- | ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp | 29 |
1 files changed, 19 insertions, 10 deletions
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index e415a2f75c..5785aa909a 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -288,17 +288,22 @@ private: return usedSpace; } - auto events = GetReadSession().GetEvents(false, TMaybe<size_t>(), static_cast<size_t>(Max<i64>(freeSpace, 0))); + bool recheckBatch = false; - ui32 batchItemsEstimatedCount = 0; - for (auto& event : events) { - if (const auto* val = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&event)) { - batchItemsEstimatedCount += val->GetMessages().size(); + if (freeSpace > 0) { + auto events = GetReadSession().GetEvents(false, TMaybe<size_t>(), static_cast<size_t>(freeSpace)); + recheckBatch = !events.empty(); + + ui32 batchItemsEstimatedCount = 0; + for (auto& event : events) { + if (const auto* val = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&event)) { + batchItemsEstimatedCount += val->GetMessages().size(); + } } - } - for (auto& event : events) { - std::visit(TPQEventProcessor{*this, batchItemsEstimatedCount, LogPrefix}, event); + for (auto& event : events) { + std::visit(TPQEventProcessor{*this, batchItemsEstimatedCount, LogPrefix}, event); + } } if (WatermarkTracker) { @@ -308,11 +313,15 @@ private: const auto t = watermark; SRC_LOG_T("Fake watermark " << t << " was produced"); PushWatermarkToReady(*watermark); + recheckBatch = true; } } - if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) { - return usedSpace; + if (recheckBatch) { + usedSpace = 0; + if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) { + return usedSpace; + } } watermark = Nothing(); |