summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <[email protected]>2023-06-15 15:07:28 +0300
committerivanmorozov <[email protected]>2023-06-15 15:07:28 +0300
commite85294245a1a7a68282ac41df190e4061a249a29 (patch)
tree7673a11c32a5855d1cc6d802afe4f1b3139a4475
parentc4f4c306e4d45da901caa1094ca630535fb7cbbd (diff)
optimize granules usage on scan
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp37
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common.cpp30
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common.h8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.cpp21
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.h47
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.h6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/abstract.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/default.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/default.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp12
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h10
-rw-r--r--ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp6
18 files changed, 112 insertions, 107 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 8d1f53df638..92a188a5a8d 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -142,25 +142,26 @@ private:
InFlightReadBytes += blobRange.Size;
ranges[blobRange.BlobId].emplace_back(blobRange);
}
- if (ranges.size()) {
- auto& externBlobs = ReadMetadataRanges[ReadMetadataIndex]->ExternBlobs;
- for (auto&& i : ranges) {
- bool fallback = externBlobs && externBlobs->contains(i.first);
- NBlobCache::TReadBlobRangeOptions readOpts{
- .CacheAfterRead = true,
- .ForceFallback = fallback,
- .IsBackgroud = false
- };
- ui32 size = 0;
- for (auto&& s : i.second) {
- size += s.Size;
- }
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " blobs request:" << i.first << "/" << i.second.size() << "/" << size
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
- Stats.RequestSent(i.second);
- Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(i.second), std::move(readOpts)));
+ if (!ranges.size()) {
+ return true;
+ }
+ auto& externBlobs = ReadMetadataRanges[ReadMetadataIndex]->ExternBlobs;
+ for (auto&& i : ranges) {
+ bool fallback = externBlobs && externBlobs->contains(i.first);
+ NBlobCache::TReadBlobRangeOptions readOpts{
+ .CacheAfterRead = true,
+ .ForceFallback = fallback,
+ .IsBackgroud = false
+ };
+ ui32 size = 0;
+ for (auto&& s : i.second) {
+ size += s.Size;
}
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
+ "Scan " << ScanActorId << " blobs request:" << i.first << "/" << i.second.size() << "/" << size
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ Stats.RequestSent(i.second);
+ Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(i.second), std::move(readOpts)));
}
return true;
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 34fe9d09952..aa574c533b9 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -855,7 +855,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
THashSet<ui64> excludedPortions;
for (const auto& portionInfo : changes->PortionsToDrop) {
ui64 portionId = portionInfo.Records.front().Portion;
- // Exclude portions that are used by in-flght reads/scans
+ // Exclude portions that are used by in-flight reads/scans
if (!InFlightReadsTracker.IsPortionUsed(portionId)) {
portionsCanBedropped.push_back(portionInfo);
} else {
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 7da2687974a..b1b6e6c9052 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -143,8 +143,8 @@ void TIndexedReadData::InitRead(ui32 inputBatch) {
portionsBytes += portionInfo.BlobsBytes();
Y_VERIFY_S(portionInfo.Records.size(), "ReadMeatadata: " << *ReadMetadata);
- NIndexedReader::TGranule& granule = GranulesContext->UpsertGranule(portionInfo.Records[0].Granule);
- granule.AddBatch(portionInfo);
+ NIndexedReader::TGranule::TPtr granule = GranulesContext->UpsertGranule(portionInfo.Records[0].Granule);
+ granule->AddBatch(portionInfo);
}
GranulesContext->PrepareForStart();
@@ -265,7 +265,7 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::
Y_VERIFY(GranulesContext);
auto& indexInfo = ReadMetadata->GetIndexInfo();
- std::vector<NIndexedReader::TGranule*> ready = GranulesContext->DetachReadyInOrder();
+ std::vector<NIndexedReader::TGranule::TPtr> ready = GranulesContext->DetachReadyInOrder();
std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> out;
out.reserve(ready.size() + 1);
diff --git a/ydb/core/tx/columnshard/engines/reader/common.cpp b/ydb/core/tx/columnshard/engines/reader/common.cpp
index 5e29cd8c9c1..46168595a26 100644
--- a/ydb/core/tx/columnshard/engines/reader/common.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/common.cpp
@@ -1,17 +1,17 @@
-#include "common.h"
-#include <util/string/builder.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-TString TBatchAddress::ToString() const {
- return TStringBuilder() << GranuleIdx << "," << BatchGranuleIdx;
-}
-
-TBatchAddress::TBatchAddress(const ui32 granuleIdx, const ui32 batchGranuleIdx)
- : GranuleIdx(granuleIdx)
+#include "common.h"
+#include <util/string/builder.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+TString TBatchAddress::ToString() const {
+ return TStringBuilder() << GranuleId << "," << BatchGranuleIdx;
+}
+
+TBatchAddress::TBatchAddress(const ui32 granuleId, const ui32 batchGranuleIdx)
+ : GranuleId(granuleId)
, BatchGranuleIdx(batchGranuleIdx)
-{
-
-}
-
+{
+
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/common.h b/ydb/core/tx/columnshard/engines/reader/common.h
index 85f684c482d..30ad233b2e2 100644
--- a/ydb/core/tx/columnshard/engines/reader/common.h
+++ b/ydb/core/tx/columnshard/engines/reader/common.h
@@ -7,15 +7,15 @@ namespace NKikimr::NOlap::NIndexedReader {
class TBatchAddress {
private:
- ui32 GranuleIdx = 0;
+ ui32 GranuleId = 0;
ui32 BatchGranuleIdx = 0;
public:
TString ToString() const;
- TBatchAddress(const ui32 granuleIdx, const ui32 batchGranuleIdx);
+ TBatchAddress(const ui32 granuleId, const ui32 batchGranuleIdx);
- ui32 GetGranuleIdx() const {
- return GranuleIdx;
+ ui32 GetGranuleId() const {
+ return GranuleId;
}
ui32 GetBatchGranuleIdx() const {
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
index 4eb488aee46..372efaaaf82 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
@@ -45,10 +45,13 @@ void TGranulesFillingContext::OnBatchReady(const NIndexedReader::TBatch& batchIn
return Owner.OnBatchReady(batchInfo, batch);
}
-NKikimr::NOlap::NIndexedReader::TBatch& TGranulesFillingContext::GetBatchInfo(const TBatchAddress& address) {
- Y_VERIFY(address.GetGranuleIdx() < GranulesStorage.size());
- auto& g = GranulesStorage[address.GetGranuleIdx()];
- return g.GetBatchInfo(address.GetBatchGranuleIdx());
+NIndexedReader::TBatch* TGranulesFillingContext::GetBatchInfo(const TBatchAddress& address) {
+ auto it = GranulesWaiting.find(address.GetGranuleId());
+ if (it == GranulesWaiting.end()) {
+ return nullptr;
+ } else {
+ return &it->second->GetBatchInfo(address.GetBatchGranuleIdx());
+ }
}
NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::GetTasksProcessor() const {
@@ -56,15 +59,15 @@ NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::Get
}
void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches) {
- for (auto&& g : GranulesStorage) {
+ for (auto&& [_, gPtr] : GranulesWaiting) {
if (!batches) {
- g.AddNotIndexedBatch(nullptr);
+ gPtr->AddNotIndexedBatch(nullptr);
} else {
- auto it = batches->find(g.GetGranuleId());
+ auto it = batches->find(gPtr->GetGranuleId());
if (it == batches->end()) {
- g.AddNotIndexedBatch(nullptr);
+ gPtr->AddNotIndexedBatch(nullptr);
} else {
- g.AddNotIndexedBatch(it->second);
+ gPtr->AddNotIndexedBatch(it->second);
}
batches->erase(it);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index d9f0297497d..5465ac6727d 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -17,10 +17,9 @@ private:
TReadMetadata::TConstPtr ReadMetadata;
const bool InternalReading = false;
TIndexedReadData& Owner;
- THashMap<ui64, NIndexedReader::TGranule*> GranulesToOut;
+ THashMap<ui64, NIndexedReader::TGranule::TPtr> GranulesToOut;
std::set<ui64> ReadyGranulesAccumulator;
- std::deque<NIndexedReader::TGranule> GranulesStorage;
- THashMap<ui64, NIndexedReader::TGranule*> GranulesUpserted;
+ THashMap<ui64, NIndexedReader::TGranule::TPtr> GranulesWaiting;
std::set<ui32> EarlyFilterColumns;
std::set<ui32> PostFilterColumns;
std::set<ui32> FilterStageColumns;
@@ -67,18 +66,18 @@ public:
NColumnShard::TDataTasksProcessorContainer GetTasksProcessor() const;
void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches);
- TBatch& GetBatchInfo(const TBatchAddress& address);
+ NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address);
void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch);
void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
- NIndexedReader::TGranule& GetGranuleVerified(const ui64 granuleId) {
- auto it = GranulesUpserted.find(granuleId);
- Y_VERIFY(it != GranulesUpserted.end());
- return *it->second;
+ TGranule::TPtr GetGranuleVerified(const ui64 granuleId) {
+ auto it = GranulesWaiting.find(granuleId);
+ Y_VERIFY(it != GranulesWaiting.end());
+ return it->second;
}
- bool IsInProgress() const { return GranulesStorage.size() > ReadyGranulesAccumulator.size(); }
+ bool IsInProgress() const { return GranulesWaiting.size(); }
void OnNewBatch(TBatch& batch) {
if (!InternalReading && PredictEmptyAfterFilter(batch.GetPortionInfo())) {
@@ -88,35 +87,33 @@ public:
}
}
- std::vector<TGranule*> DetachReadyInOrder() {
+ std::vector<TGranule::TPtr> DetachReadyInOrder() {
Y_VERIFY(SortingPolicy);
return SortingPolicy->DetachReadyGranules(GranulesToOut);
}
void Abort() {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort");
- for (auto&& i : GranulesStorage) {
- ReadyGranulesAccumulator.emplace(i.GetGranuleId());
- }
+ GranulesWaiting.clear();
AbortedFlag = true;
- Y_VERIFY(ReadyGranulesAccumulator.size() == GranulesStorage.size());
Y_VERIFY(!IsInProgress());
}
- TGranule& UpsertGranule(const ui64 granuleId) {
- auto itGranule = GranulesUpserted.find(granuleId);
- if (itGranule == GranulesUpserted.end()) {
- GranulesStorage.emplace_back(NIndexedReader::TGranule(granuleId, GranulesStorage.size(), *this));
- itGranule = GranulesUpserted.emplace(granuleId, &GranulesStorage.back()).first;
+ TGranule::TPtr UpsertGranule(const ui64 granuleId) {
+ auto itGranule = GranulesWaiting.find(granuleId);
+ if (itGranule == GranulesWaiting.end()) {
+ itGranule = GranulesWaiting.emplace(granuleId, std::make_shared<TGranule>(granuleId, *this)).first;
}
- return *itGranule->second;
+ return itGranule->second;
}
- void OnGranuleReady(TGranule& granule) {
- Y_VERIFY(GranulesToOut.emplace(granule.GetGranuleId(), &granule).second);
- Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag);
- GranulesInProcessing.erase(granule.GetGranuleId());
- BlobsSizeInProcessing -= granule.GetBlobsDataSize();
+ void OnGranuleReady(const ui64 granuleId) {
+ auto granule = GetGranuleVerified(granuleId);
+ Y_VERIFY(GranulesToOut.emplace(granule->GetGranuleId(), granule).second);
+ Y_VERIFY(ReadyGranulesAccumulator.emplace(granule->GetGranuleId()).second || AbortedFlag);
+ Y_VERIFY(GranulesWaiting.erase(granuleId));
+ GranulesInProcessing.erase(granule->GetGranuleId());
+ BlobsSizeInProcessing -= granule->GetBlobsDataSize();
Y_VERIFY(BlobsSizeInProcessing >= 0);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
index 7389d88286c..aaf404fd297 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -51,11 +51,13 @@ bool TAssembleFilter::DoExecuteImpl() {
}
bool TAssembleFilter::DoApply(TGranulesFillingContext& owner) const {
- TBatch& batch = owner.GetBatchInfo(BatchAddress);
Y_VERIFY(OriginalCount);
owner.GetCounters().OriginalRowsCount->Add(OriginalCount);
owner.GetCounters().AssembleFilterCount->Add(1);
- batch.InitFilter(Filter, FilteredBatch, OriginalCount, EarlyFilter);
+ TBatch* batch = owner.GetBatchInfo(BatchAddress);
+ if (batch) {
+ batch->InitFilter(Filter, FilteredBatch, OriginalCount, EarlyFilter);
+ }
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index eeac416b75b..7f419648b29 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -37,7 +37,7 @@ NKikimr::NOlap::NIndexedReader::TBatch& TGranule::AddBatch(const TPortionInfo& p
Y_VERIFY(!ReadyFlag);
ui32 batchGranuleIdx = Batches.size();
WaitBatches.emplace(batchGranuleIdx);
- Batches.emplace_back(TBatch(TBatchAddress(GranuleIdx, batchGranuleIdx), *this, portionInfo));
+ Batches.emplace_back(TBatch(TBatchAddress(GranuleId, batchGranuleIdx), *this, portionInfo));
Y_VERIFY(GranuleBatchNumbers.emplace(batchGranuleIdx).second);
Owner->OnNewBatch(Batches.back());
return Batches.back();
@@ -117,7 +117,7 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) {
void TGranule::CheckReady() {
if (WaitBatches.empty() && NotIndexedBatchReadyFlag) {
ReadyFlag = true;
- Owner->OnGranuleReady(*this);
+ Owner->OnGranuleReady(GranuleId);
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index e8afb9436d0..e991e0126c6 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -12,9 +12,10 @@ namespace NKikimr::NOlap::NIndexedReader {
class TGranulesFillingContext;
class TGranule {
+public:
+ using TPtr = std::shared_ptr<TGranule>;
private:
ui64 GranuleId = 0;
- YDB_READONLY(ui64, GranuleIdx, 0);
bool NotIndexedBatchReadyFlag = false;
std::shared_ptr<arrow::RecordBatch> NotIndexedBatch;
@@ -31,9 +32,8 @@ private:
ui64 BlobsDataSize = 0;
void CheckReady();
public:
- TGranule(const ui64 granuleId, const ui64 granuleIdx, TGranulesFillingContext& owner)
+ TGranule(const ui64 granuleId, TGranulesFillingContext& owner)
: GranuleId(granuleId)
- , GranuleIdx(granuleIdx)
, Owner(&owner) {
}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h
index 14de62d85ea..43a322c36c1 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h
@@ -22,7 +22,7 @@ protected:
virtual bool DoWakeup(const TGranule& /*granule*/, TGranulesFillingContext& /*context*/) {
return true;
}
- virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) = 0;
+ virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(THashMap<ui64, TGranule::TPtr>& granulesToOut) = 0;
virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& /*granule*/, TGranulesFillingContext& context) {
OnBatchFilterInitialized(batchInfo, context);
return true;
@@ -67,7 +67,7 @@ public:
virtual bool ReadyForAddNotIndexedToEnd() const = 0;
- std::vector<TGranule*> DetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
+ std::vector<TGranule::TPtr> DetachReadyGranules(THashMap<ui64, TGranule::TPtr>& granulesToOut) {
return DoDetachReadyGranules(granulesToOut);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
index b31b499583d..95642e73a16 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
@@ -6,15 +6,15 @@ namespace NKikimr::NOlap::NIndexedReader {
void TAnySorting::DoFill(TGranulesFillingContext& context) {
auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted());
for (ui64 granule : granulesOrder) {
- TGranule& g = context.GetGranuleVerified(granule);
- GranulesOutOrder.emplace_back(&g);
+ TGranule::TPtr g = context.GetGranuleVerified(granule);
+ GranulesOutOrder.emplace_back(g);
}
}
-std::vector<TGranule*> TAnySorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
- std::vector<TGranule*> result;
+std::vector<TGranule::TPtr> TAnySorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule::TPtr>& granulesToOut) {
+ std::vector<TGranule::TPtr> result;
while (GranulesOutOrder.size()) {
- NIndexedReader::TGranule* granule = GranulesOutOrder.front();
+ NIndexedReader::TGranule::TPtr granule = GranulesOutOrder.front();
if (!granule->IsReady()) {
break;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/default.h b/ydb/core/tx/columnshard/engines/reader/order_control/default.h
index 507488e9a86..24ae84ea107 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/default.h
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/default.h
@@ -6,10 +6,10 @@ namespace NKikimr::NOlap::NIndexedReader {
class TAnySorting: public IOrderPolicy {
private:
using TBase = IOrderPolicy;
- std::deque<TGranule*> GranulesOutOrder;
+ std::deque<TGranule::TPtr> GranulesOutOrder;
protected:
virtual void DoFill(TGranulesFillingContext& context) override;
- virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
+ virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule::TPtr>& granulesToOut) override;
virtual TString DoDebugString() const override {
return TStringBuilder() << "type=AnySorting;granules_count=" << GranulesOutOrder.size() << ";";
}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
index 8d6cc168e52..24652ea6e36 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
@@ -2,8 +2,8 @@
namespace NKikimr::NOlap::NIndexedReader {
-std::vector<TGranule*> TNonSorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
- std::vector<TGranule*> result;
+std::vector<TGranule::TPtr> TNonSorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule::TPtr>& granulesToOut) {
+ std::vector<TGranule::TPtr> result;
result.reserve(granulesToOut.size());
for (auto&& i : granulesToOut) {
result.emplace_back(i.second);
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h
index c1c914044fb..b8c939bbcd1 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h
@@ -14,7 +14,7 @@ protected:
virtual void DoFill(TGranulesFillingContext& /*context*/) override {
}
- virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
+ virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule::TPtr>& granulesToOut) override;
public:
TNonSorting(TReadMetadata::TConstPtr readMetadata)
:TBase(readMetadata)
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
index fc2c48d46d5..b38708eb4a6 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
@@ -94,16 +94,16 @@ bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule&
void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) {
auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted());
for (ui64 granule : granulesOrder) {
- TGranule& g = context.GetGranuleVerified(granule);
- GranulesOutOrder.emplace_back(&g);
- GranulesOutOrderForPortions.emplace_back(g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata), &g);
+ TGranule::TPtr g = context.GetGranuleVerified(granule);
+ GranulesOutOrder.emplace_back(g);
+ GranulesOutOrderForPortions.emplace_back(g->SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata), g);
}
}
-std::vector<TGranule*> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
- std::vector<TGranule*> result;
+std::vector<TGranule::TPtr> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule::TPtr>& granulesToOut) {
+ std::vector<TGranule::TPtr> result;
while (GranulesOutOrder.size()) {
- NIndexedReader::TGranule* granule = GranulesOutOrder.front();
+ NIndexedReader::TGranule::TPtr granule = GranulesOutOrder.front();
if (!granule->IsReady()) {
break;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h
index 3f5d17d24cb..f3ba0c467f8 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h
@@ -8,7 +8,7 @@ class TGranuleOrdered {
private:
bool StartedFlag = false;
std::deque<TGranule::TBatchForMerge> OrderedBatches;
- TGranule* Granule = nullptr;
+ TGranule::TPtr Granule;
public:
bool Start() {
if (!StartedFlag) {
@@ -20,7 +20,7 @@ public:
}
- TGranuleOrdered(std::deque<TGranule::TBatchForMerge>&& orderedBatches, TGranule* granule)
+ TGranuleOrdered(std::deque<TGranule::TBatchForMerge>&& orderedBatches, TGranule::TPtr granule)
: OrderedBatches(std::move(orderedBatches))
, Granule(granule)
{
@@ -30,7 +30,7 @@ public:
return OrderedBatches;
}
- TGranule* GetGranule() const noexcept {
+ TGranule::TPtr GetGranule() const noexcept {
return Granule;
}
};
@@ -38,7 +38,7 @@ public:
class TPKSortingWithLimit: public IOrderPolicy {
private:
using TBase = IOrderPolicy;
- std::deque<TGranule*> GranulesOutOrder;
+ std::deque<TGranule::TPtr> GranulesOutOrder;
std::deque<TGranuleOrdered> GranulesOutOrderForPortions;
ui32 CurrentItemsLimit = 0;
THashMap<ui32, ui32> CountBatchesByPools;
@@ -51,7 +51,7 @@ private:
protected:
virtual bool DoWakeup(const TGranule& granule, TGranulesFillingContext& context) override;
virtual void DoFill(TGranulesFillingContext& context) override;
- virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
+ virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule::TPtr>& granulesToOut) override;
virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) override;
virtual TFeatures DoGetFeatures() const override {
return (TFeatures)EFeatures::CanInterrupt & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter;
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
index 38d7bfc7056..c48aa234633 100644
--- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
@@ -23,8 +23,10 @@ bool TAssembleBatch::DoExecuteImpl() {
}
bool TAssembleBatch::DoApply(TGranulesFillingContext& owner) const {
- TBatch& batch = owner.GetBatchInfo(BatchAddress);
- batch.InitBatch(FullBatch);
+ TBatch* batch = owner.GetBatchInfo(BatchAddress);
+ if (batch) {
+ batch->InitBatch(FullBatch);
+ }
return true;
}