diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-04-17 22:16:03 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-17 22:16:03 +0300 |
commit | 1f62eb9b72a10d093a2acd13c434a8a94fa695bc (patch) | |
tree | 620d5cb541e414e7526588efc7a28e3824cf504f | |
parent | 471d8acb121041385c1b6a34c20b8ed4ad37c9a2 (diff) | |
download | ydb-1f62eb9b72a10d093a2acd13c434a8a94fa695bc.tar.gz |
memory control for graph nodes with specialities (#17301)
24 files changed, 320 insertions, 102 deletions
diff --git a/ydb/core/formats/arrow/program/abstract.h b/ydb/core/formats/arrow/program/abstract.h index 9ca8a96f908..32e73482e96 100644 --- a/ydb/core/formats/arrow/program/abstract.h +++ b/ydb/core/formats/arrow/program/abstract.h @@ -13,6 +13,47 @@ class TAccessorsCollection; namespace NKikimr::NArrow::NSSA { +class IMemoryCalculationPolicy { +public: + enum class EStage { + Accessors = 0 /* "ACCESSORS" */, + Filter = 1 /* "FILTER" */, + Fetching = 2 /* "FETCHING" */, + Merge = 3 /* "MERGE" */ + }; + + virtual ~IMemoryCalculationPolicy() = default; + + virtual EStage GetStage() const = 0; + virtual ui64 GetReserveMemorySize( + const ui64 blobsSize, const ui64 rawSize, const std::optional<ui32> limit, const ui32 recordsCount) const = 0; +}; + +class TFilterCalculationPolicy: public IMemoryCalculationPolicy { +public: + virtual EStage GetStage() const override { + return EStage::Filter; + } + virtual ui64 GetReserveMemorySize( + const ui64 blobsSize, const ui64 /*rawSize*/, const std::optional<ui32> /*limit*/, const ui32 /*recordsCount*/) const override { + return blobsSize; + } +}; + +class TFetchingCalculationPolicy: public IMemoryCalculationPolicy { +public: + virtual EStage GetStage() const override { + return EStage::Fetching; + } + virtual ui64 GetReserveMemorySize(const ui64 blobsSize, const ui64 rawSize, const std::optional<ui32> limit, const ui32 recordsCount) const override { + if (limit) { + return std::max<ui64>(blobsSize, rawSize * (1.0 * *limit) / recordsCount); + } else { + return std::max<ui64>(blobsSize, rawSize); + } + } +}; + class TIndexCheckOperation { public: enum class EOperation : ui32 { @@ -214,7 +255,8 @@ enum class EProcessorType { AssembleOriginalData, CheckIndexData, CheckHeaderData, - StreamLogic + StreamLogic, + ReserveMemory }; class TFetchingInfo { @@ -303,6 +345,13 @@ public: Input.emplace_back(TColumnChainInfo(resourceId)); } + void AddOutput(const ui32 resourceId) { + for (auto&& i : Output) { + AFL_VERIFY(i.GetColumnId() != resourceId); + } + Output.emplace_back(TColumnChainInfo(resourceId)); + } + void RemoveInput(const ui32 resourceId) { for (ui32 idx = 0; idx < Input.size(); ++idx) { if (Input[idx].GetColumnId() == resourceId) { diff --git a/ydb/core/formats/arrow/program/execution.h b/ydb/core/formats/arrow/program/execution.h index 4be60989c84..48b454f052a 100644 --- a/ydb/core/formats/arrow/program/execution.h +++ b/ydb/core/formats/arrow/program/execution.h @@ -276,9 +276,23 @@ private: virtual TConclusion<bool> DoStartFetch( const NArrow::NSSA::TProcessorContext& context, const std::vector<std::shared_ptr<NArrow::NSSA::IFetchLogic>>& fetchers) = 0; + virtual TConclusion<bool> DoStartReserveMemory(const NArrow::NSSA::TProcessorContext& /*context*/, + const THashMap<ui32, IDataSource::TDataAddress>& /*columns*/, const THashMap<ui32, IDataSource::TFetchIndexContext>& /*indexes*/, + const THashMap<ui32, IDataSource::TFetchHeaderContext>& /*headers*/, + const std::shared_ptr<NArrow::NSSA::IMemoryCalculationPolicy>& /*policy*/) { + return false; + } + public: virtual ~IDataSource() = default; + TConclusion<bool> StartReserveMemory(const NArrow::NSSA::TProcessorContext& context, + const THashMap<ui32, IDataSource::TDataAddress>& columns, const THashMap<ui32, IDataSource::TFetchIndexContext>& indexes, + const THashMap<ui32, IDataSource::TFetchHeaderContext>& headers, const std::shared_ptr<NArrow::NSSA::IMemoryCalculationPolicy>& policy) { + AFL_VERIFY(policy); + return DoStartReserveMemory(context, columns, indexes, headers, policy); + } + TConclusion<bool> StartFetch( const NArrow::NSSA::TProcessorContext& context, const std::vector<std::shared_ptr<NArrow::NSSA::IFetchLogic>>& fetchers) { return DoStartFetch(context, fetchers); diff --git a/ydb/core/formats/arrow/program/graph_execute.cpp b/ydb/core/formats/arrow/program/graph_execute.cpp index 6d1a3ebf6be..8628efe7473 100644 --- a/ydb/core/formats/arrow/program/graph_execute.cpp +++ b/ydb/core/formats/arrow/program/graph_execute.cpp @@ -104,7 +104,7 @@ TCompiledGraph::TCompiledGraph(const NOptimization::TGraph& original, const ICol if (i.second->GetProcessor()->GetProcessorType() == EProcessorType::Filter) { AFL_VERIFY(!IsFilterRoot(i.second->GetIdentifier())); FilterRoot.emplace_back(i.second); - } else if (i.second->GetProcessor()->GetProcessorType() != EProcessorType::Const) { + } else if (i.second->GetProcessor()->GetProcessorType() == EProcessorType::Projection) { AFL_VERIFY(!ResultRoot)("debug", DebugDOT()); ResultRoot = i.second; } else { @@ -124,6 +124,9 @@ TCompiledGraph::TCompiledGraph(const NOptimization::TGraph& original, const ICol for (; it->IsValid(); it->Next()) { it->MutableCurrentNode().SetSequentialIdx(currentIndex); for (auto&& i : it->GetProcessorVerified()->GetInput()) { + if (!i.GetColumnId()) { + continue; + } if (resolver.HasColumn(i.GetColumnId())) { if (IsFilterRoot(it->GetCurrentGraphNode()->GetIdentifier())) { FilterColumns.emplace(i.GetColumnId()); @@ -133,6 +136,9 @@ TCompiledGraph::TCompiledGraph(const NOptimization::TGraph& original, const ICol usage[i.GetColumnId()].InUsage(currentIndex); } for (auto&& i : it->GetProcessorVerified()->GetOutput()) { + if (!i.GetColumnId()) { + continue; + } usage[i.GetColumnId()].Constructed(currentIndex); } sortedNodes.emplace_back(&it->MutableCurrentNode()); diff --git a/ydb/core/formats/arrow/program/graph_optimization.cpp b/ydb/core/formats/arrow/program/graph_optimization.cpp index 56f9cbbb51c..43e547b1797 100644 --- a/ydb/core/formats/arrow/program/graph_optimization.cpp +++ b/ydb/core/formats/arrow/program/graph_optimization.cpp @@ -5,6 +5,7 @@ #include "header.h" #include "index.h" #include "original.h" +#include "reserve.h" #include "stream_logic.h" #include <ydb/library/arrow_kernels/operations.h> @@ -206,7 +207,8 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) { } if (i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetDataAddresses().size() + i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetIndexContext().size() + - i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetHeaderContext().size() > 1) { + i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetHeaderContext().size() > + 1) { continue; } if (i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetDataAddresses().size()) { @@ -220,6 +222,7 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) { } } bool changed = false; + TGraphNode* nodeFetch = nullptr; if (dataAddresses.size() > 1) { THashSet<ui32> columnIds; for (auto&& i : dataAddresses) { @@ -231,16 +234,32 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) { proc->Add(addr.second); } } - auto nodeFetch = AddNode(proc); + nodeFetch = AddNode(proc).get(); FetchersMerged.emplace(nodeFetch->GetIdentifier()); for (auto&& i : dataAddresses) { for (auto&& to : i->GetOutputEdges()) { - AddEdge(nodeFetch.get(), to.second, to.first.GetResourceId()); + AddEdge(nodeFetch, to.second, to.first.GetResourceId()); } RemoveNode(i->GetIdentifier()); } changed = true; + } else if (dataAddresses.size() == 1) { + nodeFetch = dataAddresses.front(); } + if (nodeFetch) { + std::shared_ptr<IMemoryCalculationPolicy> policy; + if (baseNode->Is(EProcessorType::Filter)) { + policy = std::make_shared<TFilterCalculationPolicy>(); + } else if (baseNode->Is(EProcessorType::Projection)) { + policy = std::make_shared<TFetchingCalculationPolicy>(); + } + auto reserveMemory = std::make_shared<TReserveMemoryProcessor>(*nodeFetch->GetProcessorAs<TOriginalColumnDataProcessor>(), policy); + auto nodeReserve = AddNode(reserveMemory); + nodeReserve->GetProcessor()->AddOutput(0); + nodeFetch->GetProcessor()->AddInput(0); + AddEdge(nodeReserve.get(), nodeFetch, 0); + } + if (indexes.size() + headers.size() > 1) { THashSet<ui32> columnIds; for (auto&& i : indexes) { diff --git a/ydb/core/formats/arrow/program/reserve.cpp b/ydb/core/formats/arrow/program/reserve.cpp new file mode 100644 index 00000000000..f10c8760200 --- /dev/null +++ b/ydb/core/formats/arrow/program/reserve.cpp @@ -0,0 +1,6 @@ +#include "execution.h" +#include "original.h" + +namespace NKikimr::NArrow::NSSA { + +} // namespace NKikimr::NArrow::NSSA diff --git a/ydb/core/formats/arrow/program/reserve.h b/ydb/core/formats/arrow/program/reserve.h new file mode 100644 index 00000000000..0ecadb8b300 --- /dev/null +++ b/ydb/core/formats/arrow/program/reserve.h @@ -0,0 +1,74 @@ +#pragma once +#include "abstract.h" +#include "original.h" + +namespace NKikimr::NArrow::NSSA { + +class TReserveMemoryProcessor: public IResourceProcessor { +private: + using TBase = IResourceProcessor; + + THashMap<ui32, IDataSource::TDataAddress> DataAddresses; + THashMap<ui32, IDataSource::TFetchIndexContext> IndexContext; + THashMap<ui32, IDataSource::TFetchHeaderContext> HeaderContext; + std::shared_ptr<IMemoryCalculationPolicy> Policy; + + virtual NJson::TJsonValue DoDebugJson() const override { + NJson::TJsonValue result = NJson::JSON_MAP; + if (DataAddresses.size()) { + auto& arrAddr = result.InsertValue("data", NJson::JSON_ARRAY); + for (auto&& i : DataAddresses) { + arrAddr.AppendValue(i.second.DebugJson()); + } + } + if (IndexContext.size()) { + auto& indexesArr = result.InsertValue("indexes", NJson::JSON_ARRAY); + for (auto&& i : IndexContext) { + indexesArr.AppendValue(i.second.DebugJson()); + } + } + if (HeaderContext.size()) { + auto& headersArr = result.InsertValue("headers", NJson::JSON_ARRAY); + for (auto&& i : HeaderContext) { + headersArr.AppendValue(i.second.DebugJson()); + } + } + return result; + } + + virtual TConclusion<EExecutionResult> DoExecute(const TProcessorContext& context, const TExecutionNodeContext& /*nodeContext*/) const override { + auto source = context.GetDataSource().lock(); + if (!source) { + return TConclusionStatus::Fail("source was destroyed before (original fetch start)"); + } + auto conclusion = source->StartReserveMemory(context, DataAddresses, IndexContext, HeaderContext, Policy); + if (conclusion.IsFail()) { + return conclusion; + } else if (conclusion.GetResult()) { + return EExecutionResult::InBackground; + } else { + return EExecutionResult::Success; + } + } + + virtual bool IsAggregation() const override { + return false; + } + + virtual ui64 DoGetWeight() const override { + return 0; + } + +public: + TReserveMemoryProcessor(const TOriginalColumnDataProcessor& original, const std::shared_ptr<IMemoryCalculationPolicy>& policy) + : TBase({}, {}, EProcessorType::ReserveMemory) + , DataAddresses(original.GetDataAddresses()) + , IndexContext(original.GetIndexContext()) + , HeaderContext(original.GetHeaderContext()) + , Policy(policy) + { + AFL_VERIFY(policy); + } +}; + +} // namespace NKikimr::NArrow::NSSA diff --git a/ydb/core/formats/arrow/program/ya.make b/ydb/core/formats/arrow/program/ya.make index 114660d3251..720b0f0b318 100644 --- a/ydb/core/formats/arrow/program/ya.make +++ b/ydb/core/formats/arrow/program/ya.make @@ -48,6 +48,7 @@ SRCS( assign_internal.cpp custom_registry.cpp GLOBAL kernel_logic.cpp + reserve.cpp ) GENERATE_ENUM_SERIALIZATION(abstract.h) diff --git a/ydb/core/formats/arrow/ut/ut_program_step.cpp b/ydb/core/formats/arrow/ut/ut_program_step.cpp index 7a6c52b5c66..8f3f403a2af 100644 --- a/ydb/core/formats/arrow/ut/ut_program_step.cpp +++ b/ydb/core/formats/arrow/ut/ut_program_step.cpp @@ -594,7 +594,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) { builder.Add(std::make_shared<TProjectionProcessor>(TColumnChainInfo::BuildVector({ 1, 2 }))); auto chain = builder.Finish().DetachResult(); Cerr << chain->DebugDOT() << Endl; - AFL_VERIFY(chain->DebugStats() == "[TOTAL:Const:2;Calculation:4;Projection:1;Filter:1;FetchOriginalData:2;AssembleOriginalData:3;CheckIndexData:1;StreamLogic:1;];SUB:[AssembleOriginalData:1;];")("debug", chain->DebugStats()); + AFL_VERIFY(chain->DebugStats() == "[TOTAL:Const:2;Calculation:4;Projection:1;Filter:1;FetchOriginalData:2;AssembleOriginalData:3;CheckIndexData:1;StreamLogic:1;ReserveMemory:1;];SUB:[AssembleOriginalData:1;];")("debug", chain->DebugStats()); } Y_UNIT_TEST(Projection) { diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 5e520d81b03..f2b2db17e69 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -559,6 +559,8 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) { SetupLogLevelFromTestParam(NKikimrServices::TX_COLUMNSHARD); SetupLogLevelFromTestParam(NKikimrServices::TX_COLUMNSHARD_SCAN); SetupLogLevelFromTestParam(NKikimrServices::LOCAL_PGWIRE); + SetupLogLevelFromTestParam(NKikimrServices::SSA_GRAPH_EXECUTION); + RunCall([this, domain = settings.DomainRoot]{ this->Client->InitRootScheme(domain); @@ -1556,7 +1558,7 @@ NJson::TJsonValue SimplifyPlan(NJson::TJsonValue& opt, const TGetPlanParams& par opName.find("Join") != TString::npos || opName.find("Union") != TString::npos || (opName.find("Filter") != TString::npos && params.IncludeFilters) || - (opName.find("HashShuffle") != TString::npos && params.IncludeShuffles) + (opName.find("HashShuffle") != TString::npos && params.IncludeShuffles) ) { NJson::TJsonValue newChildren; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h index a2165252243..4a14d95727f 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h @@ -14,13 +14,6 @@ enum class EMemType { RawSequential }; -enum class EStageFeaturesIndexes { - Accessors = 0, - Filter = 1, - Fetching = 2, - Merge = 3 -}; - class TIndexesSet { private: YDB_READONLY_DEF(std::vector<ui32>, IndexIds); diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp index 6f9219bc9f1..3559e599b05 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp @@ -48,25 +48,28 @@ bool TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocated(std::shared_ptr guard->Release(); return false; } - if (StageIndex == EStageFeaturesIndexes::Accessors) { + if (StageIndex == NArrow::NSSA::IMemoryCalculationPolicy::EStage::Accessors) { data->MutableStageData().SetAccessorsGuard(std::move(guard)); } else { data->RegisterAllocationGuard(std::move(guard)); } - Step.Next(); + if (NeedNextStep) { + Step.Next(); + } FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, data->AddEvent("fmalloc")); auto task = std::make_shared<TStepAction>(data, std::move(Step), data->GetContext()->GetCommonContext()->GetScanActorId(), false); NConveyor::TScanServiceOperator::SendTaskToExecute(task, data->GetContext()->GetCommonContext()->GetConveyorProcessId()); return true; } -TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation( - const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step, const EStageFeaturesIndexes stageIndex) +TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, + const TFetchingScriptCursor& step, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stageIndex, const bool needNextStep) : TBase(mem) , Source(source) , Step(step) , TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) - , StageIndex(stageIndex) { + , StageIndex(stageIndex) + , NeedNextStep(needNextStep) { } void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) { @@ -75,6 +78,8 @@ void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(cons FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, sourcePtr->AddEvent("fail_malloc")); sourcePtr->GetContext()->GetCommonContext()->AbortWithError( "cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'"); + } else { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "allocation_impossible")("error", errorMessage); } } diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h index 0123678d75c..3b8abd40ea4 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h @@ -21,25 +21,10 @@ private: }; std::vector<TColumnsPack> Packs; THashMap<ui32, THashSet<EMemType>> Control; - const EStageFeaturesIndexes StageIndex; + const NArrow::NSSA::IMemoryCalculationPolicy::EStage StageIndex; const std::optional<ui64> PredefinedSize; protected: - class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation { - private: - using TBase = NGroupedMemoryManager::IAllocation; - std::weak_ptr<IDataSource> Source; - TFetchingScriptCursor Step; - NColumnShard::TCounterGuard TasksGuard; - const EStageFeaturesIndexes StageIndex; - virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard, - const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override; - virtual void DoOnAllocationImpossible(const TString& errorMessage) override; - - public: - TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step, - const EStageFeaturesIndexes stageIndex); - }; virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override; virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override; virtual TString DoDebugString() const override { @@ -53,6 +38,22 @@ protected: } public: + class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation { + private: + using TBase = NGroupedMemoryManager::IAllocation; + std::weak_ptr<IDataSource> Source; + TFetchingScriptCursor Step; + NColumnShard::TCounterGuard TasksGuard; + const NArrow::NSSA::IMemoryCalculationPolicy::EStage StageIndex; + const bool NeedNextStep; + virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard, + const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override; + virtual void DoOnAllocationImpossible(const TString& errorMessage) override; + + public: + TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step, + const NArrow::NSSA::IMemoryCalculationPolicy::EStage stageIndex, const bool needNextStep = true); + }; void AddAllocation(const TColumnsSetIds& ids, const EMemType memType) { if (!ids.GetColumnsCount()) { return; @@ -62,17 +63,17 @@ public: } Packs.emplace_back(ids, memType); } - EStageFeaturesIndexes GetStage() const { + NArrow::NSSA::IMemoryCalculationPolicy::EStage GetStage() const { return StageIndex; } - TAllocateMemoryStep(const TColumnsSetIds& columns, const EMemType memType, const EStageFeaturesIndexes stageIndex) + TAllocateMemoryStep(const TColumnsSetIds& columns, const EMemType memType, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stageIndex) : TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex)) , StageIndex(stageIndex) { AddAllocation(columns, memType); } - TAllocateMemoryStep(const ui64 memSize, const EStageFeaturesIndexes stageIndex) + TAllocateMemoryStep(const ui64 memSize, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stageIndex) : TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex)) , StageIndex(stageIndex) , PredefinedSize(memSize) { diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp index 4319b0f944e..b9a686530b9 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp @@ -27,7 +27,7 @@ TConclusionStatus TStepAction::DoExecuteImpl() { return TConclusionStatus::Success(); } auto executeResult = Cursor.Execute(Source); - if (!executeResult) { + if (executeResult.IsFail()) { return executeResult; } if (*executeResult) { @@ -114,7 +114,8 @@ TString TFetchingScript::ProfileDebugString() const { return sb; } -void TFetchingScriptBuilder::AddAllocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) { +void TFetchingScriptBuilder::AddAllocation( + const std::set<ui32>& entityIds, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stage, const EMemType mType) { if (Steps.size() == 0) { AddStep(std::make_shared<TAllocateMemoryStep>(entityIds, mType, stage)); } else { @@ -157,7 +158,7 @@ TFetchingScriptBuilder::TFetchingScriptBuilder(const TSpecialReadContext& contex : TFetchingScriptBuilder(context.GetReadMetadata()->GetResultSchema(), context.GetMergeColumns()) { } -void TFetchingScriptBuilder::AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) { +void TFetchingScriptBuilder::AddFetchingStep(const TColumnsSetIds& columns, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stage) { auto actualColumns = columns - AddedFetchingColumns; AddedFetchingColumns += columns; if (actualColumns.IsEmpty()) { @@ -175,7 +176,7 @@ void TFetchingScriptBuilder::AddFetchingStep(const TColumnsSetIds& columns, cons } void TFetchingScriptBuilder::AddAssembleStep( - const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential) { + const TColumnsSetIds& columns, const TString& purposeId, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stage, const bool sequential) { auto actualColumns = columns - AddedAssembleColumns; AddedAssembleColumns += columns; if (actualColumns.IsEmpty()) { diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h index ddf067c4132..e8ca12f666d 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h @@ -253,7 +253,7 @@ private: } private: - void AddAllocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType); + void AddAllocation(const std::set<ui32>& entityIds, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stage, const EMemType mType); template <class T, typename... Args> std::shared_ptr<T> InsertStep(const ui32 index, Args... args) { @@ -275,8 +275,9 @@ public: Steps.emplace_back(step); } - void AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage); - void AddAssembleStep(const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential); + void AddFetchingStep(const TColumnsSetIds& columns, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stage); + void AddAssembleStep(const TColumnsSetIds& columns, const TString& purposeId, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stage, + const bool sequential); static TFetchingScriptBuilder MakeForTests(ISnapshotSchema::TPtr schema, std::shared_ptr<TColumnsSetIds> guaranteeNotOptional = nullptr) { return TFetchingScriptBuilder(schema, guaranteeNotOptional ? guaranteeNotOptional : std::make_shared<TColumnsSetIds>()); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp index ae2deb2810e..2636db28693 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp @@ -30,7 +30,7 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c if (!AskAccumulatorsScript) { NCommon::TFetchingScriptBuilder acc(*this); if (ui64 size = source->PredictAccessorsMemory()) { - acc.AddStep(std::make_shared<NCommon::TAllocateMemoryStep>(size, EStageFeaturesIndexes::Accessors)); + acc.AddStep(std::make_shared<NCommon::TAllocateMemoryStep>(size, NArrow::NSSA::IMemoryCalculationPolicy::EStage::Accessors)); } acc.AddStep(std::make_shared<TPortionAccessorFetchingStep>()); acc.AddStep(std::make_shared<TDetectInMem>(*GetFFColumns())); @@ -90,25 +90,25 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c bool hasFilterSharding = false; if (needFilterSharding && !GetShardingColumns()->IsEmpty()) { hasFilterSharding = true; - acc.AddFetchingStep(*GetShardingColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetShardingColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); if (!exclusiveSource) { - acc.AddFetchingStep(*GetPKColumns(), EStageFeaturesIndexes::Filter); - acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetPKColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); + acc.AddFetchingStep(*GetSpecColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); } - acc.AddAssembleStep(acc.GetAddedFetchingColumns(), "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(acc.GetAddedFetchingColumns(), "SPEC_SHARDING", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false); acc.AddStep(std::make_shared<TShardingFilter>()); } if (!GetEFColumns()->GetColumnsCount() && !partialUsageByPredicate) { acc.SetBranchName("simple"); - acc.AddFetchingStep(*GetFFColumns(), EStageFeaturesIndexes::Fetching); + acc.AddFetchingStep(*GetFFColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching); if (needFilterDeletion) { - acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Fetching); + acc.AddFetchingStep(*GetDeletionColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching); } if (needSnapshots) { - acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Fetching); + acc.AddFetchingStep(*GetSpecColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching); } if (!exclusiveSource) { - acc.AddFetchingStep(*GetMergeColumns(), EStageFeaturesIndexes::Fetching); + acc.AddFetchingStep(*GetMergeColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching); } else { if (acc.GetAddedFetchingColumns().GetColumnsCount() == 1 && GetSpecColumns()->Contains(acc.GetAddedFetchingColumns()) && !hasFilterSharding) { return nullptr; @@ -116,76 +116,77 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c } if (acc.GetAddedFetchingColumns().GetColumnsCount() || hasFilterSharding || needFilterDeletion) { if (needSnapshots) { - acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Fetching, false); + acc.AddAssembleStep(*GetSpecColumns(), "SPEC", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching, false); } if (!exclusiveSource) { - acc.AddAssembleStep(*GetMergeColumns(), "LAST_PK", EStageFeaturesIndexes::Fetching, false); + acc.AddAssembleStep(*GetMergeColumns(), "LAST_PK", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching, false); } if (needSnapshots) { acc.AddStep(std::make_shared<TSnapshotFilter>()); } if (needFilterDeletion) { - acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Fetching, false); + acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching, false); acc.AddStep(std::make_shared<TDeletionFilter>()); } - acc.AddAssembleStep(acc.GetAddedFetchingColumns().GetColumnIds(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); + acc.AddAssembleStep(acc.GetAddedFetchingColumns().GetColumnIds(), "LAST", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching, + !exclusiveSource); } else { return nullptr; } } else if (exclusiveSource) { acc.SetBranchName("exclusive"); - acc.AddFetchingStep(*GetEFColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetEFColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); if (needFilterDeletion) { - acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetDeletionColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); } if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { - acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetSpecColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); } if (partialUsageByPredicate) { - acc.AddFetchingStep(*GetPredicateColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetPredicateColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); } AFL_VERIFY(acc.GetAddedFetchingColumns().GetColumnsCount()); if (needFilterDeletion) { - acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false); acc.AddStep(std::make_shared<TDeletionFilter>()); } if (partialUsageByPredicate) { - acc.AddAssembleStep(*GetPredicateColumns(), "PREDICATE", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*GetPredicateColumns(), "PREDICATE", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false); acc.AddStep(std::make_shared<TPredicateFilter>()); } if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { - acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*GetSpecColumns(), "SPEC", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false); acc.AddStep(std::make_shared<TSnapshotFilter>()); } else if (GetProgramInputColumns()->Cross(*GetSpecColumns())) { acc.AddStep(std::make_shared<TBuildFakeSpec>()); } - acc.AddFetchingStep(*GetFFColumns(), EStageFeaturesIndexes::Fetching); - acc.AddAssembleStep(*GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); + acc.AddFetchingStep(*GetFFColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching); + acc.AddAssembleStep(*GetFFColumns(), "LAST", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching, !exclusiveSource); } else { acc.SetBranchName("merge"); - acc.AddFetchingStep(*GetMergeColumns(), EStageFeaturesIndexes::Filter); - acc.AddFetchingStep(*GetEFColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetMergeColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); + acc.AddFetchingStep(*GetEFColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); if (needFilterDeletion) { - acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetDeletionColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); } AFL_VERIFY(acc.GetAddedFetchingColumns().GetColumnsCount()); - acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); - acc.AddAssembleStep(*GetPKColumns(), "PK", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*GetSpecColumns(), "SPEC", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false); + acc.AddAssembleStep(*GetPKColumns(), "PK", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false); if (needSnapshots) { acc.AddStep(std::make_shared<TSnapshotFilter>()); } if (needFilterDeletion) { - acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false); acc.AddStep(std::make_shared<TDeletionFilter>()); } if (partialUsageByPredicate) { acc.AddStep(std::make_shared<TPredicateFilter>()); } - acc.AddFetchingStep(*GetFFColumns(), EStageFeaturesIndexes::Fetching); - acc.AddAssembleStep(*GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); + acc.AddFetchingStep(*GetFFColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching); + acc.AddAssembleStep(*GetFFColumns(), "LAST", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching, !exclusiveSource); } acc.AddStep(std::make_shared<NCommon::TBuildStageResultStep>()); return std::move(acc).Build(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h index 4d35be2eca8..9500e54a3cf 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h @@ -13,7 +13,6 @@ namespace NKikimr::NOlap::NReader::NPlain { class IDataSource; using TColumnsSet = NCommon::TColumnsSet; -using EStageFeaturesIndexes = NCommon::EStageFeaturesIndexes; using TColumnsSetIds = NCommon::TColumnsSetIds; using EMemType = NCommon::EMemType; using TFetchingScript = NCommon::TFetchingScript; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h index 1e3fa3a98b5..b2fcbe88b7e 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h @@ -14,7 +14,6 @@ namespace NKikimr::NOlap::NReader::NPlain { using TColumnsSet = NCommon::TColumnsSet; using TIndexesSet = NCommon::TIndexesSet; -using EStageFeaturesIndexes = NCommon::EStageFeaturesIndexes; using TColumnsSetIds = NCommon::TColumnsSetIds; using EMemType = NCommon::EMemType; using TFetchingScriptCursor = NCommon::TFetchingScriptCursor; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp index 7f44376f3ad..a431b81077f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp @@ -20,7 +20,7 @@ void TFetchingInterval::ConstructResult() { auto task = std::make_shared<TStartMergeTask>(MergingContext, Context, std::move(Sources)); task->SetPriority(NConveyor::ITask::EPriority::High); NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(Context->GetProcessMemoryControlId(), - Context->GetCommonContext()->GetScanId(), GetIntervalId(), { task }, (ui32)EStageFeaturesIndexes::Merge); + Context->GetCommonContext()->GetScanId(), GetIntervalId(), { task }, (ui32)NArrow::NSSA::IMemoryCalculationPolicy::EStage::Merge); } } @@ -83,7 +83,7 @@ void TFetchingInterval::OnPartSendingComplete() { auto task = std::make_shared<TContinueMergeTask>(MergingContext, Context, std::move(Merger)); task->SetPriority(NConveyor::ITask::EPriority::High); NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(Context->GetProcessMemoryControlId(), - Context->GetCommonContext()->GetScanId(), GetIntervalId(), { task }, (ui32)EStageFeaturesIndexes::Merge); + Context->GetCommonContext()->GetScanId(), GetIntervalId(), { task }, (ui32)NArrow::NSSA::IMemoryCalculationPolicy::EStage::Merge); } } // namespace NKikimr::NOlap::NReader::NPlain diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp index b00d2e8a380..59f3601bf55 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp @@ -14,8 +14,8 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c if (!dontNeedColumns && !source->HasStageData()) { if (!AskAccumulatorsScript) { NCommon::TFetchingScriptBuilder acc(*this); - acc.AddStep(std::make_shared<NCommon::TAllocateMemoryStep> - (source->PredictAccessorsSize(GetFFColumns()->GetColumnIds()), EStageFeaturesIndexes::Accessors)); + acc.AddStep(std::make_shared<NCommon::TAllocateMemoryStep>( + source->PredictAccessorsSize(GetFFColumns()->GetColumnIds()), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Accessors)); acc.AddStep(std::make_shared<TPortionAccessorFetchingStep>()); acc.AddStep(std::make_shared<TDetectInMem>(*GetFFColumns())); AskAccumulatorsScript = std::move(acc).Build(); @@ -64,31 +64,31 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c NCommon::TFetchingScriptBuilder acc(*this); if (needFilterSharding && !GetShardingColumns()->IsEmpty()) { const TColumnsSetIds columnsFetch = *GetShardingColumns(); - acc.AddFetchingStep(columnsFetch, EStageFeaturesIndexes::Filter); - acc.AddAssembleStep(columnsFetch, "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false); + acc.AddFetchingStep(columnsFetch, NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); + acc.AddAssembleStep(columnsFetch, "SPEC_SHARDING", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false); acc.AddStep(std::make_shared<TShardingFilter>()); } { acc.SetBranchName("exclusive"); if (needFilterDeletion) { - acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetDeletionColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); } if (partialUsageByPredicate) { - acc.AddFetchingStep(*GetPredicateColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetPredicateColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); } if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { - acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetSpecColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); } if (needFilterDeletion) { - acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false); acc.AddStep(std::make_shared<TDeletionFilter>()); } if (partialUsageByPredicate) { - acc.AddAssembleStep(*GetPredicateColumns(), "PREDICATE", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*GetPredicateColumns(), "PREDICATE", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false); acc.AddStep(std::make_shared<TPredicateFilter>()); } if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { - acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*GetSpecColumns(), "SPEC", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false); acc.AddStep(std::make_shared<TSnapshotFilter>()); } const auto& chainProgram = GetReadMetadata()->GetProgram().GetChainVerified(); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h index f4e3d0ab36c..c1859a3062d 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h @@ -12,7 +12,6 @@ namespace NKikimr::NOlap::NReader::NSimple { class IDataSource; using TColumnsSet = NCommon::TColumnsSet; -using EStageFeaturesIndexes = NCommon::EStageFeaturesIndexes; using TColumnsSetIds = NCommon::TColumnsSetIds; using EMemType = NCommon::EMemType; using TFetchingScript = NCommon::TFetchingScript; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h index 0a62e6c0a59..d1dcc822b03 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h @@ -15,7 +15,6 @@ namespace NKikimr::NOlap::NReader::NSimple { class IDataSource; using TColumnsSet = NCommon::TColumnsSet; using TIndexesSet = NCommon::TIndexesSet; -using EStageFeaturesIndexes = NCommon::EStageFeaturesIndexes; using TColumnsSetIds = NCommon::TColumnsSetIds; using EMemType = NCommon::EMemType; using TFetchingScriptCursor = NCommon::TFetchingScriptCursor; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index d7505745e45..e075573a6ea 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -8,6 +8,7 @@ #include <ydb/core/tx/columnshard/engines/portions/data_accessor.h> #include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.h> #include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/default_fetching.h> +#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h> #include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h> #include <ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h> #include <ydb/core/tx/columnshard/engines/storage/indexes/skip_index/meta.h> @@ -428,4 +429,49 @@ TPortionDataSource::TPortionDataSource( , Schema(GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*portion)) { } +TConclusion<bool> TPortionDataSource::DoStartReserveMemory(const NArrow::NSSA::TProcessorContext& context, + const THashMap<ui32, IDataSource::TDataAddress>& columns, const THashMap<ui32, IDataSource::TFetchIndexContext>& /*indexes*/, + const THashMap<ui32, IDataSource::TFetchHeaderContext>& /*headers*/, const std::shared_ptr<NArrow::NSSA::IMemoryCalculationPolicy>& policy) { + class TEntitySize { + private: + YDB_READONLY(ui64, BlobsSize, 0); + YDB_READONLY(ui64, RawSize, 0); + + public: + void Add(const TEntitySize& item) { + Add(item.BlobsSize, item.RawSize); + } + + void Add(const ui64 blob, const ui64 raw) { + BlobsSize += blob; + RawSize += raw; + } + }; + + THashMap<ui32, TEntitySize> sizeByColumn; + for (auto&& [_, info] : columns) { + auto chunks = GetStageData().GetPortionAccessor().GetColumnChunksPointers(info.GetColumnId()); + auto& sizes = sizeByColumn[info.GetColumnId()]; + for (auto&& i : chunks) { + sizes.Add(i->GetBlobRange().GetSize(), i->GetMeta().GetRawBytes()); + } + } + TEntitySize result; + for (auto&& i : sizeByColumn) { + result.Add(i.second); + } + + auto source = context.GetDataSourceVerifiedAs<NCommon::IDataSource>(); + + const ui64 sizeToReserve = policy->GetReserveMemorySize( + result.GetBlobsSize(), result.GetRawSize(), GetContext()->GetReadMetadata()->GetLimitRobustOptional(), GetRecordsCount()); + + auto allocation = std::make_shared<NCommon::TAllocateMemoryStep::TFetchingStepAllocation>( + source, sizeToReserve, GetExecutionContext().GetCursorStep(), policy->GetStage(), false); + FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, AddEvent("mr")); + NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(GetContext()->GetProcessMemoryControlId(), + GetContext()->GetCommonContext()->GetScanId(), GetMemoryGroupId(), { allocation }, (ui32)policy->GetStage()); + return true; +} + } // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h index 22a888aff5f..de0d8faa3b5 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h @@ -328,6 +328,9 @@ private: virtual TConclusion<bool> DoStartFetchImpl( const NArrow::NSSA::TProcessorContext& context, const std::vector<std::shared_ptr<NCommon::IKernelFetchLogic>>& fetchersExt) override; + virtual TConclusion<bool> DoStartReserveMemory(const NArrow::NSSA::TProcessorContext& context, + const THashMap<ui32, IDataSource::TDataAddress>& columns, const THashMap<ui32, IDataSource::TFetchIndexContext>& indexes, + const THashMap<ui32, IDataSource::TFetchHeaderContext>& headers, const std::shared_ptr<NArrow::NSSA::IMemoryCalculationPolicy>& policy) override; virtual TConclusion<std::vector<std::shared_ptr<NArrow::NSSA::IFetchLogic>>> DoStartFetchIndex( const NArrow::NSSA::TProcessorContext& context, const TFetchIndexContext& fetchContext) override; virtual TConclusion<NArrow::TColumnFilter> DoCheckIndex(const NArrow::NSSA::TProcessorContext& context, diff --git a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp index a50388e8bfb..1ee5dd5091f 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp @@ -26,26 +26,26 @@ Y_UNIT_TEST_SUITE(TestScript) { { 1, NTable::TColumn("c1", 0, NScheme::TTypeInfo(NScheme::NTypeIds::Int32), "") }, { 2, NTable::TColumn("c2", 0, NScheme::TTypeInfo(NScheme::NTypeIds::Int32), "") } })); - acc.AddFetchingStep(std::vector<ui32>({ 0 }), NCommon::EStageFeaturesIndexes::Filter); - acc.AddFetchingStep(std::vector<ui32>({ 0 }), NCommon::EStageFeaturesIndexes::Filter); - acc.AddAssembleStep(std::vector<ui32>({ 0 }), "", NCommon::EStageFeaturesIndexes::Filter, false); + acc.AddFetchingStep(std::vector<ui32>({ 0 }), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); + acc.AddFetchingStep(std::vector<ui32>({ 0 }), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); + acc.AddAssembleStep(std::vector<ui32>({ 0 }), "", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false); acc.AddStep(std::make_shared<NSimple::TDeletionFilter>()); - acc.AddFetchingStep(std::vector<ui32>({ 0, 1 }), NCommon::EStageFeaturesIndexes::Filter); - acc.AddFetchingStep(std::vector<ui32>({ 1, 2 }), NCommon::EStageFeaturesIndexes::Fetching); - acc.AddFetchingStep(std::vector<ui32>({ 0 }), NCommon::EStageFeaturesIndexes::Fetching); - acc.AddAssembleStep(std::vector<ui32>({ 0, 1, 2 }), "", NCommon::EStageFeaturesIndexes::Fetching, false); + acc.AddFetchingStep(std::vector<ui32>({ 0, 1 }), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter); + acc.AddFetchingStep(std::vector<ui32>({ 1, 2 }), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching); + acc.AddFetchingStep(std::vector<ui32>({ 0 }), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching); + acc.AddAssembleStep(std::vector<ui32>({ 0, 1, 2 }), "", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching, false); acc.AddStep(std::make_shared<NSimple::TDeletionFilter>()); - acc.AddFetchingStep(std::vector<ui32>({ 0 }), NCommon::EStageFeaturesIndexes::Merge); + acc.AddFetchingStep(std::vector<ui32>({ 0 }), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Merge); auto script = std::move(acc).Build(); UNIT_ASSERT_STRINGS_EQUAL(script->DebugString(), "{branch:UNDEFINED;steps:[" - "{name=ALLOCATE_MEMORY::Filter;details={stage=Filter;column_ids=[Blob:0,Raw:0];};};" + "{name=ALLOCATE_MEMORY::FILTER;details={stage=FILTER;column_ids=[Blob:0,Raw:0];};};" "{name=FETCHING_COLUMNS;details={columns=0;};};" "{name=ASSEMBLER;details={columns=(column_ids=0;column_names=c0;);;};};" "{name=DELETION;details={};};" - "{name=ALLOCATE_MEMORY::Filter;details={stage=Filter;column_ids=[Blob:1];};};" - "{name=ALLOCATE_MEMORY::Fetching;details={stage=Fetching;column_ids=[Blob:2,Raw:1,Raw:2];};};" + "{name=ALLOCATE_MEMORY::FILTER;details={stage=FILTER;column_ids=[Blob:1];};};" + "{name=ALLOCATE_MEMORY::FETCHING;details={stage=FETCHING;column_ids=[Blob:2,Raw:1,Raw:2];};};" "{name=FETCHING_COLUMNS;details={columns=1,2;};};" "{name=ASSEMBLER;details={columns=(column_ids=1,2;column_names=c1,c2;);;};};" "{name=DELETION;details={};};]}"); |