diff options
author | ivanmorozov <[email protected]> | 2023-06-15 15:07:28 +0300 |
---|---|---|
committer | ivanmorozov <[email protected]> | 2023-06-15 15:07:28 +0300 |
commit | e85294245a1a7a68282ac41df190e4061a249a29 (patch) | |
tree | 7673a11c32a5855d1cc6d802afe4f1b3139a4475 | |
parent | c4f4c306e4d45da901caa1094ca630535fb7cbbd (diff) |
optimize granules usage on scan
18 files changed, 112 insertions, 107 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 8d1f53df638..92a188a5a8d 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -142,25 +142,26 @@ private: InFlightReadBytes += blobRange.Size; ranges[blobRange.BlobId].emplace_back(blobRange); } - if (ranges.size()) { - auto& externBlobs = ReadMetadataRanges[ReadMetadataIndex]->ExternBlobs; - for (auto&& i : ranges) { - bool fallback = externBlobs && externBlobs->contains(i.first); - NBlobCache::TReadBlobRangeOptions readOpts{ - .CacheAfterRead = true, - .ForceFallback = fallback, - .IsBackgroud = false - }; - ui32 size = 0; - for (auto&& s : i.second) { - size += s.Size; - } - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " blobs request:" << i.first << "/" << i.second.size() << "/" << size - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); - Stats.RequestSent(i.second); - Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(i.second), std::move(readOpts))); + if (!ranges.size()) { + return true; + } + auto& externBlobs = ReadMetadataRanges[ReadMetadataIndex]->ExternBlobs; + for (auto&& i : ranges) { + bool fallback = externBlobs && externBlobs->contains(i.first); + NBlobCache::TReadBlobRangeOptions readOpts{ + .CacheAfterRead = true, + .ForceFallback = fallback, + .IsBackgroud = false + }; + ui32 size = 0; + for (auto&& s : i.second) { + size += s.Size; } + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, + "Scan " << ScanActorId << " blobs request:" << i.first << "/" << i.second.size() << "/" << size + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + Stats.RequestSent(i.second); + Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(i.second), std::move(readOpts))); } return true; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 34fe9d09952..aa574c533b9 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -855,7 +855,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { THashSet<ui64> excludedPortions; for (const auto& portionInfo : changes->PortionsToDrop) { ui64 portionId = portionInfo.Records.front().Portion; - // Exclude portions that are used by in-flght reads/scans + // Exclude portions that are used by in-flight reads/scans if (!InFlightReadsTracker.IsPortionUsed(portionId)) { portionsCanBedropped.push_back(portionInfo); } else { diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 7da2687974a..b1b6e6c9052 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -143,8 +143,8 @@ void TIndexedReadData::InitRead(ui32 inputBatch) { portionsBytes += portionInfo.BlobsBytes(); Y_VERIFY_S(portionInfo.Records.size(), "ReadMeatadata: " << *ReadMetadata); - NIndexedReader::TGranule& granule = GranulesContext->UpsertGranule(portionInfo.Records[0].Granule); - granule.AddBatch(portionInfo); + NIndexedReader::TGranule::TPtr granule = GranulesContext->UpsertGranule(portionInfo.Records[0].Granule); + granule->AddBatch(portionInfo); } GranulesContext->PrepareForStart(); @@ -265,7 +265,7 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData:: Y_VERIFY(GranulesContext); auto& indexInfo = ReadMetadata->GetIndexInfo(); - std::vector<NIndexedReader::TGranule*> ready = GranulesContext->DetachReadyInOrder(); + std::vector<NIndexedReader::TGranule::TPtr> ready = GranulesContext->DetachReadyInOrder(); std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> out; out.reserve(ready.size() + 1); diff --git a/ydb/core/tx/columnshard/engines/reader/common.cpp b/ydb/core/tx/columnshard/engines/reader/common.cpp index 5e29cd8c9c1..46168595a26 100644 --- a/ydb/core/tx/columnshard/engines/reader/common.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common.cpp @@ -1,17 +1,17 @@ -#include "common.h"
-#include <util/string/builder.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-TString TBatchAddress::ToString() const {
- return TStringBuilder() << GranuleIdx << "," << BatchGranuleIdx;
-}
-
-TBatchAddress::TBatchAddress(const ui32 granuleIdx, const ui32 batchGranuleIdx) - : GranuleIdx(granuleIdx)
+#include "common.h" +#include <util/string/builder.h> + +namespace NKikimr::NOlap::NIndexedReader { + +TString TBatchAddress::ToString() const { + return TStringBuilder() << GranuleId << "," << BatchGranuleIdx; +} + +TBatchAddress::TBatchAddress(const ui32 granuleId, const ui32 batchGranuleIdx) + : GranuleId(granuleId) , BatchGranuleIdx(batchGranuleIdx) -{
-
-}
-
+{ + +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/common.h b/ydb/core/tx/columnshard/engines/reader/common.h index 85f684c482d..30ad233b2e2 100644 --- a/ydb/core/tx/columnshard/engines/reader/common.h +++ b/ydb/core/tx/columnshard/engines/reader/common.h @@ -7,15 +7,15 @@ namespace NKikimr::NOlap::NIndexedReader { class TBatchAddress { private: - ui32 GranuleIdx = 0; + ui32 GranuleId = 0; ui32 BatchGranuleIdx = 0; public: TString ToString() const; - TBatchAddress(const ui32 granuleIdx, const ui32 batchGranuleIdx); + TBatchAddress(const ui32 granuleId, const ui32 batchGranuleIdx); - ui32 GetGranuleIdx() const { - return GranuleIdx; + ui32 GetGranuleId() const { + return GranuleId; } ui32 GetBatchGranuleIdx() const { diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp index 4eb488aee46..372efaaaf82 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp @@ -45,10 +45,13 @@ void TGranulesFillingContext::OnBatchReady(const NIndexedReader::TBatch& batchIn return Owner.OnBatchReady(batchInfo, batch); } -NKikimr::NOlap::NIndexedReader::TBatch& TGranulesFillingContext::GetBatchInfo(const TBatchAddress& address) { - Y_VERIFY(address.GetGranuleIdx() < GranulesStorage.size()); - auto& g = GranulesStorage[address.GetGranuleIdx()]; - return g.GetBatchInfo(address.GetBatchGranuleIdx()); +NIndexedReader::TBatch* TGranulesFillingContext::GetBatchInfo(const TBatchAddress& address) { + auto it = GranulesWaiting.find(address.GetGranuleId()); + if (it == GranulesWaiting.end()) { + return nullptr; + } else { + return &it->second->GetBatchInfo(address.GetBatchGranuleIdx()); + } } NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::GetTasksProcessor() const { @@ -56,15 +59,15 @@ NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::Get } void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches) { - for (auto&& g : GranulesStorage) { + for (auto&& [_, gPtr] : GranulesWaiting) { if (!batches) { - g.AddNotIndexedBatch(nullptr); + gPtr->AddNotIndexedBatch(nullptr); } else { - auto it = batches->find(g.GetGranuleId()); + auto it = batches->find(gPtr->GetGranuleId()); if (it == batches->end()) { - g.AddNotIndexedBatch(nullptr); + gPtr->AddNotIndexedBatch(nullptr); } else { - g.AddNotIndexedBatch(it->second); + gPtr->AddNotIndexedBatch(it->second); } batches->erase(it); } diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index d9f0297497d..5465ac6727d 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -17,10 +17,9 @@ private: TReadMetadata::TConstPtr ReadMetadata; const bool InternalReading = false; TIndexedReadData& Owner; - THashMap<ui64, NIndexedReader::TGranule*> GranulesToOut; + THashMap<ui64, NIndexedReader::TGranule::TPtr> GranulesToOut; std::set<ui64> ReadyGranulesAccumulator; - std::deque<NIndexedReader::TGranule> GranulesStorage; - THashMap<ui64, NIndexedReader::TGranule*> GranulesUpserted; + THashMap<ui64, NIndexedReader::TGranule::TPtr> GranulesWaiting; std::set<ui32> EarlyFilterColumns; std::set<ui32> PostFilterColumns; std::set<ui32> FilterStageColumns; @@ -67,18 +66,18 @@ public: NColumnShard::TDataTasksProcessorContainer GetTasksProcessor() const; void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches); - TBatch& GetBatchInfo(const TBatchAddress& address); + NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address); void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch); void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch); - NIndexedReader::TGranule& GetGranuleVerified(const ui64 granuleId) { - auto it = GranulesUpserted.find(granuleId); - Y_VERIFY(it != GranulesUpserted.end()); - return *it->second; + TGranule::TPtr GetGranuleVerified(const ui64 granuleId) { + auto it = GranulesWaiting.find(granuleId); + Y_VERIFY(it != GranulesWaiting.end()); + return it->second; } - bool IsInProgress() const { return GranulesStorage.size() > ReadyGranulesAccumulator.size(); } + bool IsInProgress() const { return GranulesWaiting.size(); } void OnNewBatch(TBatch& batch) { if (!InternalReading && PredictEmptyAfterFilter(batch.GetPortionInfo())) { @@ -88,35 +87,33 @@ public: } } - std::vector<TGranule*> DetachReadyInOrder() { + std::vector<TGranule::TPtr> DetachReadyInOrder() { Y_VERIFY(SortingPolicy); return SortingPolicy->DetachReadyGranules(GranulesToOut); } void Abort() { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort"); - for (auto&& i : GranulesStorage) { - ReadyGranulesAccumulator.emplace(i.GetGranuleId()); - } + GranulesWaiting.clear(); AbortedFlag = true; - Y_VERIFY(ReadyGranulesAccumulator.size() == GranulesStorage.size()); Y_VERIFY(!IsInProgress()); } - TGranule& UpsertGranule(const ui64 granuleId) { - auto itGranule = GranulesUpserted.find(granuleId); - if (itGranule == GranulesUpserted.end()) { - GranulesStorage.emplace_back(NIndexedReader::TGranule(granuleId, GranulesStorage.size(), *this)); - itGranule = GranulesUpserted.emplace(granuleId, &GranulesStorage.back()).first; + TGranule::TPtr UpsertGranule(const ui64 granuleId) { + auto itGranule = GranulesWaiting.find(granuleId); + if (itGranule == GranulesWaiting.end()) { + itGranule = GranulesWaiting.emplace(granuleId, std::make_shared<TGranule>(granuleId, *this)).first; } - return *itGranule->second; + return itGranule->second; } - void OnGranuleReady(TGranule& granule) { - Y_VERIFY(GranulesToOut.emplace(granule.GetGranuleId(), &granule).second); - Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag); - GranulesInProcessing.erase(granule.GetGranuleId()); - BlobsSizeInProcessing -= granule.GetBlobsDataSize(); + void OnGranuleReady(const ui64 granuleId) { + auto granule = GetGranuleVerified(granuleId); + Y_VERIFY(GranulesToOut.emplace(granule->GetGranuleId(), granule).second); + Y_VERIFY(ReadyGranulesAccumulator.emplace(granule->GetGranuleId()).second || AbortedFlag); + Y_VERIFY(GranulesWaiting.erase(granuleId)); + GranulesInProcessing.erase(granule->GetGranuleId()); + BlobsSizeInProcessing -= granule->GetBlobsDataSize(); Y_VERIFY(BlobsSizeInProcessing >= 0); } diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp index 7389d88286c..aaf404fd297 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp @@ -51,11 +51,13 @@ bool TAssembleFilter::DoExecuteImpl() { } bool TAssembleFilter::DoApply(TGranulesFillingContext& owner) const { - TBatch& batch = owner.GetBatchInfo(BatchAddress); Y_VERIFY(OriginalCount); owner.GetCounters().OriginalRowsCount->Add(OriginalCount); owner.GetCounters().AssembleFilterCount->Add(1); - batch.InitFilter(Filter, FilteredBatch, OriginalCount, EarlyFilter); + TBatch* batch = owner.GetBatchInfo(BatchAddress); + if (batch) { + batch->InitFilter(Filter, FilteredBatch, OriginalCount, EarlyFilter); + } return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp index eeac416b75b..7f419648b29 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp @@ -37,7 +37,7 @@ NKikimr::NOlap::NIndexedReader::TBatch& TGranule::AddBatch(const TPortionInfo& p Y_VERIFY(!ReadyFlag); ui32 batchGranuleIdx = Batches.size(); WaitBatches.emplace(batchGranuleIdx); - Batches.emplace_back(TBatch(TBatchAddress(GranuleIdx, batchGranuleIdx), *this, portionInfo)); + Batches.emplace_back(TBatch(TBatchAddress(GranuleId, batchGranuleIdx), *this, portionInfo)); Y_VERIFY(GranuleBatchNumbers.emplace(batchGranuleIdx).second); Owner->OnNewBatch(Batches.back()); return Batches.back(); @@ -117,7 +117,7 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) { void TGranule::CheckReady() { if (WaitBatches.empty() && NotIndexedBatchReadyFlag) { ReadyFlag = true; - Owner->OnGranuleReady(*this); + Owner->OnGranuleReady(GranuleId); } } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h index e8afb9436d0..e991e0126c6 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.h +++ b/ydb/core/tx/columnshard/engines/reader/granule.h @@ -12,9 +12,10 @@ namespace NKikimr::NOlap::NIndexedReader { class TGranulesFillingContext; class TGranule { +public: + using TPtr = std::shared_ptr<TGranule>; private: ui64 GranuleId = 0; - YDB_READONLY(ui64, GranuleIdx, 0); bool NotIndexedBatchReadyFlag = false; std::shared_ptr<arrow::RecordBatch> NotIndexedBatch; @@ -31,9 +32,8 @@ private: ui64 BlobsDataSize = 0; void CheckReady(); public: - TGranule(const ui64 granuleId, const ui64 granuleIdx, TGranulesFillingContext& owner) + TGranule(const ui64 granuleId, TGranulesFillingContext& owner) : GranuleId(granuleId) - , GranuleIdx(granuleIdx) , Owner(&owner) { } diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h index 14de62d85ea..43a322c36c1 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h +++ b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h @@ -22,7 +22,7 @@ protected: virtual bool DoWakeup(const TGranule& /*granule*/, TGranulesFillingContext& /*context*/) { return true; } - virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) = 0; + virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(THashMap<ui64, TGranule::TPtr>& granulesToOut) = 0; virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& /*granule*/, TGranulesFillingContext& context) { OnBatchFilterInitialized(batchInfo, context); return true; @@ -67,7 +67,7 @@ public: virtual bool ReadyForAddNotIndexedToEnd() const = 0; - std::vector<TGranule*> DetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) { + std::vector<TGranule::TPtr> DetachReadyGranules(THashMap<ui64, TGranule::TPtr>& granulesToOut) { return DoDetachReadyGranules(granulesToOut); } diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp index b31b499583d..95642e73a16 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp @@ -6,15 +6,15 @@ namespace NKikimr::NOlap::NIndexedReader { void TAnySorting::DoFill(TGranulesFillingContext& context) { auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted()); for (ui64 granule : granulesOrder) { - TGranule& g = context.GetGranuleVerified(granule); - GranulesOutOrder.emplace_back(&g); + TGranule::TPtr g = context.GetGranuleVerified(granule); + GranulesOutOrder.emplace_back(g); } } -std::vector<TGranule*> TAnySorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) { - std::vector<TGranule*> result; +std::vector<TGranule::TPtr> TAnySorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule::TPtr>& granulesToOut) { + std::vector<TGranule::TPtr> result; while (GranulesOutOrder.size()) { - NIndexedReader::TGranule* granule = GranulesOutOrder.front(); + NIndexedReader::TGranule::TPtr granule = GranulesOutOrder.front(); if (!granule->IsReady()) { break; } diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/default.h b/ydb/core/tx/columnshard/engines/reader/order_control/default.h index 507488e9a86..24ae84ea107 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/default.h +++ b/ydb/core/tx/columnshard/engines/reader/order_control/default.h @@ -6,10 +6,10 @@ namespace NKikimr::NOlap::NIndexedReader { class TAnySorting: public IOrderPolicy { private: using TBase = IOrderPolicy; - std::deque<TGranule*> GranulesOutOrder; + std::deque<TGranule::TPtr> GranulesOutOrder; protected: virtual void DoFill(TGranulesFillingContext& context) override; - virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override; + virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule::TPtr>& granulesToOut) override; virtual TString DoDebugString() const override { return TStringBuilder() << "type=AnySorting;granules_count=" << GranulesOutOrder.size() << ";"; } diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp index 8d6cc168e52..24652ea6e36 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp @@ -2,8 +2,8 @@ namespace NKikimr::NOlap::NIndexedReader { -std::vector<TGranule*> TNonSorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) { - std::vector<TGranule*> result; +std::vector<TGranule::TPtr> TNonSorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule::TPtr>& granulesToOut) { + std::vector<TGranule::TPtr> result; result.reserve(granulesToOut.size()); for (auto&& i : granulesToOut) { result.emplace_back(i.second); diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h index c1c914044fb..b8c939bbcd1 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h +++ b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h @@ -14,7 +14,7 @@ protected: virtual void DoFill(TGranulesFillingContext& /*context*/) override { } - virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override; + virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule::TPtr>& granulesToOut) override; public: TNonSorting(TReadMetadata::TConstPtr readMetadata) :TBase(readMetadata) diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp index fc2c48d46d5..b38708eb4a6 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp @@ -94,16 +94,16 @@ bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) { auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted()); for (ui64 granule : granulesOrder) { - TGranule& g = context.GetGranuleVerified(granule); - GranulesOutOrder.emplace_back(&g); - GranulesOutOrderForPortions.emplace_back(g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata), &g); + TGranule::TPtr g = context.GetGranuleVerified(granule); + GranulesOutOrder.emplace_back(g); + GranulesOutOrderForPortions.emplace_back(g->SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata), g); } } -std::vector<TGranule*> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) { - std::vector<TGranule*> result; +std::vector<TGranule::TPtr> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule::TPtr>& granulesToOut) { + std::vector<TGranule::TPtr> result; while (GranulesOutOrder.size()) { - NIndexedReader::TGranule* granule = GranulesOutOrder.front(); + NIndexedReader::TGranule::TPtr granule = GranulesOutOrder.front(); if (!granule->IsReady()) { break; } diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h index 3f5d17d24cb..f3ba0c467f8 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h +++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h @@ -8,7 +8,7 @@ class TGranuleOrdered { private: bool StartedFlag = false; std::deque<TGranule::TBatchForMerge> OrderedBatches; - TGranule* Granule = nullptr; + TGranule::TPtr Granule; public: bool Start() { if (!StartedFlag) { @@ -20,7 +20,7 @@ public: } - TGranuleOrdered(std::deque<TGranule::TBatchForMerge>&& orderedBatches, TGranule* granule) + TGranuleOrdered(std::deque<TGranule::TBatchForMerge>&& orderedBatches, TGranule::TPtr granule) : OrderedBatches(std::move(orderedBatches)) , Granule(granule) { @@ -30,7 +30,7 @@ public: return OrderedBatches; } - TGranule* GetGranule() const noexcept { + TGranule::TPtr GetGranule() const noexcept { return Granule; } }; @@ -38,7 +38,7 @@ public: class TPKSortingWithLimit: public IOrderPolicy { private: using TBase = IOrderPolicy; - std::deque<TGranule*> GranulesOutOrder; + std::deque<TGranule::TPtr> GranulesOutOrder; std::deque<TGranuleOrdered> GranulesOutOrderForPortions; ui32 CurrentItemsLimit = 0; THashMap<ui32, ui32> CountBatchesByPools; @@ -51,7 +51,7 @@ private: protected: virtual bool DoWakeup(const TGranule& granule, TGranulesFillingContext& context) override; virtual void DoFill(TGranulesFillingContext& context) override; - virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override; + virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule::TPtr>& granulesToOut) override; virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) override; virtual TFeatures DoGetFeatures() const override { return (TFeatures)EFeatures::CanInterrupt & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter; diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp index 38d7bfc7056..c48aa234633 100644 --- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp @@ -23,8 +23,10 @@ bool TAssembleBatch::DoExecuteImpl() { } bool TAssembleBatch::DoApply(TGranulesFillingContext& owner) const { - TBatch& batch = owner.GetBatchInfo(BatchAddress); - batch.InitBatch(FullBatch); + TBatch* batch = owner.GetBatchInfo(BatchAddress); + if (batch) { + batch->InitBatch(FullBatch); + } return true; } |