diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-19 16:13:22 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-19 16:13:22 +0300 |
commit | eab4a30d4e945a439121bf79f196e3a8465fbd26 (patch) | |
tree | 431941a447930b734fc0ef4ea8638dc62d7b3389 | |
parent | 9e147dc3c7f690495eaf8ac92fb39971f7c93e81 (diff) | |
download | ydb-eab4a30d4e945a439121bf79f196e3a8465fbd26.tar.gz |
fix case ready granule's size correction and result blobssize verification failed
-rw-r--r-- | ydb/core/tx/columnshard/engines/reader/granule.cpp | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp index 17b66ed984..8b7657f4e0 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp @@ -7,10 +7,8 @@ namespace NKikimr::NOlap::NIndexedReader { void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) { - if (Owner->GetSortingPolicy()->CanInterrupt()) { - if (ReadyFlag) { - return; - } + if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) { + return; } AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId) ("batch_address", batchInfo.GetBatchAddress().ToString())("count", WaitBatches.size()); @@ -94,6 +92,10 @@ std::deque<TGranule::TBatchForMerge> TGranule::SortBatchesByPK(const bool revers } void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) { + if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) { + return; + } + Y_VERIFY(!ReadyFlag); Y_VERIFY(!NotIndexedBatchReadyFlag || !batch); if (!NotIndexedBatchReadyFlag) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId)("batch_no", "add_not_indexed_batch")("count", WaitBatches.size()); @@ -122,6 +124,10 @@ void TGranule::CheckReady() { } void TGranule::OnBlobReady(const TBlobRange& range) noexcept { + if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) { + return; + } + Y_VERIFY(!ReadyFlag); BlobsDataSize += range.Size; Owner->OnBlobReady(GranuleId, range); } |