aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-06-19 16:13:22 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-06-19 16:13:22 +0300
commiteab4a30d4e945a439121bf79f196e3a8465fbd26 (patch)
tree431941a447930b734fc0ef4ea8638dc62d7b3389
parent9e147dc3c7f690495eaf8ac92fb39971f7c93e81 (diff)
downloadydb-eab4a30d4e945a439121bf79f196e3a8465fbd26.tar.gz
fix case ready granule's size correction and result blobssize verification failed
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.cpp14
1 files changed, 10 insertions, 4 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index 17b66ed984..8b7657f4e0 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -7,10 +7,8 @@
namespace NKikimr::NOlap::NIndexedReader {
void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) {
- if (Owner->GetSortingPolicy()->CanInterrupt()) {
- if (ReadyFlag) {
- return;
- }
+ if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) {
+ return;
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId)
("batch_address", batchInfo.GetBatchAddress().ToString())("count", WaitBatches.size());
@@ -94,6 +92,10 @@ std::deque<TGranule::TBatchForMerge> TGranule::SortBatchesByPK(const bool revers
}
void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) {
+ if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) {
+ return;
+ }
+ Y_VERIFY(!ReadyFlag);
Y_VERIFY(!NotIndexedBatchReadyFlag || !batch);
if (!NotIndexedBatchReadyFlag) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId)("batch_no", "add_not_indexed_batch")("count", WaitBatches.size());
@@ -122,6 +124,10 @@ void TGranule::CheckReady() {
}
void TGranule::OnBlobReady(const TBlobRange& range) noexcept {
+ if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) {
+ return;
+ }
+ Y_VERIFY(!ReadyFlag);
BlobsDataSize += range.Size;
Owner->OnBlobReady(GranuleId, range);
}