diff options
author | ivanmorozov <[email protected]> | 2023-06-13 12:28:53 +0300 |
---|---|---|
committer | ivanmorozov <[email protected]> | 2023-06-13 12:28:53 +0300 |
commit | 8d5fbc6b3eefbde35de61a8c6592241133e72e6e (patch) | |
tree | 7d1b7f00e189ae4d220ac1b2e71742f46a949211 | |
parent | c98fafc8f6303ddd9a82e2fdfef0083ee61622db (diff) |
control memory usage for column shard scan mode
8 files changed, 39 insertions, 9 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index 913b7056ce7..7205c63689d 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -66,7 +66,11 @@ NKikimr::NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() { } NKikimr::NColumnShard::TBlobRange TColumnShardScanIterator::GetNextBlobToRead() { - return FetchBlobsQueue.pop_front(); + if (IndexedData.GetGranulesContext().CanProcessMore()) { + return FetchBlobsQueue.pop_front(); + } else { + return NKikimr::NColumnShard::TBlobRange(); + } } void TColumnShardScanIterator::FillReadyResults() { diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h index 3450928dfa5..faa5e58fe38 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.h +++ b/ydb/core/tx/columnshard/columnshard__index_scan.h @@ -27,7 +27,7 @@ public: using NOlap::TUnifiedBlobId; using NOlap::TBlobRange; -class TColumnShardScanIterator : public TScanIteratorBase { +class TColumnShardScanIterator: public TScanIteratorBase { private: NOlap::TReadMetadata::TConstPtr ReadMetadata; NOlap::TFetchBlobsQueue FetchBlobsQueue; diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 223a581a97f..7da2687974a 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -288,13 +288,8 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData:: Y_VERIFY(batch->num_rows()); Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortReplaceDescription->ReplaceKey)); } -#if 1 // optimization auto deduped = SpecialMergeSorted(inGranule, indexInfo, SortReplaceDescription, granule->GetBatchesToDedup()); out.emplace_back(std::move(deduped)); -#else - out.push_back({}); - out.back().emplace_back(CombineSortedBatches(inGranule, SortReplaceDescription)); -#endif } else { out.emplace_back(std::move(inGranule)); } diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index d79bb9f7da7..abf4af046c4 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -164,6 +164,7 @@ bool TBatch::AddIndexedReady(const TBlobRange& bRange, const TString& blobData) WaitingBytes -= bRange.Size; FetchedBytes += bRange.Size; Data.emplace(bRange, TPortionInfo::TAssembleBlobInfo(blobData)); + Owner->OnBlobReady(bRange); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp index 2b7cb49bbed..4eb488aee46 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp @@ -71,4 +71,10 @@ void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_ } } +bool TGranulesFillingContext::CanProcessMore() const { + return GranulesInProcessing.size() <= GranulesCountProcessingLimit || + BlobsSizeInProcessing <= ProcessingBytesLimit + ; +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index 1ee7bd46b90..0cc3f24eab2 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -27,12 +27,23 @@ private: std::set<ui32> UsedColumns; IOrderPolicy::TPtr SortingPolicy; NColumnShard::TScanCounters Counters; - + std::set<ui64> GranulesInProcessing; + i64 BlobsSizeInProcessing = 0; bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const; + static constexpr ui32 GranulesCountProcessingLimit = 16; + static constexpr ui64 ExpectedBytesForGranule = 200 * 1024 * 1024; + static constexpr i64 ProcessingBytesLimit = GranulesCountProcessingLimit * ExpectedBytesForGranule; public: TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading); + bool CanProcessMore() const; + + void OnBlobReady(const ui64 granuleId, const TBlobRange& range) noexcept { + GranulesInProcessing.emplace(granuleId); + BlobsSizeInProcessing += range.Size; + } + TReadMetadata::TConstPtr GetReadMetadata() const noexcept { return ReadMetadata; } @@ -104,6 +115,9 @@ public: void OnGranuleReady(TGranule& granule) { Y_VERIFY(GranulesToOut.emplace(granule.GetGranuleId(), &granule).second); Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag); + Y_VERIFY(GranulesInProcessing.erase(granule.GetGranuleId())); + BlobsSizeInProcessing -= granule.GetBlobsDataSize(); + Y_VERIFY(BlobsSizeInProcessing >= 0); } void Wakeup(TGranule& granule) { diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp index e970eb52677..dd1074db486 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp @@ -120,4 +120,9 @@ void TGranule::CheckReady() { } } +void TGranule::OnBlobReady(const TBlobRange& range) noexcept { + BlobsDataSize += range.Size; + Owner->OnBlobReady(GranuleId, range); +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h index 63657a469d0..e8afb9436d0 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.h +++ b/ydb/core/tx/columnshard/engines/reader/granule.h @@ -28,7 +28,7 @@ private: std::set<ui32> GranuleBatchNumbers; TGranulesFillingContext* Owner = nullptr; THashSet<const void*> BatchesToDedup; - + ui64 BlobsDataSize = 0; void CheckReady(); public: TGranule(const ui64 granuleId, const ui64 granuleIdx, TGranulesFillingContext& owner) @@ -37,6 +37,10 @@ public: , Owner(&owner) { } + ui64 GetBlobsDataSize() const noexcept { + return BlobsDataSize; + } + ui64 GetGranuleId() const noexcept { return GranuleId; } @@ -124,6 +128,7 @@ public: const std::set<ui32>& GetEarlyFilterColumns() const; void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch); + void OnBlobReady(const TBlobRange& range) noexcept; bool OnFilterReady(TBatch& batchInfo); TBatch& AddBatch(const TPortionInfo& portionInfo); void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const; |