aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2025-04-17 22:16:03 +0300
committerGitHub <noreply@github.com>2025-04-17 22:16:03 +0300
commit1f62eb9b72a10d093a2acd13c434a8a94fa695bc (patch)
tree620d5cb541e414e7526588efc7a28e3824cf504f
parent471d8acb121041385c1b6a34c20b8ed4ad37c9a2 (diff)
downloadydb-1f62eb9b72a10d093a2acd13c434a8a94fa695bc.tar.gz
memory control for graph nodes with specialities (#17301)
-rw-r--r--ydb/core/formats/arrow/program/abstract.h51
-rw-r--r--ydb/core/formats/arrow/program/execution.h14
-rw-r--r--ydb/core/formats/arrow/program/graph_execute.cpp8
-rw-r--r--ydb/core/formats/arrow/program/graph_optimization.cpp25
-rw-r--r--ydb/core/formats/arrow/program/reserve.cpp6
-rw-r--r--ydb/core/formats/arrow/program/reserve.h74
-rw-r--r--ydb/core/formats/arrow/program/ya.make1
-rw-r--r--ydb/core/formats/arrow/ut/ut_program_step.cpp2
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h7
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp15
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h39
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp9
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h7
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp61
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp20
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp46
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h3
-rw-r--r--ydb/core/tx/columnshard/engines/ut/ut_script.cpp22
24 files changed, 320 insertions, 102 deletions
diff --git a/ydb/core/formats/arrow/program/abstract.h b/ydb/core/formats/arrow/program/abstract.h
index 9ca8a96f908..32e73482e96 100644
--- a/ydb/core/formats/arrow/program/abstract.h
+++ b/ydb/core/formats/arrow/program/abstract.h
@@ -13,6 +13,47 @@ class TAccessorsCollection;
namespace NKikimr::NArrow::NSSA {
+class IMemoryCalculationPolicy { // Interface: names the memory stage of a reservation and computes how much to reserve for it.
+public:
+ enum class EStage {
+ Accessors = 0 /* "ACCESSORS" */,
+ Filter = 1 /* "FILTER" */,
+ Fetching = 2 /* "FETCHING" */,
+ Merge = 3 /* "MERGE" */
+ };
+
+ virtual ~IMemoryCalculationPolicy() = default;
+
+ virtual EStage GetStage() const = 0; // Stage this policy reserves memory for.
+ virtual ui64 GetReserveMemorySize( // Amount to reserve (presumably bytes - confirm with implementers); limit is optional, the other inputs are always given.
+ const ui64 blobsSize, const ui64 rawSize, const std::optional<ui32> limit, const ui32 recordsCount) const = 0;
+};
+
+class TFilterCalculationPolicy: public IMemoryCalculationPolicy { // Policy for the Filter stage: reserves exactly blobsSize.
+public:
+ virtual EStage GetStage() const override {
+ return EStage::Filter;
+ }
+ virtual ui64 GetReserveMemorySize( // rawSize, limit and recordsCount are intentionally unused by this policy.
+ const ui64 blobsSize, const ui64 /*rawSize*/, const std::optional<ui32> /*limit*/, const ui32 /*recordsCount*/) const override {
+ return blobsSize;
+ }
+};
+
+class TFetchingCalculationPolicy: public IMemoryCalculationPolicy { // Policy for the Fetching stage: reserves max(blobsSize, raw-size estimate), scaling the estimate down when a row limit is set.
+public:
+ virtual EStage GetStage() const override {
+ return EStage::Fetching;
+ }
+ virtual ui64 GetReserveMemorySize(const ui64 blobsSize, const ui64 rawSize, const std::optional<ui32> limit, const ui32 recordsCount) const override {
+ if (limit && *limit < recordsCount) { // Scale only when the limit really truncates; this also excludes recordsCount == 0 (division by zero) and estimates above rawSize.
+ return std::max<ui64>(blobsSize, rawSize * (1.0 * *limit) / recordsCount); // Proportional share of rawSize for the first *limit records, never below blobsSize.
+ } else {
+ return std::max<ui64>(blobsSize, rawSize); // No effective limit: the whole raw size may be needed.
+ }
+ }
+};
+
class TIndexCheckOperation {
public:
enum class EOperation : ui32 {
@@ -214,7 +255,8 @@ enum class EProcessorType {
AssembleOriginalData,
CheckIndexData,
CheckHeaderData,
- StreamLogic
+ StreamLogic,
+ ReserveMemory
};
class TFetchingInfo {
@@ -303,6 +345,13 @@ public:
Input.emplace_back(TColumnChainInfo(resourceId));
}
+ void AddOutput(const ui32 resourceId) { // Appends resourceId to Output as a new TColumnChainInfo.
+ for (auto&& i : Output) {
+ AFL_VERIFY(i.GetColumnId() != resourceId); // Registering the same output id twice is treated as a fatal error.
+ }
+ Output.emplace_back(TColumnChainInfo(resourceId));
+ }
+
void RemoveInput(const ui32 resourceId) {
for (ui32 idx = 0; idx < Input.size(); ++idx) {
if (Input[idx].GetColumnId() == resourceId) {
diff --git a/ydb/core/formats/arrow/program/execution.h b/ydb/core/formats/arrow/program/execution.h
index 4be60989c84..48b454f052a 100644
--- a/ydb/core/formats/arrow/program/execution.h
+++ b/ydb/core/formats/arrow/program/execution.h
@@ -276,9 +276,23 @@ private:
virtual TConclusion<bool> DoStartFetch(
const NArrow::NSSA::TProcessorContext& context, const std::vector<std::shared_ptr<NArrow::NSSA::IFetchLogic>>& fetchers) = 0;
+ virtual TConclusion<bool> DoStartReserveMemory(const NArrow::NSSA::TProcessorContext& /*context*/, // Overridable hook behind StartReserveMemory; this default ignores all arguments.
+ const THashMap<ui32, IDataSource::TDataAddress>& /*columns*/, const THashMap<ui32, IDataSource::TFetchIndexContext>& /*indexes*/,
+ const THashMap<ui32, IDataSource::TFetchHeaderContext>& /*headers*/,
+ const std::shared_ptr<NArrow::NSSA::IMemoryCalculationPolicy>& /*policy*/) {
+ return false; // false: nothing was started; TReserveMemoryProcessor::DoExecute maps false to Success and true to InBackground.
+ }
+
public:
virtual ~IDataSource() = default;
+ TConclusion<bool> StartReserveMemory(const NArrow::NSSA::TProcessorContext& context, // Public entry point: checks the policy and delegates to DoStartReserveMemory.
+ const THashMap<ui32, IDataSource::TDataAddress>& columns, const THashMap<ui32, IDataSource::TFetchIndexContext>& indexes,
+ const THashMap<ui32, IDataSource::TFetchHeaderContext>& headers, const std::shared_ptr<NArrow::NSSA::IMemoryCalculationPolicy>& policy) {
+ AFL_VERIFY(policy); // A null policy is a programming error, not a recoverable failure.
+ return DoStartReserveMemory(context, columns, indexes, headers, policy);
+ }
+
TConclusion<bool> StartFetch(
const NArrow::NSSA::TProcessorContext& context, const std::vector<std::shared_ptr<NArrow::NSSA::IFetchLogic>>& fetchers) {
return DoStartFetch(context, fetchers);
diff --git a/ydb/core/formats/arrow/program/graph_execute.cpp b/ydb/core/formats/arrow/program/graph_execute.cpp
index 6d1a3ebf6be..8628efe7473 100644
--- a/ydb/core/formats/arrow/program/graph_execute.cpp
+++ b/ydb/core/formats/arrow/program/graph_execute.cpp
@@ -104,7 +104,7 @@ TCompiledGraph::TCompiledGraph(const NOptimization::TGraph& original, const ICol
if (i.second->GetProcessor()->GetProcessorType() == EProcessorType::Filter) {
AFL_VERIFY(!IsFilterRoot(i.second->GetIdentifier()));
FilterRoot.emplace_back(i.second);
- } else if (i.second->GetProcessor()->GetProcessorType() != EProcessorType::Const) {
+ } else if (i.second->GetProcessor()->GetProcessorType() == EProcessorType::Projection) {
AFL_VERIFY(!ResultRoot)("debug", DebugDOT());
ResultRoot = i.second;
} else {
@@ -124,6 +124,9 @@ TCompiledGraph::TCompiledGraph(const NOptimization::TGraph& original, const ICol
for (; it->IsValid(); it->Next()) {
it->MutableCurrentNode().SetSequentialIdx(currentIndex);
for (auto&& i : it->GetProcessorVerified()->GetInput()) {
+ if (!i.GetColumnId()) {
+ continue;
+ }
if (resolver.HasColumn(i.GetColumnId())) {
if (IsFilterRoot(it->GetCurrentGraphNode()->GetIdentifier())) {
FilterColumns.emplace(i.GetColumnId());
@@ -133,6 +136,9 @@ TCompiledGraph::TCompiledGraph(const NOptimization::TGraph& original, const ICol
usage[i.GetColumnId()].InUsage(currentIndex);
}
for (auto&& i : it->GetProcessorVerified()->GetOutput()) {
+ if (!i.GetColumnId()) {
+ continue;
+ }
usage[i.GetColumnId()].Constructed(currentIndex);
}
sortedNodes.emplace_back(&it->MutableCurrentNode());
diff --git a/ydb/core/formats/arrow/program/graph_optimization.cpp b/ydb/core/formats/arrow/program/graph_optimization.cpp
index 56f9cbbb51c..43e547b1797 100644
--- a/ydb/core/formats/arrow/program/graph_optimization.cpp
+++ b/ydb/core/formats/arrow/program/graph_optimization.cpp
@@ -5,6 +5,7 @@
#include "header.h"
#include "index.h"
#include "original.h"
+#include "reserve.h"
#include "stream_logic.h"
#include <ydb/library/arrow_kernels/operations.h>
@@ -206,7 +207,8 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) {
}
if (i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetDataAddresses().size() +
i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetIndexContext().size() +
- i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetHeaderContext().size() > 1) {
+ i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetHeaderContext().size() >
+ 1) {
continue;
}
if (i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetDataAddresses().size()) {
@@ -220,6 +222,7 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) {
}
}
bool changed = false;
+ TGraphNode* nodeFetch = nullptr;
if (dataAddresses.size() > 1) {
THashSet<ui32> columnIds;
for (auto&& i : dataAddresses) {
@@ -231,16 +234,32 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) {
proc->Add(addr.second);
}
}
- auto nodeFetch = AddNode(proc);
+ nodeFetch = AddNode(proc).get();
FetchersMerged.emplace(nodeFetch->GetIdentifier());
for (auto&& i : dataAddresses) {
for (auto&& to : i->GetOutputEdges()) {
- AddEdge(nodeFetch.get(), to.second, to.first.GetResourceId());
+ AddEdge(nodeFetch, to.second, to.first.GetResourceId());
}
RemoveNode(i->GetIdentifier());
}
changed = true;
+ } else if (dataAddresses.size() == 1) {
+ nodeFetch = dataAddresses.front();
}
+ if (nodeFetch) {
+ std::shared_ptr<IMemoryCalculationPolicy> policy;
+ if (baseNode->Is(EProcessorType::Filter)) {
+ policy = std::make_shared<TFilterCalculationPolicy>();
+ } else if (baseNode->Is(EProcessorType::Projection)) {
+ policy = std::make_shared<TFetchingCalculationPolicy>();
+ }
+ auto reserveMemory = std::make_shared<TReserveMemoryProcessor>(*nodeFetch->GetProcessorAs<TOriginalColumnDataProcessor>(), policy);
+ auto nodeReserve = AddNode(reserveMemory);
+ nodeReserve->GetProcessor()->AddOutput(0);
+ nodeFetch->GetProcessor()->AddInput(0);
+ AddEdge(nodeReserve.get(), nodeFetch, 0);
+ }
+
if (indexes.size() + headers.size() > 1) {
THashSet<ui32> columnIds;
for (auto&& i : indexes) {
diff --git a/ydb/core/formats/arrow/program/reserve.cpp b/ydb/core/formats/arrow/program/reserve.cpp
new file mode 100644
index 00000000000..f10c8760200
--- /dev/null
+++ b/ydb/core/formats/arrow/program/reserve.cpp
@@ -0,0 +1,6 @@
+#include "execution.h"
+#include "original.h"
+
+namespace NKikimr::NArrow::NSSA {
+
+} // namespace NKikimr::NArrow::NSSA
diff --git a/ydb/core/formats/arrow/program/reserve.h b/ydb/core/formats/arrow/program/reserve.h
new file mode 100644
index 00000000000..0ecadb8b300
--- /dev/null
+++ b/ydb/core/formats/arrow/program/reserve.h
@@ -0,0 +1,74 @@
+#pragma once
+#include "abstract.h"
+#include "original.h"
+
+namespace NKikimr::NArrow::NSSA {
+
+class TReserveMemoryProcessor: public IResourceProcessor { // Graph processor that asks the data source to reserve memory for the data, index and header fetches copied from an original-data processor.
+private:
+ using TBase = IResourceProcessor;
+
+ THashMap<ui32, IDataSource::TDataAddress> DataAddresses; // Copied from the original processor in the constructor.
+ THashMap<ui32, IDataSource::TFetchIndexContext> IndexContext; // Copied from the original processor in the constructor.
+ THashMap<ui32, IDataSource::TFetchHeaderContext> HeaderContext; // Copied from the original processor in the constructor.
+ std::shared_ptr<IMemoryCalculationPolicy> Policy; // Never null: verified in the constructor.
+
+ virtual NJson::TJsonValue DoDebugJson() const override { // Emits "data" / "indexes" / "headers" arrays, each only when the matching map is non-empty.
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ if (DataAddresses.size()) {
+ auto& arrAddr = result.InsertValue("data", NJson::JSON_ARRAY);
+ for (auto&& i : DataAddresses) {
+ arrAddr.AppendValue(i.second.DebugJson());
+ }
+ }
+ if (IndexContext.size()) {
+ auto& indexesArr = result.InsertValue("indexes", NJson::JSON_ARRAY);
+ for (auto&& i : IndexContext) {
+ indexesArr.AppendValue(i.second.DebugJson());
+ }
+ }
+ if (HeaderContext.size()) {
+ auto& headersArr = result.InsertValue("headers", NJson::JSON_ARRAY);
+ for (auto&& i : HeaderContext) {
+ headersArr.AppendValue(i.second.DebugJson());
+ }
+ }
+ return result;
+ }
+
+ virtual TConclusion<EExecutionResult> DoExecute(const TProcessorContext& context, const TExecutionNodeContext& /*nodeContext*/) const override { // Starts the reservation on the data source and reports how it went.
+ auto source = context.GetDataSource().lock();
+ if (!source) {
+ return TConclusionStatus::Fail("source was destroyed before (reserve memory start)"); // Message names this processor so failures are not mistaken for the original-fetch step.
+ }
+ auto conclusion = source->StartReserveMemory(context, DataAddresses, IndexContext, HeaderContext, Policy);
+ if (conclusion.IsFail()) {
+ return conclusion;
+ } else if (conclusion.GetResult()) { // true: the source started work that is still in progress.
+ return EExecutionResult::InBackground;
+ } else {
+ return EExecutionResult::Success;
+ }
+ }
+
+ virtual bool IsAggregation() const override {
+ return false;
+ }
+
+ virtual ui64 DoGetWeight() const override {
+ return 0;
+ }
+
+public:
+ TReserveMemoryProcessor(const TOriginalColumnDataProcessor& original, const std::shared_ptr<IMemoryCalculationPolicy>& policy) // Built with empty input and output lists; policy must be non-null.
+ : TBase({}, {}, EProcessorType::ReserveMemory)
+ , DataAddresses(original.GetDataAddresses())
+ , IndexContext(original.GetIndexContext())
+ , HeaderContext(original.GetHeaderContext())
+ , Policy(policy)
+ {
+ AFL_VERIFY(policy);
+ }
+};
+
+} // namespace NKikimr::NArrow::NSSA
diff --git a/ydb/core/formats/arrow/program/ya.make b/ydb/core/formats/arrow/program/ya.make
index 114660d3251..720b0f0b318 100644
--- a/ydb/core/formats/arrow/program/ya.make
+++ b/ydb/core/formats/arrow/program/ya.make
@@ -48,6 +48,7 @@ SRCS(
assign_internal.cpp
custom_registry.cpp
GLOBAL kernel_logic.cpp
+ reserve.cpp
)
GENERATE_ENUM_SERIALIZATION(abstract.h)
diff --git a/ydb/core/formats/arrow/ut/ut_program_step.cpp b/ydb/core/formats/arrow/ut/ut_program_step.cpp
index 7a6c52b5c66..8f3f403a2af 100644
--- a/ydb/core/formats/arrow/ut/ut_program_step.cpp
+++ b/ydb/core/formats/arrow/ut/ut_program_step.cpp
@@ -594,7 +594,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) {
builder.Add(std::make_shared<TProjectionProcessor>(TColumnChainInfo::BuildVector({ 1, 2 })));
auto chain = builder.Finish().DetachResult();
Cerr << chain->DebugDOT() << Endl;
- AFL_VERIFY(chain->DebugStats() == "[TOTAL:Const:2;Calculation:4;Projection:1;Filter:1;FetchOriginalData:2;AssembleOriginalData:3;CheckIndexData:1;StreamLogic:1;];SUB:[AssembleOriginalData:1;];")("debug", chain->DebugStats());
+ AFL_VERIFY(chain->DebugStats() == "[TOTAL:Const:2;Calculation:4;Projection:1;Filter:1;FetchOriginalData:2;AssembleOriginalData:3;CheckIndexData:1;StreamLogic:1;ReserveMemory:1;];SUB:[AssembleOriginalData:1;];")("debug", chain->DebugStats());
}
Y_UNIT_TEST(Projection) {
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 5e520d81b03..f2b2db17e69 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -559,6 +559,8 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) {
SetupLogLevelFromTestParam(NKikimrServices::TX_COLUMNSHARD);
SetupLogLevelFromTestParam(NKikimrServices::TX_COLUMNSHARD_SCAN);
SetupLogLevelFromTestParam(NKikimrServices::LOCAL_PGWIRE);
+ SetupLogLevelFromTestParam(NKikimrServices::SSA_GRAPH_EXECUTION);
+
RunCall([this, domain = settings.DomainRoot]{
this->Client->InitRootScheme(domain);
@@ -1556,7 +1558,7 @@ NJson::TJsonValue SimplifyPlan(NJson::TJsonValue& opt, const TGetPlanParams& par
opName.find("Join") != TString::npos ||
opName.find("Union") != TString::npos ||
(opName.find("Filter") != TString::npos && params.IncludeFilters) ||
- (opName.find("HashShuffle") != TString::npos && params.IncludeShuffles)
+ (opName.find("HashShuffle") != TString::npos && params.IncludeShuffles)
) {
NJson::TJsonValue newChildren;
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h
index a2165252243..4a14d95727f 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h
@@ -14,13 +14,6 @@ enum class EMemType {
RawSequential
};
-enum class EStageFeaturesIndexes {
- Accessors = 0,
- Filter = 1,
- Fetching = 2,
- Merge = 3
-};
-
class TIndexesSet {
private:
YDB_READONLY_DEF(std::vector<ui32>, IndexIds);
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp
index 6f9219bc9f1..3559e599b05 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp
@@ -48,25 +48,28 @@ bool TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocated(std::shared_ptr
guard->Release();
return false;
}
- if (StageIndex == EStageFeaturesIndexes::Accessors) {
+ if (StageIndex == NArrow::NSSA::IMemoryCalculationPolicy::EStage::Accessors) {
data->MutableStageData().SetAccessorsGuard(std::move(guard));
} else {
data->RegisterAllocationGuard(std::move(guard));
}
- Step.Next();
+ if (NeedNextStep) {
+ Step.Next();
+ }
FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, data->AddEvent("fmalloc"));
auto task = std::make_shared<TStepAction>(data, std::move(Step), data->GetContext()->GetCommonContext()->GetScanActorId(), false);
NConveyor::TScanServiceOperator::SendTaskToExecute(task, data->GetContext()->GetCommonContext()->GetConveyorProcessId());
return true;
}
-TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation(
- const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step, const EStageFeaturesIndexes stageIndex)
+TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem,
+ const TFetchingScriptCursor& step, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stageIndex, const bool needNextStep)
: TBase(mem)
, Source(source)
, Step(step)
, TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard())
- , StageIndex(stageIndex) {
+ , StageIndex(stageIndex)
+ , NeedNextStep(needNextStep) {
}
void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
@@ -75,6 +78,8 @@ void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(cons
FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, sourcePtr->AddEvent("fail_malloc"));
sourcePtr->GetContext()->GetCommonContext()->AbortWithError(
"cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'");
+ } else {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "allocation_impossible")("error", errorMessage);
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h
index 0123678d75c..3b8abd40ea4 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h
@@ -21,25 +21,10 @@ private:
};
std::vector<TColumnsPack> Packs;
THashMap<ui32, THashSet<EMemType>> Control;
- const EStageFeaturesIndexes StageIndex;
+ const NArrow::NSSA::IMemoryCalculationPolicy::EStage StageIndex;
const std::optional<ui64> PredefinedSize;
protected:
- class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation {
- private:
- using TBase = NGroupedMemoryManager::IAllocation;
- std::weak_ptr<IDataSource> Source;
- TFetchingScriptCursor Step;
- NColumnShard::TCounterGuard TasksGuard;
- const EStageFeaturesIndexes StageIndex;
- virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
- const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
- virtual void DoOnAllocationImpossible(const TString& errorMessage) override;
-
- public:
- TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step,
- const EStageFeaturesIndexes stageIndex);
- };
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
virtual TString DoDebugString() const override {
@@ -53,6 +38,22 @@ protected:
}
public:
+ class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation { // Allocation request tied to a fetching-script step; declared in the public section of the enclosing class.
+ private:
+ using TBase = NGroupedMemoryManager::IAllocation;
+ std::weak_ptr<IDataSource> Source; // Held weakly, so the source may already be gone when a callback fires.
+ TFetchingScriptCursor Step; // Script position handed to the follow-up TStepAction after allocation.
+ NColumnShard::TCounterGuard TasksGuard;
+ const NArrow::NSSA::IMemoryCalculationPolicy::EStage StageIndex; // For Accessors the guard is stored via SetAccessorsGuard, otherwise via RegisterAllocationGuard.
+ const bool NeedNextStep; // When true, DoOnAllocated calls Step.Next() before scheduling the follow-up task.
+ virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
+ const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
+ virtual void DoOnAllocationImpossible(const TString& errorMessage) override;
+
+ public:
+ TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step, // mem is passed to the IAllocation base.
+ const NArrow::NSSA::IMemoryCalculationPolicy::EStage stageIndex, const bool needNextStep = true); // needNextStep defaults to true.
+ };
void AddAllocation(const TColumnsSetIds& ids, const EMemType memType) {
if (!ids.GetColumnsCount()) {
return;
@@ -62,17 +63,17 @@ public:
}
Packs.emplace_back(ids, memType);
}
- EStageFeaturesIndexes GetStage() const {
+ NArrow::NSSA::IMemoryCalculationPolicy::EStage GetStage() const {
return StageIndex;
}
- TAllocateMemoryStep(const TColumnsSetIds& columns, const EMemType memType, const EStageFeaturesIndexes stageIndex)
+ TAllocateMemoryStep(const TColumnsSetIds& columns, const EMemType memType, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stageIndex)
: TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex))
, StageIndex(stageIndex) {
AddAllocation(columns, memType);
}
- TAllocateMemoryStep(const ui64 memSize, const EStageFeaturesIndexes stageIndex)
+ TAllocateMemoryStep(const ui64 memSize, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stageIndex)
: TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex))
, StageIndex(stageIndex)
, PredefinedSize(memSize) {
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp
index 4319b0f944e..b9a686530b9 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp
@@ -27,7 +27,7 @@ TConclusionStatus TStepAction::DoExecuteImpl() {
return TConclusionStatus::Success();
}
auto executeResult = Cursor.Execute(Source);
- if (!executeResult) {
+ if (executeResult.IsFail()) {
return executeResult;
}
if (*executeResult) {
@@ -114,7 +114,8 @@ TString TFetchingScript::ProfileDebugString() const {
return sb;
}
-void TFetchingScriptBuilder::AddAllocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) {
+void TFetchingScriptBuilder::AddAllocation(
+ const std::set<ui32>& entityIds, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stage, const EMemType mType) {
if (Steps.size() == 0) {
AddStep(std::make_shared<TAllocateMemoryStep>(entityIds, mType, stage));
} else {
@@ -157,7 +158,7 @@ TFetchingScriptBuilder::TFetchingScriptBuilder(const TSpecialReadContext& contex
: TFetchingScriptBuilder(context.GetReadMetadata()->GetResultSchema(), context.GetMergeColumns()) {
}
-void TFetchingScriptBuilder::AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) {
+void TFetchingScriptBuilder::AddFetchingStep(const TColumnsSetIds& columns, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stage) {
auto actualColumns = columns - AddedFetchingColumns;
AddedFetchingColumns += columns;
if (actualColumns.IsEmpty()) {
@@ -175,7 +176,7 @@ void TFetchingScriptBuilder::AddFetchingStep(const TColumnsSetIds& columns, cons
}
void TFetchingScriptBuilder::AddAssembleStep(
- const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential) {
+ const TColumnsSetIds& columns, const TString& purposeId, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stage, const bool sequential) {
auto actualColumns = columns - AddedAssembleColumns;
AddedAssembleColumns += columns;
if (actualColumns.IsEmpty()) {
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h
index ddf067c4132..e8ca12f666d 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h
@@ -253,7 +253,7 @@ private:
}
private:
- void AddAllocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType);
+ void AddAllocation(const std::set<ui32>& entityIds, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stage, const EMemType mType);
template <class T, typename... Args>
std::shared_ptr<T> InsertStep(const ui32 index, Args... args) {
@@ -275,8 +275,9 @@ public:
Steps.emplace_back(step);
}
- void AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage);
- void AddAssembleStep(const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential);
+ void AddFetchingStep(const TColumnsSetIds& columns, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stage);
+ void AddAssembleStep(const TColumnsSetIds& columns, const TString& purposeId, const NArrow::NSSA::IMemoryCalculationPolicy::EStage stage,
+ const bool sequential);
static TFetchingScriptBuilder MakeForTests(ISnapshotSchema::TPtr schema, std::shared_ptr<TColumnsSetIds> guaranteeNotOptional = nullptr) {
return TFetchingScriptBuilder(schema, guaranteeNotOptional ? guaranteeNotOptional : std::make_shared<TColumnsSetIds>());
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
index ae2deb2810e..2636db28693 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
@@ -30,7 +30,7 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c
if (!AskAccumulatorsScript) {
NCommon::TFetchingScriptBuilder acc(*this);
if (ui64 size = source->PredictAccessorsMemory()) {
- acc.AddStep(std::make_shared<NCommon::TAllocateMemoryStep>(size, EStageFeaturesIndexes::Accessors));
+ acc.AddStep(std::make_shared<NCommon::TAllocateMemoryStep>(size, NArrow::NSSA::IMemoryCalculationPolicy::EStage::Accessors));
}
acc.AddStep(std::make_shared<TPortionAccessorFetchingStep>());
acc.AddStep(std::make_shared<TDetectInMem>(*GetFFColumns()));
@@ -90,25 +90,25 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
bool hasFilterSharding = false;
if (needFilterSharding && !GetShardingColumns()->IsEmpty()) {
hasFilterSharding = true;
- acc.AddFetchingStep(*GetShardingColumns(), EStageFeaturesIndexes::Filter);
+ acc.AddFetchingStep(*GetShardingColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
if (!exclusiveSource) {
- acc.AddFetchingStep(*GetPKColumns(), EStageFeaturesIndexes::Filter);
- acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Filter);
+ acc.AddFetchingStep(*GetPKColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
+ acc.AddFetchingStep(*GetSpecColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
}
- acc.AddAssembleStep(acc.GetAddedFetchingColumns(), "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false);
+ acc.AddAssembleStep(acc.GetAddedFetchingColumns(), "SPEC_SHARDING", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
acc.AddStep(std::make_shared<TShardingFilter>());
}
if (!GetEFColumns()->GetColumnsCount() && !partialUsageByPredicate) {
acc.SetBranchName("simple");
- acc.AddFetchingStep(*GetFFColumns(), EStageFeaturesIndexes::Fetching);
+ acc.AddFetchingStep(*GetFFColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching);
if (needFilterDeletion) {
- acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Fetching);
+ acc.AddFetchingStep(*GetDeletionColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching);
}
if (needSnapshots) {
- acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Fetching);
+ acc.AddFetchingStep(*GetSpecColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching);
}
if (!exclusiveSource) {
- acc.AddFetchingStep(*GetMergeColumns(), EStageFeaturesIndexes::Fetching);
+ acc.AddFetchingStep(*GetMergeColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching);
} else {
if (acc.GetAddedFetchingColumns().GetColumnsCount() == 1 && GetSpecColumns()->Contains(acc.GetAddedFetchingColumns()) && !hasFilterSharding) {
return nullptr;
@@ -116,76 +116,77 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
}
if (acc.GetAddedFetchingColumns().GetColumnsCount() || hasFilterSharding || needFilterDeletion) {
if (needSnapshots) {
- acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Fetching, false);
+ acc.AddAssembleStep(*GetSpecColumns(), "SPEC", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching, false);
}
if (!exclusiveSource) {
- acc.AddAssembleStep(*GetMergeColumns(), "LAST_PK", EStageFeaturesIndexes::Fetching, false);
+ acc.AddAssembleStep(*GetMergeColumns(), "LAST_PK", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching, false);
}
if (needSnapshots) {
acc.AddStep(std::make_shared<TSnapshotFilter>());
}
if (needFilterDeletion) {
- acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Fetching, false);
+ acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching, false);
acc.AddStep(std::make_shared<TDeletionFilter>());
}
- acc.AddAssembleStep(acc.GetAddedFetchingColumns().GetColumnIds(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource);
+ acc.AddAssembleStep(acc.GetAddedFetchingColumns().GetColumnIds(), "LAST", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching,
+ !exclusiveSource);
} else {
return nullptr;
}
} else if (exclusiveSource) {
acc.SetBranchName("exclusive");
- acc.AddFetchingStep(*GetEFColumns(), EStageFeaturesIndexes::Filter);
+ acc.AddFetchingStep(*GetEFColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
if (needFilterDeletion) {
- acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Filter);
+ acc.AddFetchingStep(*GetDeletionColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
}
if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) {
- acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Filter);
+ acc.AddFetchingStep(*GetSpecColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
}
if (partialUsageByPredicate) {
- acc.AddFetchingStep(*GetPredicateColumns(), EStageFeaturesIndexes::Filter);
+ acc.AddFetchingStep(*GetPredicateColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
}
AFL_VERIFY(acc.GetAddedFetchingColumns().GetColumnsCount());
if (needFilterDeletion) {
- acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false);
+ acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
acc.AddStep(std::make_shared<TDeletionFilter>());
}
if (partialUsageByPredicate) {
- acc.AddAssembleStep(*GetPredicateColumns(), "PREDICATE", EStageFeaturesIndexes::Filter, false);
+ acc.AddAssembleStep(*GetPredicateColumns(), "PREDICATE", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
acc.AddStep(std::make_shared<TPredicateFilter>());
}
if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) {
- acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false);
+ acc.AddAssembleStep(*GetSpecColumns(), "SPEC", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
acc.AddStep(std::make_shared<TSnapshotFilter>());
} else if (GetProgramInputColumns()->Cross(*GetSpecColumns())) {
acc.AddStep(std::make_shared<TBuildFakeSpec>());
}
- acc.AddFetchingStep(*GetFFColumns(), EStageFeaturesIndexes::Fetching);
- acc.AddAssembleStep(*GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource);
+ acc.AddFetchingStep(*GetFFColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching);
+ acc.AddAssembleStep(*GetFFColumns(), "LAST", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching, !exclusiveSource);
} else {
acc.SetBranchName("merge");
- acc.AddFetchingStep(*GetMergeColumns(), EStageFeaturesIndexes::Filter);
- acc.AddFetchingStep(*GetEFColumns(), EStageFeaturesIndexes::Filter);
+ acc.AddFetchingStep(*GetMergeColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
+ acc.AddFetchingStep(*GetEFColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
if (needFilterDeletion) {
- acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Filter);
+ acc.AddFetchingStep(*GetDeletionColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
}
AFL_VERIFY(acc.GetAddedFetchingColumns().GetColumnsCount());
- acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false);
- acc.AddAssembleStep(*GetPKColumns(), "PK", EStageFeaturesIndexes::Filter, false);
+ acc.AddAssembleStep(*GetSpecColumns(), "SPEC", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
+ acc.AddAssembleStep(*GetPKColumns(), "PK", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
if (needSnapshots) {
acc.AddStep(std::make_shared<TSnapshotFilter>());
}
if (needFilterDeletion) {
- acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false);
+ acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
acc.AddStep(std::make_shared<TDeletionFilter>());
}
if (partialUsageByPredicate) {
acc.AddStep(std::make_shared<TPredicateFilter>());
}
- acc.AddFetchingStep(*GetFFColumns(), EStageFeaturesIndexes::Fetching);
- acc.AddAssembleStep(*GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource);
+ acc.AddFetchingStep(*GetFFColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching);
+ acc.AddAssembleStep(*GetFFColumns(), "LAST", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching, !exclusiveSource);
}
acc.AddStep(std::make_shared<NCommon::TBuildStageResultStep>());
return std::move(acc).Build();
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h
index 4d35be2eca8..9500e54a3cf 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h
@@ -13,7 +13,6 @@ namespace NKikimr::NOlap::NReader::NPlain {
class IDataSource;
using TColumnsSet = NCommon::TColumnsSet;
-using EStageFeaturesIndexes = NCommon::EStageFeaturesIndexes;
using TColumnsSetIds = NCommon::TColumnsSetIds;
using EMemType = NCommon::EMemType;
using TFetchingScript = NCommon::TFetchingScript;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h
index 1e3fa3a98b5..b2fcbe88b7e 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h
@@ -14,7 +14,6 @@ namespace NKikimr::NOlap::NReader::NPlain {
using TColumnsSet = NCommon::TColumnsSet;
using TIndexesSet = NCommon::TIndexesSet;
-using EStageFeaturesIndexes = NCommon::EStageFeaturesIndexes;
using TColumnsSetIds = NCommon::TColumnsSetIds;
using EMemType = NCommon::EMemType;
using TFetchingScriptCursor = NCommon::TFetchingScriptCursor;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp
index 7f44376f3ad..a431b81077f 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp
@@ -20,7 +20,7 @@ void TFetchingInterval::ConstructResult() {
auto task = std::make_shared<TStartMergeTask>(MergingContext, Context, std::move(Sources));
task->SetPriority(NConveyor::ITask::EPriority::High);
NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(Context->GetProcessMemoryControlId(),
- Context->GetCommonContext()->GetScanId(), GetIntervalId(), { task }, (ui32)EStageFeaturesIndexes::Merge);
+ Context->GetCommonContext()->GetScanId(), GetIntervalId(), { task }, (ui32)NArrow::NSSA::IMemoryCalculationPolicy::EStage::Merge);
}
}
@@ -83,7 +83,7 @@ void TFetchingInterval::OnPartSendingComplete() {
auto task = std::make_shared<TContinueMergeTask>(MergingContext, Context, std::move(Merger));
task->SetPriority(NConveyor::ITask::EPriority::High);
NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(Context->GetProcessMemoryControlId(),
- Context->GetCommonContext()->GetScanId(), GetIntervalId(), { task }, (ui32)EStageFeaturesIndexes::Merge);
+ Context->GetCommonContext()->GetScanId(), GetIntervalId(), { task }, (ui32)NArrow::NSSA::IMemoryCalculationPolicy::EStage::Merge);
}
} // namespace NKikimr::NOlap::NReader::NPlain
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp
index b00d2e8a380..59f3601bf55 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp
@@ -14,8 +14,8 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c
if (!dontNeedColumns && !source->HasStageData()) {
if (!AskAccumulatorsScript) {
NCommon::TFetchingScriptBuilder acc(*this);
- acc.AddStep(std::make_shared<NCommon::TAllocateMemoryStep>
- (source->PredictAccessorsSize(GetFFColumns()->GetColumnIds()), EStageFeaturesIndexes::Accessors));
+ acc.AddStep(std::make_shared<NCommon::TAllocateMemoryStep>(
+ source->PredictAccessorsSize(GetFFColumns()->GetColumnIds()), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Accessors));
acc.AddStep(std::make_shared<TPortionAccessorFetchingStep>());
acc.AddStep(std::make_shared<TDetectInMem>(*GetFFColumns()));
AskAccumulatorsScript = std::move(acc).Build();
@@ -64,31 +64,31 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
NCommon::TFetchingScriptBuilder acc(*this);
if (needFilterSharding && !GetShardingColumns()->IsEmpty()) {
const TColumnsSetIds columnsFetch = *GetShardingColumns();
- acc.AddFetchingStep(columnsFetch, EStageFeaturesIndexes::Filter);
- acc.AddAssembleStep(columnsFetch, "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false);
+ acc.AddFetchingStep(columnsFetch, NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
+ acc.AddAssembleStep(columnsFetch, "SPEC_SHARDING", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
acc.AddStep(std::make_shared<TShardingFilter>());
}
{
acc.SetBranchName("exclusive");
if (needFilterDeletion) {
- acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Filter);
+ acc.AddFetchingStep(*GetDeletionColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
}
if (partialUsageByPredicate) {
- acc.AddFetchingStep(*GetPredicateColumns(), EStageFeaturesIndexes::Filter);
+ acc.AddFetchingStep(*GetPredicateColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
}
if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) {
- acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Filter);
+ acc.AddFetchingStep(*GetSpecColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
}
if (needFilterDeletion) {
- acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false);
+ acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
acc.AddStep(std::make_shared<TDeletionFilter>());
}
if (partialUsageByPredicate) {
- acc.AddAssembleStep(*GetPredicateColumns(), "PREDICATE", EStageFeaturesIndexes::Filter, false);
+ acc.AddAssembleStep(*GetPredicateColumns(), "PREDICATE", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
acc.AddStep(std::make_shared<TPredicateFilter>());
}
if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) {
- acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false);
+ acc.AddAssembleStep(*GetSpecColumns(), "SPEC", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
acc.AddStep(std::make_shared<TSnapshotFilter>());
}
const auto& chainProgram = GetReadMetadata()->GetProgram().GetChainVerified();
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h
index f4e3d0ab36c..c1859a3062d 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h
@@ -12,7 +12,6 @@ namespace NKikimr::NOlap::NReader::NSimple {
class IDataSource;
using TColumnsSet = NCommon::TColumnsSet;
-using EStageFeaturesIndexes = NCommon::EStageFeaturesIndexes;
using TColumnsSetIds = NCommon::TColumnsSetIds;
using EMemType = NCommon::EMemType;
using TFetchingScript = NCommon::TFetchingScript;
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h
index 0a62e6c0a59..d1dcc822b03 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h
@@ -15,7 +15,6 @@ namespace NKikimr::NOlap::NReader::NSimple {
class IDataSource;
using TColumnsSet = NCommon::TColumnsSet;
using TIndexesSet = NCommon::TIndexesSet;
-using EStageFeaturesIndexes = NCommon::EStageFeaturesIndexes;
using TColumnsSetIds = NCommon::TColumnsSetIds;
using EMemType = NCommon::EMemType;
using TFetchingScriptCursor = NCommon::TFetchingScriptCursor;
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp
index d7505745e45..e075573a6ea 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp
@@ -8,6 +8,7 @@
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.h>
#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/default_fetching.h>
+#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h>
#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h>
#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h>
#include <ydb/core/tx/columnshard/engines/storage/indexes/skip_index/meta.h>
@@ -428,4 +429,49 @@ TPortionDataSource::TPortionDataSource(
, Schema(GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*portion)) {
}
+TConclusion<bool> TPortionDataSource::DoStartReserveMemory(const NArrow::NSSA::TProcessorContext& context,
+ const THashMap<ui32, IDataSource::TDataAddress>& columns, const THashMap<ui32, IDataSource::TFetchIndexContext>& /*indexes*/,
+ const THashMap<ui32, IDataSource::TFetchHeaderContext>& /*headers*/, const std::shared_ptr<NArrow::NSSA::IMemoryCalculationPolicy>& policy) { // Sizes a reservation from the chunks of `columns` and posts it to the scan memory limiter; `indexes` and `headers` are not used here.
+ class TEntitySize { // Local accumulator holding two running byte totals.
+ private:
+ YDB_READONLY(ui64, BlobsSize, 0); // Sum of chunk blob-range sizes; read below through GetBlobsSize().
+ YDB_READONLY(ui64, RawSize, 0); // Sum of chunk raw-byte sizes; read below through GetRawSize().
+
+ public:
+ void Add(const TEntitySize& item) { // Merges another accumulator into this one.
+ Add(item.BlobsSize, item.RawSize);
+ }
+
+ void Add(const ui64 blob, const ui64 raw) { // Adds one (blob, raw) pair to the totals.
+ BlobsSize += blob;
+ RawSize += raw;
+ }
+ };
+
+ THashMap<ui32, TEntitySize> sizeByColumn; // Per-column totals, keyed by column id.
+ for (auto&& [_, info] : columns) { // The map key is ignored; only the address value is used.
+ auto chunks = GetStageData().GetPortionAccessor().GetColumnChunksPointers(info.GetColumnId());
+ auto& sizes = sizeByColumn[info.GetColumnId()]; // NOTE(review): if several entries share a column id, its chunks are added once per entry - confirm this is intended.
+ for (auto&& i : chunks) {
+ sizes.Add(i->GetBlobRange().GetSize(), i->GetMeta().GetRawBytes());
+ }
+ }
+ TEntitySize result; // Grand total across all columns.
+ for (auto&& i : sizeByColumn) {
+ result.Add(i.second);
+ }
+
+ auto source = context.GetDataSourceVerifiedAs<NCommon::IDataSource>(); // Data source taken from the processor context; handed to the allocation below.
+
+ const ui64 sizeToReserve = policy->GetReserveMemorySize( // The policy turns the totals, the optional limit and the record count into the amount to reserve.
+ result.GetBlobsSize(), result.GetRawSize(), GetContext()->GetReadMetadata()->GetLimitRobustOptional(), GetRecordsCount());
+
+ auto allocation = std::make_shared<NCommon::TAllocateMemoryStep::TFetchingStepAllocation>(
+ source, sizeToReserve, GetExecutionContext().GetCursorStep(), policy->GetStage(), false); // Meaning of the trailing `false` is not visible here - TODO confirm against TFetchingStepAllocation.
+ FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, AddEvent("mr"));
+ NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(GetContext()->GetProcessMemoryControlId(),
+ GetContext()->GetCommonContext()->GetScanId(), GetMemoryGroupId(), { allocation }, (ui32)policy->GetStage()); // Queues the request under this scan's memory group, using the policy stage as the stage index.
+ return true; // Always true on this path; presumably tells the caller the request was issued - TODO confirm the contract.
+}
+
} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h
index 22a888aff5f..de0d8faa3b5 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h
@@ -328,6 +328,9 @@ private:
virtual TConclusion<bool> DoStartFetchImpl(
const NArrow::NSSA::TProcessorContext& context, const std::vector<std::shared_ptr<NCommon::IKernelFetchLogic>>& fetchersExt) override;
+ virtual TConclusion<bool> DoStartReserveMemory(const NArrow::NSSA::TProcessorContext& context,
+ const THashMap<ui32, IDataSource::TDataAddress>& columns, const THashMap<ui32, IDataSource::TFetchIndexContext>& indexes,
+ const THashMap<ui32, IDataSource::TFetchHeaderContext>& headers, const std::shared_ptr<NArrow::NSSA::IMemoryCalculationPolicy>& policy) override; // Memory-reservation override; presumably the declaration of TPortionDataSource::DoStartReserveMemory defined in source.cpp - confirm the enclosing class.
virtual TConclusion<std::vector<std::shared_ptr<NArrow::NSSA::IFetchLogic>>> DoStartFetchIndex(
const NArrow::NSSA::TProcessorContext& context, const TFetchIndexContext& fetchContext) override;
virtual TConclusion<NArrow::TColumnFilter> DoCheckIndex(const NArrow::NSSA::TProcessorContext& context,
diff --git a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp
index a50388e8bfb..1ee5dd5091f 100644
--- a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp
+++ b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp
@@ -26,26 +26,26 @@ Y_UNIT_TEST_SUITE(TestScript) {
{ 1, NTable::TColumn("c1", 0, NScheme::TTypeInfo(NScheme::NTypeIds::Int32), "") },
{ 2, NTable::TColumn("c2", 0, NScheme::TTypeInfo(NScheme::NTypeIds::Int32), "") } }));
- acc.AddFetchingStep(std::vector<ui32>({ 0 }), NCommon::EStageFeaturesIndexes::Filter);
- acc.AddFetchingStep(std::vector<ui32>({ 0 }), NCommon::EStageFeaturesIndexes::Filter);
- acc.AddAssembleStep(std::vector<ui32>({ 0 }), "", NCommon::EStageFeaturesIndexes::Filter, false);
+ acc.AddFetchingStep(std::vector<ui32>({ 0 }), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
+ acc.AddFetchingStep(std::vector<ui32>({ 0 }), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
+ acc.AddAssembleStep(std::vector<ui32>({ 0 }), "", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
acc.AddStep(std::make_shared<NSimple::TDeletionFilter>());
- acc.AddFetchingStep(std::vector<ui32>({ 0, 1 }), NCommon::EStageFeaturesIndexes::Filter);
- acc.AddFetchingStep(std::vector<ui32>({ 1, 2 }), NCommon::EStageFeaturesIndexes::Fetching);
- acc.AddFetchingStep(std::vector<ui32>({ 0 }), NCommon::EStageFeaturesIndexes::Fetching);
- acc.AddAssembleStep(std::vector<ui32>({ 0, 1, 2 }), "", NCommon::EStageFeaturesIndexes::Fetching, false);
+ acc.AddFetchingStep(std::vector<ui32>({ 0, 1 }), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
+ acc.AddFetchingStep(std::vector<ui32>({ 1, 2 }), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching);
+ acc.AddFetchingStep(std::vector<ui32>({ 0 }), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching);
+ acc.AddAssembleStep(std::vector<ui32>({ 0, 1, 2 }), "", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Fetching, false);
acc.AddStep(std::make_shared<NSimple::TDeletionFilter>());
- acc.AddFetchingStep(std::vector<ui32>({ 0 }), NCommon::EStageFeaturesIndexes::Merge);
+ acc.AddFetchingStep(std::vector<ui32>({ 0 }), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Merge);
auto script = std::move(acc).Build();
UNIT_ASSERT_STRINGS_EQUAL(script->DebugString(),
"{branch:UNDEFINED;steps:["
- "{name=ALLOCATE_MEMORY::Filter;details={stage=Filter;column_ids=[Blob:0,Raw:0];};};"
+ "{name=ALLOCATE_MEMORY::FILTER;details={stage=FILTER;column_ids=[Blob:0,Raw:0];};};"
"{name=FETCHING_COLUMNS;details={columns=0;};};"
"{name=ASSEMBLER;details={columns=(column_ids=0;column_names=c0;);;};};"
"{name=DELETION;details={};};"
- "{name=ALLOCATE_MEMORY::Filter;details={stage=Filter;column_ids=[Blob:1];};};"
- "{name=ALLOCATE_MEMORY::Fetching;details={stage=Fetching;column_ids=[Blob:2,Raw:1,Raw:2];};};"
+ "{name=ALLOCATE_MEMORY::FILTER;details={stage=FILTER;column_ids=[Blob:1];};};"
+ "{name=ALLOCATE_MEMORY::FETCHING;details={stage=FETCHING;column_ids=[Blob:2,Raw:1,Raw:2];};};"
"{name=FETCHING_COLUMNS;details={columns=1,2;};};"
"{name=ASSEMBLER;details={columns=(column_ids=1,2;column_names=c1,c2;);;};};"
"{name=DELETION;details={};};]}");