diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2022-10-25 16:03:46 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2022-10-25 16:03:46 +0300 |
commit | d18928ecee746c0e009d8504d94c374aafe681af (patch) | |
tree | 2ba5846c8205bb38f1ad5d9733f9f248300069cb | |
parent | 4f0ceb8491b8041b1e4d74c824886c3b620e4dc2 (diff) | |
download | ydb-d18928ecee746c0e009d8504d94c374aafe681af.tar.gz |
Refactor usage of TScopedAlloc
-rw-r--r-- | ydb/core/kqp/executer/kqp_data_executer.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_executer_impl.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_literal_executer.cpp | 40 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_partition_helper.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_scan_executer.cpp | 7 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_tasks_runner.cpp | 18 | ||||
-rw-r--r-- | ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp | 3 |
7 files changed, 54 insertions, 27 deletions
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp index 2166e767e1a..e754ebdc555 100644 --- a/ydb/core/kqp/executer/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer/kqp_data_executer.cpp @@ -1365,9 +1365,9 @@ private: auto& funcRegistry = *AppData()->FunctionRegistry; NMiniKQL::TScopedAlloc alloc(__LOCATION__, TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators()); NMiniKQL::TTypeEnvironment typeEnv(alloc); - - NMiniKQL::TMemoryUsageInfo memInfo("PrepareTasks"); + NMiniKQL::TMemoryUsageInfo memInfo("KqpDataExecuter"); NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo, &funcRegistry); + auto unguard = Unguard(alloc); for (ui32 txIdx = 0; txIdx < Request.Transactions.size(); ++txIdx) { auto& tx = Request.Transactions[txIdx]; diff --git a/ydb/core/kqp/executer/kqp_executer_impl.cpp b/ydb/core/kqp/executer/kqp_executer_impl.cpp index b3c233b29a2..4ba5509cdcb 100644 --- a/ydb/core/kqp/executer/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer/kqp_executer_impl.cpp @@ -40,6 +40,7 @@ void BuildKqpExecuterResults(const NKqpProto::TKqpPhyTx& tx, TVector<TKqpExecute void PrepareKqpTaskParameters(const NKqpProto::TKqpPhyStage& stage, const TStageInfo& stageInfo, const TTask& task, NDqProto::TDqTask& dqTask, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory) { + auto g = typeEnv.BindAllocator(); for (auto& paramName : stage.GetProgramParameters()) { auto& dqParams = *dqTask.MutableParameters(); if (auto* taskParam = task.Meta.Params.FindPtr(paramName)) { diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp index 42dfb59e499..5dc42cc766b 100644 --- a/ydb/core/kqp/executer/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp @@ -158,6 +158,7 @@ private: NMiniKQL::TTypeEnvironment typeEnv(alloc); NMiniKQL::TMemoryUsageInfo memInfo("KqpLocalExecuter"); NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo, funcRegistry); + auto unguard = Unguard(alloc); ui64 mkqlMemoryLimit = Request.MkqlMemoryLimit > 0 ? Request.MkqlMemoryLimit @@ -189,6 +190,7 @@ private: Y_DEFER { // clear allocator state + TGuard<NMiniKQL::TScopedAlloc> guard(alloc); Results.crop(0); TaskRunners.crop(0); }; @@ -250,7 +252,9 @@ private: if (auto* param = stageInfo.Meta.Tx.Params.Values.FindPtr(name)) { NMiniKQL::TType* typeFromProto; - std::tie(typeFromProto, value) = ImportValueFromProto(param->GetType(), param->GetValue(), typeEnv, holderFactory); + with_lock (typeEnv.GetAllocator()) { + std::tie(typeFromProto, value) = ImportValueFromProto(param->GetType(), param->GetValue(), typeEnv, holderFactory); + } #ifndef NDEBUG YQL_ENSURE(ToString(*type) == ToString(*typeFromProto), "" << *type << " != " << *typeFromProto); #else @@ -276,13 +280,15 @@ private: auto status = taskRunner->Run(); YQL_ENSURE(status == ERunStatus::Finished); - for (auto& taskOutput : task.Outputs) { - for (ui64 outputChannelId : taskOutput.Channels) { - auto outputChannel = taskRunner->GetOutputChannel(outputChannelId); - auto& channelDesc = TasksGraph.GetChannel(outputChannelId); + with_lock (*context.Alloc) { // allocator is used only by outputChannel->PopAll() + for (auto& taskOutput : task.Outputs) { + for (ui64 outputChannelId : taskOutput.Channels) { + auto outputChannel = taskRunner->GetOutputChannel(outputChannelId); + auto& channelDesc = TasksGraph.GetChannel(outputChannelId); - outputChannel->PopAll(Results[channelDesc.DstInputIndex].Rows); - YQL_ENSURE(outputChannel->IsFinished()); + outputChannel->PopAll(Results[channelDesc.DstInputIndex].Rows); + YQL_ENSURE(outputChannel->IsFinished()); + } } } } @@ -296,16 +302,18 @@ private: ui64 rows = 0; ui64 bytes = 0; - TKqpProtoBuilder protoBuilder(context.Alloc, context.TypeEnv, &holderFactory); - for (auto& result : Results) { - rows += result.Rows.size(); - auto* protoResult = response.MutableResult()->AddResults(); - if (result.IsStream) { - protoBuilder.BuildStream(result.Rows, result.ItemType, result.ResultItemType.Get(), protoResult); - } else { - protoBuilder.BuildValue(result.Rows, result.ItemType, protoResult); + with_lock (*context.Alloc) { + TKqpProtoBuilder protoBuilder(context.Alloc, context.TypeEnv, &holderFactory); + for (auto& result : Results) { + rows += result.Rows.size(); + auto* protoResult = response.MutableResult()->AddResults(); + if (result.IsStream) { + protoBuilder.BuildStream(result.Rows, result.ItemType, result.ResultItemType.Get(), protoResult); + } else { + protoBuilder.BuildValue(result.Rows, result.ItemType, protoResult); + } + bytes += protoResult->ByteSizeLong(); } - bytes += protoResult->ByteSizeLong(); } if (Stats) { diff --git a/ydb/core/kqp/executer/kqp_partition_helper.cpp b/ydb/core/kqp/executer/kqp_partition_helper.cpp index 1583ff2247a..134d82d15b5 100644 --- a/ydb/core/kqp/executer/kqp_partition_helper.cpp +++ b/ydb/core/kqp/executer/kqp_partition_helper.cpp @@ -108,6 +108,7 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix(const NDq:: const TTableId& tableId, const TKqpTableKeys& tableKeys, const TKeyDesc& key, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { + auto guard = typeEnv.BindAllocator(); YQL_ENSURE(tableId.HasSamePath(key.TableId)); auto& table = tableKeys.GetTable(tableId); @@ -363,6 +364,7 @@ TVector<TSerializedPointOrRange> FillRangesFromParameter(const TVector<NScheme:: const NKqpProto::TKqpPhyParamValue& rangesParam, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { + auto guard = typeEnv.BindAllocator(); TString paramName = rangesParam.GetParamName(); auto param = stageInfo.Meta.Tx.Params.Values.FindPtr(paramName); @@ -403,6 +405,7 @@ TVector<TSerializedPointOrRange> FillReadRangesInternal(const TVector<NScheme::T const PhyOpReadRanges& readRanges, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { + auto guard = typeEnv.BindAllocator(); if (readRanges.HasKeyRanges()) { return FillRangesFromParameter( keyColumnTypes, readRanges.GetKeyRanges(), stageInfo, holderFactory, typeEnv @@ -514,6 +517,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyOpReadRange& readRange, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { + auto guard = typeEnv.BindAllocator(); const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId); YQL_ENSURE(table); @@ -545,6 +549,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyOpReadRanges& readRanges, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { + auto guard = typeEnv.BindAllocator(); const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId); YQL_ENSURE(table); @@ -586,6 +591,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { + auto guard = typeEnv.BindAllocator(); const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId); YQL_ENSURE(table); YQL_ENSURE(table->TableKind == ETableKind::Olap); @@ -799,6 +805,7 @@ THashMap<ui64, TShardInfo> PartitionLookupByRowsList(const NKqpProto::TKqpPhyRow THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyOpLookup& lookup, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { + auto guard = typeEnv.BindAllocator(); YQL_CLOG(TRACE, ProviderKqp) << "PrunePartitions: " << lookup.DebugString(); if (!lookup.HasKeysValue()) { @@ -830,6 +837,7 @@ template <typename TEffect> THashMap<ui64, TShardInfo> PruneEffectPartitionsImpl(const TKqpTableKeys& tableKeys, const TEffect& effect, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { + auto guard = typeEnv.BindAllocator(); THashMap<ui64, TShardInfo> shardInfoMap; if (effect.HasRowsValue() && effect.GetRowsValue().GetKindCase() == NKqpProto::TKqpPhyValue::kParamValue) diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index 1cec4fb368c..f85b2f3d0e6 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -662,12 +662,12 @@ private: auto& funcRegistry = *AppData()->FunctionRegistry; NMiniKQL::TScopedAlloc alloc(__LOCATION__, TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators()); NMiniKQL::TTypeEnvironment typeEnv(alloc); + NMiniKQL::TMemoryUsageInfo memInfo("KqpScanExecuter"); + NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo, &funcRegistry); + auto unguard = Unguard(alloc); NWilson::TSpan prepareTasksSpan(TWilsonKqp::ScanExecuterPrepareTasks, ExecuterStateSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END); - NMiniKQL::TMemoryUsageInfo memInfo("PrepareTasks"); - NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo, &funcRegistry); - auto& tx = Request.Transactions[0]; for (ui32 stageIdx = 0; stageIdx < tx.Body->StagesSize(); ++stageIdx) { auto& stage = tx.Body->GetStages(stageIdx); @@ -877,7 +877,6 @@ private: ExecuterStateSpan.End(); ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ScanExecuterExecuteState, ExecuterSpan.GetTraceId(), "ExecuteState", NWilson::EFlags::AUTO_END); } - } void ExecuteScanTx(TVector<NYql::NDqProto::TDqTask>&& computeTasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks) { diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 4650f466796..81ffd6cc812 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -151,8 +151,9 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con } IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, - const NMiniKQL::TTypeEnvironment& /*typeEnv*/, TVector<IDqOutput::TPtr>&& outputs) + const NMiniKQL::TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs) { + auto guard = typeEnv.BindAllocator(); switch (outputDesc.GetTypeCase()) { case NDqProto::TTaskOutput::kSink: Y_VERIFY(outputDesc.ChannelsSize() == 0); @@ -254,8 +255,7 @@ public: return TaskId; } - void BuildTask(const NDqProto::TDqTask& task, const TDqTaskRunnerParameterProvider& parameterProvider) - { + void BuildTask(const NDqProto::TDqTask& task, const TDqTaskRunnerParameterProvider& parameterProvider) { LOG(TStringBuilder() << "Build task: " << TaskId); auto startTime = TInstant::Now(); @@ -395,6 +395,7 @@ public: ProgramParsed.CompGraph = ProgramParsed.CompPattern->Clone( opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider, &Alloc().Ref())); } else { + auto guard = BindAllocator(); ProgramParsed.CompPattern = MakeComputationPattern(programExplorer, programRoot, ProgramParsed.EntryPoints, opts); ProgramParsed.CompGraph = ProgramParsed.CompPattern->Clone( opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider)); @@ -427,10 +428,14 @@ public: auto it = task.GetParameters().find(name); YQL_ENSURE(it != task.GetParameters().end()); + auto guard = typeEnv.BindAllocator(); TDqDataSerializer::DeserializeParam(it->second, type, graphHolderFactory, structMembers[i]); } - ValidateParamValue(name, type, structMembers[i]); + { + auto guard = typeEnv.BindAllocator(); + ValidateParamValue(name, type, structMembers[i]); + } } paramNode->SetValue(ProgramParsed.CompGraph->GetContext(), std::move(paramsStructValue)); @@ -607,6 +612,7 @@ public: } else if (outputConsumers.size() == 1) { Output = std::move(outputConsumers[0]); } else { + auto guard = BindAllocator(); Output = CreateOutputMultiConsumer(std::move(outputConsumers)); } @@ -638,6 +644,7 @@ public: ERunStatus Run() final { LOG(TStringBuilder() << "Run task: " << TaskId); if (!ResultStream) { + auto guard = BindAllocator(); TBindTerminator term(ProgramParsed.CompGraph->GetTerminator()); ResultStream = ProgramParsed.CompGraph->GetValue(); } @@ -730,7 +737,7 @@ public: return {ptr->TransformInput, ptr->TransformOutput}; } - TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit) override { + TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit = {}) override { auto guard = Context.TypeEnv ? Context.TypeEnv->BindAllocator() : SelfTypeEnv->BindAllocator(); if (memoryLimit) { guard.GetMutex()->SetLimit(*memoryLimit); @@ -809,6 +816,7 @@ private: } }; + auto guard = BindAllocator(); while (!Output->IsFull()) { if (Y_UNLIKELY(CollectProfileStats)) { auto now = TInstant::Now(); diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp index 9bd2238dcb8..d04ba790b30 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp @@ -231,6 +231,9 @@ public: } ~TComputationGraphBuildingVisitor() { + auto g = Env.BindAllocator(); + NodeFactory.Reset(); + PatternNodes.Reset(); } const TTypeEnvironment& GetTypeEnvironment() const { |