diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2024-11-20 17:02:24 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-20 17:02:24 +0300 |
commit | 1c150358435d98e771c8998fde124bcabe04cb3f (patch) | |
tree | c325f5578f0d9e1a10ea3e3c8b80893e9305e53a | |
parent | 02fbb27671fe11ce5f18fae394be48715f00b95d (diff) | |
download | ydb-1c150358435d98e771c8998fde124bcabe04cb3f.tar.gz |
accessors memory control for scan (#11792)
5 files changed, 38 insertions, 14 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/columns_set.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/columns_set.h index 23514aff1d..a0a3df00d0 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/columns_set.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/columns_set.h @@ -15,9 +15,10 @@ enum class EMemType { }; enum class EStageFeaturesIndexes { - Filter = 0, - Fetching = 1, - Merge = 2 + Accessors = 0, + Filter = 1, + Fetching = 2, + Merge = 3 }; class TIndexesSet { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp index 28d31123eb..5e7be055cb 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp @@ -27,7 +27,10 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(con if (source->NeedAccessorsFetching()) { if (!AskAccumulatorsScript) { AskAccumulatorsScript = std::make_shared<TFetchingScript>(*this); - AskAccumulatorsScript->AddStep(std::make_shared<TPortionAccessorFetchingStep>()); + if (ui64 size = source->PredictAccessorsMemory()) { + AskAccumulatorsScript->AddStep<TAllocateMemoryStep>(size, EStageFeaturesIndexes::Accessors); + } + AskAccumulatorsScript->AddStep<TPortionAccessorFetchingStep>(); } AskAccumulatorsScript->AddStep<TDetectInMem>(*FFColumns); return AskAccumulatorsScript; @@ -54,8 +57,8 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(con } } { - auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0] - [useIndexes ? 1 : 0][needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0]; + auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0] + [needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0]; if (!result) { TGuard<TMutex> wg(Mutex); result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0] @@ -274,11 +277,11 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext) : CommonContext(commonContext) { - ReadMetadata = dynamic_pointer_cast<const TReadMetadata>(CommonContext->GetReadMetadata()); Y_ABORT_UNLESS(ReadMetadata); Y_ABORT_UNLESS(ReadMetadata->SelectInfo); + double kffAccessors = 0.01; double kffFilter = 0.45; double kffFetching = 0.45; double kffMerge = 0.10; @@ -287,15 +290,19 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co stagePrefix = "EF"; kffFilter = 0.7; kffFetching = 0.15; - kffMerge = 0.15; + kffMerge = 0.14; + kffAccessors = 0.01; } else { stagePrefix = "FO"; kffFilter = 0.1; kffFetching = 0.75; - kffMerge = 0.15; + kffMerge = 0.14; + kffAccessors = 0.01; } - std::vector<std::shared_ptr<NGroupedMemoryManager::TStageFeatures>> stages = { + std::vector<std::shared_ptr<NGroupedMemoryManager::TStageFeatures>> stages = { + NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( + stagePrefix + "::ACCESSORS", kffAccessors * TGlobalLimits::ScanMemoryLimit), NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( stagePrefix + "::FILTER", kffFilter * TGlobalLimits::ScanMemoryLimit), NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( @@ -304,8 +311,8 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co }; ProcessMemoryGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages); - ProcessScopeGuard = - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId()); + ProcessScopeGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard( + CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId()); auto readSchema = ReadMetadata->GetResultSchema(); SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp index f09e0e5b9a..fbeaba7f09 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp @@ -200,7 +200,7 @@ void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(cons TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace( const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const { - ui64 size = 0; + ui64 size = PredefinedSize.value_or(0); for (auto&& i : Packs) { ui32 sizeLocal = source->GetColumnsVolume(i.GetColumns().GetColumnIds(), i.GetMemType()); if (source->GetStageData().GetUseFilter() && source->GetContext()->GetReadMetadata()->Limit && i.GetMemType() != EMemType::Blob) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h index f9e5774c4a..f630c36eba 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h @@ -227,6 +227,7 @@ private: std::vector<TColumnsPack> Packs; THashMap<ui32, THashSet<EMemType>> Control; const EStageFeaturesIndexes StageIndex; + std::optional<ui64> PredefinedSize; protected: class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation { @@ -238,6 +239,7 @@ protected: virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard, const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override; virtual void DoOnAllocationImpossible(const TString& errorMessage) override; + public: TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step); }; @@ -266,6 +268,12 @@ public: , StageIndex(stageIndex) { AddAllocation(columns, memType); } + + TAllocateMemoryStep(const ui64 size, const EStageFeaturesIndexes stageIndex) + : TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex)) + , StageIndex(stageIndex) + , PredefinedSize(size) { + } }; class TDetectInMemStep: public IFetchingStep { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h index 563dab4477..fe164bd212 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h @@ -77,7 +77,7 @@ protected: public: virtual bool NeedAccessorsForRead() const = 0; virtual bool NeedAccessorsFetching() const = 0; - + virtual ui64 PredictAccessorsMemory() const = 0; bool StartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) { return DoStartFetchingAccessor(sourcePtr, step); } @@ -318,6 +318,10 @@ private: virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) override; public: + virtual ui64 PredictAccessorsMemory() const override { + return Portion->GetApproxChunksCount(GetContext()->GetCommonContext()->GetReadMetadata()->GetResultSchema()->GetColumnsCount()) * sizeof(TColumnRecord); + } + virtual bool NeedAccessorsForRead() const override { return true; } @@ -430,6 +434,10 @@ private: } public: + virtual ui64 PredictAccessorsMemory() const override { + return 0; + } + virtual bool NeedAccessorsForRead() const override { return false; } |