summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author ivanmorozov <[email protected]> 2023-06-13 12:28:53 +0300
committer ivanmorozov <[email protected]> 2023-06-13 12:28:53 +0300
commit 8d5fbc6b3eefbde35de61a8c6592241133e72e6e (patch)
tree 7d1b7f00e189ae4d220ac1b2e71742f46a949211
parent c98fafc8f6303ddd9a82e2fdfef0083ee61622db (diff)
control memory usage for column shard scan mode
-rw-r--r-- ydb/core/tx/columnshard/columnshard__index_scan.cpp 6
-rw-r--r-- ydb/core/tx/columnshard/columnshard__index_scan.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.cpp 5
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/batch.cpp 1
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filling_context.cpp 6
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filling_context.h 16
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/granule.cpp 5
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/granule.h 7
8 files changed, 39 insertions, 9 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index 913b7056ce7..7205c63689d 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -66,7 +66,11 @@ NKikimr::NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() {
}
NKikimr::NColumnShard::TBlobRange TColumnShardScanIterator::GetNextBlobToRead() {
- return FetchBlobsQueue.pop_front();
+ if (IndexedData.GetGranulesContext().CanProcessMore()) {
+ return FetchBlobsQueue.pop_front();
+ } else {
+ return NKikimr::NColumnShard::TBlobRange();
+ }
}
void TColumnShardScanIterator::FillReadyResults() {
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index 3450928dfa5..faa5e58fe38 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -27,7 +27,7 @@ public:
using NOlap::TUnifiedBlobId;
using NOlap::TBlobRange;
-class TColumnShardScanIterator : public TScanIteratorBase {
+class TColumnShardScanIterator: public TScanIteratorBase {
private:
NOlap::TReadMetadata::TConstPtr ReadMetadata;
NOlap::TFetchBlobsQueue FetchBlobsQueue;
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 223a581a97f..7da2687974a 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -288,13 +288,8 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::
Y_VERIFY(batch->num_rows());
Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortReplaceDescription->ReplaceKey));
}
-#if 1 // optimization
auto deduped = SpecialMergeSorted(inGranule, indexInfo, SortReplaceDescription, granule->GetBatchesToDedup());
out.emplace_back(std::move(deduped));
-#else
- out.push_back({});
- out.back().emplace_back(CombineSortedBatches(inGranule, SortReplaceDescription));
-#endif
} else {
out.emplace_back(std::move(inGranule));
}
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index d79bb9f7da7..abf4af046c4 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -164,6 +164,7 @@ bool TBatch::AddIndexedReady(const TBlobRange& bRange, const TString& blobData)
WaitingBytes -= bRange.Size;
FetchedBytes += bRange.Size;
Data.emplace(bRange, TPortionInfo::TAssembleBlobInfo(blobData));
+ Owner->OnBlobReady(bRange);
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
index 2b7cb49bbed..4eb488aee46 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
@@ -71,4 +71,10 @@ void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_
}
}
+bool TGranulesFillingContext::CanProcessMore() const {
+ return GranulesInProcessing.size() <= GranulesCountProcessingLimit ||
+ BlobsSizeInProcessing <= ProcessingBytesLimit
+ ;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index 1ee7bd46b90..0cc3f24eab2 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -27,12 +27,23 @@ private:
std::set<ui32> UsedColumns;
IOrderPolicy::TPtr SortingPolicy;
NColumnShard::TScanCounters Counters;
-
+ std::set<ui64> GranulesInProcessing;
+ i64 BlobsSizeInProcessing = 0;
bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const;
+ static constexpr ui32 GranulesCountProcessingLimit = 16;
+ static constexpr ui64 ExpectedBytesForGranule = 200 * 1024 * 1024;
+ static constexpr i64 ProcessingBytesLimit = GranulesCountProcessingLimit * ExpectedBytesForGranule;
public:
TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading);
+ bool CanProcessMore() const;
+
+ void OnBlobReady(const ui64 granuleId, const TBlobRange& range) noexcept {
+ GranulesInProcessing.emplace(granuleId);
+ BlobsSizeInProcessing += range.Size;
+ }
+
TReadMetadata::TConstPtr GetReadMetadata() const noexcept {
return ReadMetadata;
}
@@ -104,6 +115,9 @@ public:
void OnGranuleReady(TGranule& granule) {
Y_VERIFY(GranulesToOut.emplace(granule.GetGranuleId(), &granule).second);
Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag);
+ Y_VERIFY(GranulesInProcessing.erase(granule.GetGranuleId()));
+ BlobsSizeInProcessing -= granule.GetBlobsDataSize();
+ Y_VERIFY(BlobsSizeInProcessing >= 0);
}
void Wakeup(TGranule& granule) {
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index e970eb52677..dd1074db486 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -120,4 +120,9 @@ void TGranule::CheckReady() {
}
}
+void TGranule::OnBlobReady(const TBlobRange& range) noexcept {
+ BlobsDataSize += range.Size;
+ Owner->OnBlobReady(GranuleId, range);
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index 63657a469d0..e8afb9436d0 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -28,7 +28,7 @@ private:
std::set<ui32> GranuleBatchNumbers;
TGranulesFillingContext* Owner = nullptr;
THashSet<const void*> BatchesToDedup;
-
+ ui64 BlobsDataSize = 0;
void CheckReady();
public:
TGranule(const ui64 granuleId, const ui64 granuleIdx, TGranulesFillingContext& owner)
@@ -37,6 +37,10 @@ public:
, Owner(&owner) {
}
+ ui64 GetBlobsDataSize() const noexcept {
+ return BlobsDataSize;
+ }
+
ui64 GetGranuleId() const noexcept {
return GranuleId;
}
@@ -124,6 +128,7 @@ public:
const std::set<ui32>& GetEarlyFilterColumns() const;
void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
+ void OnBlobReady(const TBlobRange& range) noexcept;
bool OnFilterReady(TBatch& batchInfo);
TBatch& AddBatch(const TPortionInfo& portionInfo);
void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const;