aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2024-11-20 17:02:24 +0300
committerGitHub <noreply@github.com>2024-11-20 17:02:24 +0300
commit1c150358435d98e771c8998fde124bcabe04cb3f (patch)
treec325f5578f0d9e1a10ea3e3c8b80893e9305e53a
parent02fbb27671fe11ce5f18fae394be48715f00b95d (diff)
downloadydb-1c150358435d98e771c8998fde124bcabe04cb3f.tar.gz
accessors memory control for scan (#11792)
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/columns_set.h7
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp25
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h10
5 files changed, 38 insertions, 14 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/columns_set.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/columns_set.h
index 23514aff1d..a0a3df00d0 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/columns_set.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/columns_set.h
@@ -15,9 +15,10 @@ enum class EMemType {
};
enum class EStageFeaturesIndexes {
- Filter = 0,
- Fetching = 1,
- Merge = 2
+ Accessors = 0,
+ Filter = 1,
+ Fetching = 2,
+ Merge = 3
};
class TIndexesSet {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
index 28d31123eb..5e7be055cb 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
@@ -27,7 +27,10 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(con
if (source->NeedAccessorsFetching()) {
if (!AskAccumulatorsScript) {
AskAccumulatorsScript = std::make_shared<TFetchingScript>(*this);
- AskAccumulatorsScript->AddStep(std::make_shared<TPortionAccessorFetchingStep>());
+ if (ui64 size = source->PredictAccessorsMemory()) {
+ AskAccumulatorsScript->AddStep<TAllocateMemoryStep>(size, EStageFeaturesIndexes::Accessors);
+ }
+ AskAccumulatorsScript->AddStep<TPortionAccessorFetchingStep>();
}
AskAccumulatorsScript->AddStep<TDetectInMem>(*FFColumns);
return AskAccumulatorsScript;
@@ -54,8 +57,8 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(con
}
}
{
- auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0]
- [useIndexes ? 1 : 0][needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0];
+ auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
+ [needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0];
if (!result) {
TGuard<TMutex> wg(Mutex);
result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
@@ -274,11 +277,11 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext)
: CommonContext(commonContext) {
-
ReadMetadata = dynamic_pointer_cast<const TReadMetadata>(CommonContext->GetReadMetadata());
Y_ABORT_UNLESS(ReadMetadata);
Y_ABORT_UNLESS(ReadMetadata->SelectInfo);
+ double kffAccessors = 0.01;
double kffFilter = 0.45;
double kffFetching = 0.45;
double kffMerge = 0.10;
@@ -287,15 +290,19 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
stagePrefix = "EF";
kffFilter = 0.7;
kffFetching = 0.15;
- kffMerge = 0.15;
+ kffMerge = 0.14;
+ kffAccessors = 0.01;
} else {
stagePrefix = "FO";
kffFilter = 0.1;
kffFetching = 0.75;
- kffMerge = 0.15;
+ kffMerge = 0.14;
+ kffAccessors = 0.01;
}
- std::vector<std::shared_ptr<NGroupedMemoryManager::TStageFeatures>> stages = {
+ std::vector<std::shared_ptr<NGroupedMemoryManager::TStageFeatures>> stages = {
+ NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(
+ stagePrefix + "::ACCESSORS", kffAccessors * TGlobalLimits::ScanMemoryLimit),
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(
stagePrefix + "::FILTER", kffFilter * TGlobalLimits::ScanMemoryLimit),
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(
@@ -304,8 +311,8 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
};
ProcessMemoryGuard =
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages);
- ProcessScopeGuard =
- NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId());
+ ProcessScopeGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(
+ CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId());
auto readSchema = ReadMetadata->GetResultSchema();
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema);
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp
index f09e0e5b9a..fbeaba7f09 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp
@@ -200,7 +200,7 @@ void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(cons
TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace(
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
- ui64 size = 0;
+ ui64 size = PredefinedSize.value_or(0);
for (auto&& i : Packs) {
ui32 sizeLocal = source->GetColumnsVolume(i.GetColumns().GetColumnIds(), i.GetMemType());
if (source->GetStageData().GetUseFilter() && source->GetContext()->GetReadMetadata()->Limit && i.GetMemType() != EMemType::Blob) {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h
index f9e5774c4a..f630c36eba 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h
@@ -227,6 +227,7 @@ private:
std::vector<TColumnsPack> Packs;
THashMap<ui32, THashSet<EMemType>> Control;
const EStageFeaturesIndexes StageIndex;
+ std::optional<ui64> PredefinedSize;
protected:
class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation {
@@ -238,6 +239,7 @@ protected:
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
virtual void DoOnAllocationImpossible(const TString& errorMessage) override;
+
public:
TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step);
};
@@ -266,6 +268,12 @@ public:
, StageIndex(stageIndex) {
AddAllocation(columns, memType);
}
+
+ TAllocateMemoryStep(const ui64 size, const EStageFeaturesIndexes stageIndex)
+ : TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex))
+ , StageIndex(stageIndex)
+ , PredefinedSize(size) {
+ }
};
class TDetectInMemStep: public IFetchingStep {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h
index 563dab4477..fe164bd212 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h
@@ -77,7 +77,7 @@ protected:
public:
virtual bool NeedAccessorsForRead() const = 0;
virtual bool NeedAccessorsFetching() const = 0;
-
+ virtual ui64 PredictAccessorsMemory() const = 0;
bool StartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) {
return DoStartFetchingAccessor(sourcePtr, step);
}
@@ -318,6 +318,10 @@ private:
virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) override;
public:
+ virtual ui64 PredictAccessorsMemory() const override {
+ return Portion->GetApproxChunksCount(GetContext()->GetCommonContext()->GetReadMetadata()->GetResultSchema()->GetColumnsCount()) * sizeof(TColumnRecord);
+ }
+
virtual bool NeedAccessorsForRead() const override {
return true;
}
@@ -430,6 +434,10 @@ private:
}
public:
+ virtual ui64 PredictAccessorsMemory() const override {
+ return 0;
+ }
+
virtual bool NeedAccessorsForRead() const override {
return false;
}