aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2022-10-25 16:03:46 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2022-10-25 16:03:46 +0300
commitd18928ecee746c0e009d8504d94c374aafe681af (patch)
tree2ba5846c8205bb38f1ad5d9733f9f248300069cb
parent4f0ceb8491b8041b1e4d74c824886c3b620e4dc2 (diff)
downloadydb-d18928ecee746c0e009d8504d94c374aafe681af.tar.gz
Refactor usage of TScopedAlloc
-rw-r--r--ydb/core/kqp/executer/kqp_data_executer.cpp4
-rw-r--r--ydb/core/kqp/executer/kqp_executer_impl.cpp1
-rw-r--r--ydb/core/kqp/executer/kqp_literal_executer.cpp40
-rw-r--r--ydb/core/kqp/executer/kqp_partition_helper.cpp8
-rw-r--r--ydb/core/kqp/executer/kqp_scan_executer.cpp7
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp18
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp3
7 files changed, 54 insertions, 27 deletions
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index 2166e767e1a..e754ebdc555 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -1365,9 +1365,9 @@ private:
auto& funcRegistry = *AppData()->FunctionRegistry;
NMiniKQL::TScopedAlloc alloc(__LOCATION__, TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators());
NMiniKQL::TTypeEnvironment typeEnv(alloc);
-
- NMiniKQL::TMemoryUsageInfo memInfo("PrepareTasks");
+ NMiniKQL::TMemoryUsageInfo memInfo("KqpDataExecuter");
NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo, &funcRegistry);
+ auto unguard = Unguard(alloc);
for (ui32 txIdx = 0; txIdx < Request.Transactions.size(); ++txIdx) {
auto& tx = Request.Transactions[txIdx];
diff --git a/ydb/core/kqp/executer/kqp_executer_impl.cpp b/ydb/core/kqp/executer/kqp_executer_impl.cpp
index b3c233b29a2..4ba5509cdcb 100644
--- a/ydb/core/kqp/executer/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer/kqp_executer_impl.cpp
@@ -40,6 +40,7 @@ void BuildKqpExecuterResults(const NKqpProto::TKqpPhyTx& tx, TVector<TKqpExecute
void PrepareKqpTaskParameters(const NKqpProto::TKqpPhyStage& stage, const TStageInfo& stageInfo, const TTask& task,
NDqProto::TDqTask& dqTask, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory)
{
+ auto g = typeEnv.BindAllocator();
for (auto& paramName : stage.GetProgramParameters()) {
auto& dqParams = *dqTask.MutableParameters();
if (auto* taskParam = task.Meta.Params.FindPtr(paramName)) {
diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp
index 42dfb59e499..5dc42cc766b 100644
--- a/ydb/core/kqp/executer/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp
@@ -158,6 +158,7 @@ private:
NMiniKQL::TTypeEnvironment typeEnv(alloc);
NMiniKQL::TMemoryUsageInfo memInfo("KqpLocalExecuter");
NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo, funcRegistry);
+ auto unguard = Unguard(alloc);
ui64 mkqlMemoryLimit = Request.MkqlMemoryLimit > 0
? Request.MkqlMemoryLimit
@@ -189,6 +190,7 @@ private:
Y_DEFER {
// clear allocator state
+ TGuard<NMiniKQL::TScopedAlloc> guard(alloc);
Results.crop(0);
TaskRunners.crop(0);
};
@@ -250,7 +252,9 @@ private:
if (auto* param = stageInfo.Meta.Tx.Params.Values.FindPtr(name)) {
NMiniKQL::TType* typeFromProto;
- std::tie(typeFromProto, value) = ImportValueFromProto(param->GetType(), param->GetValue(), typeEnv, holderFactory);
+ with_lock (typeEnv.GetAllocator()) {
+ std::tie(typeFromProto, value) = ImportValueFromProto(param->GetType(), param->GetValue(), typeEnv, holderFactory);
+ }
#ifndef NDEBUG
YQL_ENSURE(ToString(*type) == ToString(*typeFromProto), "" << *type << " != " << *typeFromProto);
#else
@@ -276,13 +280,15 @@ private:
auto status = taskRunner->Run();
YQL_ENSURE(status == ERunStatus::Finished);
- for (auto& taskOutput : task.Outputs) {
- for (ui64 outputChannelId : taskOutput.Channels) {
- auto outputChannel = taskRunner->GetOutputChannel(outputChannelId);
- auto& channelDesc = TasksGraph.GetChannel(outputChannelId);
+ with_lock (*context.Alloc) { // allocator is used only by outputChannel->PopAll()
+ for (auto& taskOutput : task.Outputs) {
+ for (ui64 outputChannelId : taskOutput.Channels) {
+ auto outputChannel = taskRunner->GetOutputChannel(outputChannelId);
+ auto& channelDesc = TasksGraph.GetChannel(outputChannelId);
- outputChannel->PopAll(Results[channelDesc.DstInputIndex].Rows);
- YQL_ENSURE(outputChannel->IsFinished());
+ outputChannel->PopAll(Results[channelDesc.DstInputIndex].Rows);
+ YQL_ENSURE(outputChannel->IsFinished());
+ }
}
}
}
@@ -296,16 +302,18 @@ private:
ui64 rows = 0;
ui64 bytes = 0;
- TKqpProtoBuilder protoBuilder(context.Alloc, context.TypeEnv, &holderFactory);
- for (auto& result : Results) {
- rows += result.Rows.size();
- auto* protoResult = response.MutableResult()->AddResults();
- if (result.IsStream) {
- protoBuilder.BuildStream(result.Rows, result.ItemType, result.ResultItemType.Get(), protoResult);
- } else {
- protoBuilder.BuildValue(result.Rows, result.ItemType, protoResult);
+ with_lock (*context.Alloc) {
+ TKqpProtoBuilder protoBuilder(context.Alloc, context.TypeEnv, &holderFactory);
+ for (auto& result : Results) {
+ rows += result.Rows.size();
+ auto* protoResult = response.MutableResult()->AddResults();
+ if (result.IsStream) {
+ protoBuilder.BuildStream(result.Rows, result.ItemType, result.ResultItemType.Get(), protoResult);
+ } else {
+ protoBuilder.BuildValue(result.Rows, result.ItemType, protoResult);
+ }
+ bytes += protoResult->ByteSizeLong();
}
- bytes += protoResult->ByteSizeLong();
}
if (Stats) {
diff --git a/ydb/core/kqp/executer/kqp_partition_helper.cpp b/ydb/core/kqp/executer/kqp_partition_helper.cpp
index 1583ff2247a..134d82d15b5 100644
--- a/ydb/core/kqp/executer/kqp_partition_helper.cpp
+++ b/ydb/core/kqp/executer/kqp_partition_helper.cpp
@@ -108,6 +108,7 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix(const NDq::
const TTableId& tableId, const TKqpTableKeys& tableKeys, const TKeyDesc& key,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
+ auto guard = typeEnv.BindAllocator();
YQL_ENSURE(tableId.HasSamePath(key.TableId));
auto& table = tableKeys.GetTable(tableId);
@@ -363,6 +364,7 @@ TVector<TSerializedPointOrRange> FillRangesFromParameter(const TVector<NScheme::
const NKqpProto::TKqpPhyParamValue& rangesParam, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
+ auto guard = typeEnv.BindAllocator();
TString paramName = rangesParam.GetParamName();
auto param = stageInfo.Meta.Tx.Params.Values.FindPtr(paramName);
@@ -403,6 +405,7 @@ TVector<TSerializedPointOrRange> FillReadRangesInternal(const TVector<NScheme::T
const PhyOpReadRanges& readRanges, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
+ auto guard = typeEnv.BindAllocator();
if (readRanges.HasKeyRanges()) {
return FillRangesFromParameter(
keyColumnTypes, readRanges.GetKeyRanges(), stageInfo, holderFactory, typeEnv
@@ -514,6 +517,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyOpReadRange& readRange, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
+ auto guard = typeEnv.BindAllocator();
const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
YQL_ENSURE(table);
@@ -545,6 +549,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyOpReadRanges& readRanges, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
+ auto guard = typeEnv.BindAllocator();
const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
YQL_ENSURE(table);
@@ -586,6 +591,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
+ auto guard = typeEnv.BindAllocator();
const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
YQL_ENSURE(table);
YQL_ENSURE(table->TableKind == ETableKind::Olap);
@@ -799,6 +805,7 @@ THashMap<ui64, TShardInfo> PartitionLookupByRowsList(const NKqpProto::TKqpPhyRow
THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyOpLookup& lookup,
const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
+ auto guard = typeEnv.BindAllocator();
YQL_CLOG(TRACE, ProviderKqp) << "PrunePartitions: " << lookup.DebugString();
if (!lookup.HasKeysValue()) {
@@ -830,6 +837,7 @@ template <typename TEffect>
THashMap<ui64, TShardInfo> PruneEffectPartitionsImpl(const TKqpTableKeys& tableKeys, const TEffect& effect,
const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
+ auto guard = typeEnv.BindAllocator();
THashMap<ui64, TShardInfo> shardInfoMap;
if (effect.HasRowsValue() &&
effect.GetRowsValue().GetKindCase() == NKqpProto::TKqpPhyValue::kParamValue)
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 1cec4fb368c..f85b2f3d0e6 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -662,12 +662,12 @@ private:
auto& funcRegistry = *AppData()->FunctionRegistry;
NMiniKQL::TScopedAlloc alloc(__LOCATION__, TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators());
NMiniKQL::TTypeEnvironment typeEnv(alloc);
+ NMiniKQL::TMemoryUsageInfo memInfo("KqpScanExecuter");
+ NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo, &funcRegistry);
+ auto unguard = Unguard(alloc);
NWilson::TSpan prepareTasksSpan(TWilsonKqp::ScanExecuterPrepareTasks, ExecuterStateSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END);
- NMiniKQL::TMemoryUsageInfo memInfo("PrepareTasks");
- NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo, &funcRegistry);
-
auto& tx = Request.Transactions[0];
for (ui32 stageIdx = 0; stageIdx < tx.Body->StagesSize(); ++stageIdx) {
auto& stage = tx.Body->GetStages(stageIdx);
@@ -877,7 +877,6 @@ private:
ExecuterStateSpan.End();
ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ScanExecuterExecuteState, ExecuterSpan.GetTraceId(), "ExecuteState", NWilson::EFlags::AUTO_END);
}
-
}
void ExecuteScanTx(TVector<NYql::NDqProto::TDqTask>&& computeTasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks) {
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 4650f466796..81ffd6cc812 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -151,8 +151,9 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con
}
IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type,
- const NMiniKQL::TTypeEnvironment& /*typeEnv*/, TVector<IDqOutput::TPtr>&& outputs)
+ const NMiniKQL::TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs)
{
+ auto guard = typeEnv.BindAllocator();
switch (outputDesc.GetTypeCase()) {
case NDqProto::TTaskOutput::kSink:
Y_VERIFY(outputDesc.ChannelsSize() == 0);
@@ -254,8 +255,7 @@ public:
return TaskId;
}
- void BuildTask(const NDqProto::TDqTask& task, const TDqTaskRunnerParameterProvider& parameterProvider)
- {
+ void BuildTask(const NDqProto::TDqTask& task, const TDqTaskRunnerParameterProvider& parameterProvider) {
LOG(TStringBuilder() << "Build task: " << TaskId);
auto startTime = TInstant::Now();
@@ -395,6 +395,7 @@ public:
ProgramParsed.CompGraph = ProgramParsed.CompPattern->Clone(
opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider, &Alloc().Ref()));
} else {
+ auto guard = BindAllocator();
ProgramParsed.CompPattern = MakeComputationPattern(programExplorer, programRoot, ProgramParsed.EntryPoints, opts);
ProgramParsed.CompGraph = ProgramParsed.CompPattern->Clone(
opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider));
@@ -427,10 +428,14 @@ public:
auto it = task.GetParameters().find(name);
YQL_ENSURE(it != task.GetParameters().end());
+ auto guard = typeEnv.BindAllocator();
TDqDataSerializer::DeserializeParam(it->second, type, graphHolderFactory, structMembers[i]);
}
- ValidateParamValue(name, type, structMembers[i]);
+ {
+ auto guard = typeEnv.BindAllocator();
+ ValidateParamValue(name, type, structMembers[i]);
+ }
}
paramNode->SetValue(ProgramParsed.CompGraph->GetContext(), std::move(paramsStructValue));
@@ -607,6 +612,7 @@ public:
} else if (outputConsumers.size() == 1) {
Output = std::move(outputConsumers[0]);
} else {
+ auto guard = BindAllocator();
Output = CreateOutputMultiConsumer(std::move(outputConsumers));
}
@@ -638,6 +644,7 @@ public:
ERunStatus Run() final {
LOG(TStringBuilder() << "Run task: " << TaskId);
if (!ResultStream) {
+ auto guard = BindAllocator();
TBindTerminator term(ProgramParsed.CompGraph->GetTerminator());
ResultStream = ProgramParsed.CompGraph->GetValue();
}
@@ -730,7 +737,7 @@ public:
return {ptr->TransformInput, ptr->TransformOutput};
}
- TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit) override {
+ TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit = {}) override {
auto guard = Context.TypeEnv ? Context.TypeEnv->BindAllocator() : SelfTypeEnv->BindAllocator();
if (memoryLimit) {
guard.GetMutex()->SetLimit(*memoryLimit);
@@ -809,6 +816,7 @@ private:
}
};
+ auto guard = BindAllocator();
while (!Output->IsFull()) {
if (Y_UNLIKELY(CollectProfileStats)) {
auto now = TInstant::Now();
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
index 9bd2238dcb8..d04ba790b30 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
@@ -231,6 +231,9 @@ public:
}
~TComputationGraphBuildingVisitor() {
+ auto g = Env.BindAllocator();
+ NodeFactory.Reset();
+ PatternNodes.Reset();
}
const TTypeEnvironment& GetTypeEnvironment() const {