diff options
author | ulya-sidorina <yulia@ydb.tech> | 2023-02-03 16:02:35 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2023-02-03 16:02:35 +0300 |
commit | 3a523cb63883287a45f6c2f5cc6b3f40e6316b05 (patch) | |
tree | 85b63a2aee75e4574a084baa08508f46cb85607d | |
parent | caf6f6406909fea82e873fe60b8b473c56f02d1c (diff) | |
download | ydb-3a523cb63883287a45f6c2f5cc6b3f40e6316b05.tar.gz |
wrap allocator into shared_ptr
fix(dq_task_runner): wrap allocator into shared_ptr
9 files changed, 151 insertions, 91 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 09bd872c4d0..fe0224adb34 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -266,6 +266,7 @@ public: , InputIndex(args.InputIndex) , TypeEnv(args.TypeEnv) , HolderFactory(args.HolderFactory) + , Alloc(args.Alloc) { TableId = TTableId( Settings.GetTable().GetTableId().GetOwnerId(), @@ -287,6 +288,13 @@ public: } } + virtual ~TKqpReadActor() { + if (!Results.empty() && Alloc) { + TGuard<NMiniKQL::TScopedAlloc> allocGuard(*Alloc); + Results.clear(); + } + } + STFUNC(ReadyState) { Y_UNUSED(ctx); try { @@ -981,6 +989,7 @@ private: const ui64 InputIndex; const NMiniKQL::TTypeEnvironment& TypeEnv; const NMiniKQL::THolderFactory& HolderFactory; + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; }; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 564c1b01591..b4c2f0dc3a3 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -24,9 +24,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku public: TKqpStreamLookupActor(ui64 inputIndex, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, - NKikimrKqp::TKqpStreamLookupSettings&& settings) + std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings) : InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv) - , HolderFactory(holderFactory), TableId(MakeTableId(settings.GetTable())) + , HolderFactory(holderFactory), Alloc(alloc), TableId(MakeTableId(settings.GetTable())) , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId()) , LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>()) , ImmediateTx(settings.GetImmediateTx()) @@ -36,6 +36,13 @@ public: , RetryReadTimeout(RETRY_READ_TIMEOUT) { }; + virtual ~TKqpStreamLookupActor() { + if (Input.HasValue() && Alloc) { + TGuard<NMiniKQL::TScopedAlloc> allocGuard(*Alloc); + Input.Clear(); + } + } + void Bootstrap() { ResolveTable(); @@ -624,6 +631,7 @@ private: const NActors::TActorId ComputeActorId; const NMiniKQL::TTypeEnvironment& TypeEnv; const NMiniKQL::THolderFactory& HolderFactory; + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; const TTableId TableId; IKqpGateway::TKqpSnapshot Snapshot; const TMaybe<ui64> LockTxId; @@ -647,8 +655,9 @@ private: std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv, - const NMiniKQL::THolderFactory& holderFactory, NKikimrKqp::TKqpStreamLookupSettings&& settings) { - auto actor = new TKqpStreamLookupActor(inputIndex, input, computeActorId, typeEnv, holderFactory, + const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, + NKikimrKqp::TKqpStreamLookupSettings&& settings) { + auto actor = new TKqpStreamLookupActor(inputIndex, input, computeActorId, typeEnv, holderFactory, alloc, std::move(settings)); return {actor, actor}; } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h index 59d8d83b875..51205c88baa 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h @@ -8,7 +8,8 @@ namespace NKqp { std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv, - const NMiniKQL::THolderFactory& holderFactory, NKikimrKqp::TKqpStreamLookupSettings&& settings); + const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, + NKikimrKqp::TKqpStreamLookupSettings&& settings); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp index dab7ce8f5c1..df6b8d5cfbd 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp @@ -8,7 +8,7 @@ void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory) { factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [](NKikimrKqp::TKqpStreamLookupSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TInputTransformArguments&& args) { return CreateStreamLookupActor(args.InputIndex, args.TransformInput, args.ComputeActorId, args.TypeEnv, - args.HolderFactory, std::move(settings)); + args.HolderFactory, args.Alloc, std::move(settings)); }); } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 19c3c1d6122..b823da97c27 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -174,6 +174,7 @@ public: const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; const NKikimr::NMiniKQL::THolderFactory& HolderFactory; ::NMonitoring::TDynamicCounterPtr TaskCounters; + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; }; struct TSinkArguments { @@ -201,6 +202,7 @@ public: const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; const NKikimr::NMiniKQL::THolderFactory& HolderFactory; NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder; + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; }; struct TOutputTransformArguments { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 64d3d11547a..d52073182e9 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1524,7 +1524,8 @@ protected: .ComputeActorId = this->SelfId(), .TypeEnv = typeEnv, .HolderFactory = holderFactory, - .TaskCounters = TaskCounters + .TaskCounters = TaskCounters, + .Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr }); } catch (const std::exception& ex) { throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what(); @@ -1552,7 +1553,8 @@ protected: .ComputeActorId = this->SelfId(), .TypeEnv = typeEnv, .HolderFactory = holderFactory, - .ProgramBuilder = *transform.ProgramBuilder + .ProgramBuilder = *transform.ProgramBuilder, + .Alloc = TaskRunner->GetAllocatorPtr() }); } catch (const std::exception& ex) { throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what(); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 79bb5cc96c2..383a5c0d164 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -218,6 +218,7 @@ public: : Context(context) , Settings(settings) , LogFunc(logFunc) + , AllocatedHolder(std::make_optional<TAllocatedHolder>()) , CollectBasicStats(Settings.CollectBasicStats) , CollectProfileStats(Settings.CollectProfileStats) { @@ -231,12 +232,15 @@ public: } if (!Context.Alloc) { - SelfAlloc = std::make_unique<TScopedAlloc>(__LOCATION__, TAlignedPagePoolCounters(), - Context.FuncRegistry->SupportsSizedAllocators()); + SelfAlloc = std::shared_ptr<TScopedAlloc>(new TScopedAlloc(__LOCATION__, TAlignedPagePoolCounters(), + Context.FuncRegistry->SupportsSizedAllocators()), [](TScopedAlloc* ptr) { + ptr->Acquire(); + delete ptr; + }); } if (!Context.TypeEnv) { - SelfTypeEnv = std::make_unique<TTypeEnvironment>(Context.Alloc ? *Context.Alloc : *SelfAlloc); + AllocatedHolder->SelfTypeEnv = std::make_unique<TTypeEnvironment>(Context.Alloc ? *Context.Alloc : *SelfAlloc); } if (SelfAlloc) { @@ -247,6 +251,8 @@ public: ~TDqTaskRunner() { if (SelfAlloc) { SelfAlloc->Acquire(); + AllocatedHolder.reset(); + SelfAlloc->Release(); } } @@ -273,12 +279,12 @@ public: return nullptr; }; - if (Y_UNLIKELY(CollectProfileStats && !ProgramParsed.StatsRegistry)) { - ProgramParsed.StatsRegistry = NMiniKQL::CreateDefaultStatsRegistry(); + if (Y_UNLIKELY(CollectProfileStats && !AllocatedHolder->ProgramParsed.StatsRegistry)) { + AllocatedHolder->ProgramParsed.StatsRegistry = NMiniKQL::CreateDefaultStatsRegistry(); } TComputationPatternOpts opts(alloc.Ref(), typeEnv, taskRunnerFactory, Context.FuncRegistry, NUdf::EValidateMode::None, validatePolicy, Settings.OptLLVM, EGraphPerProcess::Multi, - ProgramParsed.StatsRegistry.Get()); + AllocatedHolder->ProgramParsed.StatsRegistry.Get()); if (!SecureParamsProvider) { SecureParamsProvider = MakeSimpleSecureParamsProvider(Settings.SecureParams); @@ -414,20 +420,20 @@ public: entry = CreateComputationPattern(task, program.GetRaw()); } - ProgramParsed.PatternCacheEntry = entry; + AllocatedHolder->ProgramParsed.PatternCacheEntry = entry; // clone pattern using TDqTaskRunner's alloc auto opts = CreatePatternOpts(Alloc(), TypeEnv()); - ProgramParsed.CompGraph = ProgramParsed.GetPattern()->Clone( + AllocatedHolder->ProgramParsed.CompGraph = AllocatedHolder->ProgramParsed.GetPattern()->Clone( opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider, &TypeEnv())); - TBindTerminator term(ProgramParsed.CompGraph->GetTerminator()); + TBindTerminator term(AllocatedHolder->ProgramParsed.CompGraph->GetTerminator()); - auto paramNode = ProgramParsed.CompGraph->GetEntryPoint(entry->ProgramInputsCount, /* require */ false); + auto paramNode = AllocatedHolder->ProgramParsed.CompGraph->GetEntryPoint(entry->ProgramInputsCount, /* require */ false); if (paramNode) { // TODO: Remove serialized parameters that are used in OLAP program and not used in current program - const auto& graphHolderFactory = ProgramParsed.CompGraph->GetHolderFactory(); + const auto& graphHolderFactory = AllocatedHolder->ProgramParsed.CompGraph->GetHolderFactory(); NUdf::TUnboxedValue* structMembers; auto paramsCount = entry->ParamsStruct->GetMembersCount(); auto paramsStructValue = graphHolderFactory.CreateDirectArrayHolder(paramsCount, structMembers); @@ -454,7 +460,7 @@ public: } } - paramNode->SetValue(ProgramParsed.CompGraph->GetContext(), std::move(paramsStructValue)); + paramNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(), std::move(paramsStructValue)); } else { /* * This situation is ok, when there are OLAP parameters only. There is no parameter node @@ -480,8 +486,8 @@ public: LOG(TStringBuilder() << "Prepare task: " << TaskId); auto startTime = TInstant::Now(); - auto& holderFactory = ProgramParsed.CompGraph->GetHolderFactory(); - TBindTerminator term(ProgramParsed.CompGraph->GetTerminator()); + auto& holderFactory = AllocatedHolder->ProgramParsed.CompGraph->GetHolderFactory(); + TBindTerminator term(AllocatedHolder->ProgramParsed.CompGraph->GetTerminator()); auto& typeEnv = TypeEnv(); @@ -493,7 +499,7 @@ public: TType** inputType = &entry->InputItemTypes[i]; if (inputDesc.HasTransform()) { const auto& transformDesc = inputDesc.GetTransform(); - transform = &InputTransforms[i]; + transform = &AllocatedHolder->InputTransforms[i]; Y_VERIFY(!transform->TransformInput); Y_VERIFY(!transform->TransformOutput); @@ -527,7 +533,7 @@ public: if (inputDesc.HasSource()) { auto source = CreateDqAsyncInputBuffer(i, *inputType, memoryLimits.ChannelBufferSize, Settings.CollectProfileStats); - auto [_, inserted] = Sources.emplace(i, source); + auto [_, inserted] = AllocatedHolder->Sources.emplace(i, source); Y_VERIFY(inserted); inputs.emplace_back(source); } else { @@ -536,21 +542,21 @@ public: auto inputChannel = CreateDqInputChannel(channelId, *inputType, memoryLimits.ChannelBufferSize, Settings.CollectProfileStats, typeEnv, holderFactory, inputChannelDesc.GetTransportVersion()); - auto ret = InputChannels.emplace(channelId, inputChannel); + auto ret = AllocatedHolder->InputChannels.emplace(channelId, inputChannel); YQL_ENSURE(ret.second, "task: " << TaskId << ", duplicated input channelId: " << channelId); inputs.emplace_back(inputChannel); } } - auto entryNode = ProgramParsed.CompGraph->GetEntryPoint(i, true); + auto entryNode = AllocatedHolder->ProgramParsed.CompGraph->GetEntryPoint(i, true); if (transform) { transform->TransformInput = DqBuildInputValue(inputDesc, transform->TransformInputType, std::move(inputs), holderFactory); inputs.clear(); inputs.emplace_back(transform->TransformOutput); - entryNode->SetValue(ProgramParsed.CompGraph->GetContext(), + entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(), CreateInputUnionValue(std::move(inputs), holderFactory)); } else { - entryNode->SetValue(ProgramParsed.CompGraph->GetContext(), + entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(), DqBuildInputValue(inputDesc, entry->InputItemTypes[i], std::move(inputs), holderFactory)); } } @@ -568,7 +574,7 @@ public: TType** taskOutputType = &entry->OutputItemTypes[i]; if (outputDesc.HasTransform()) { const auto& transformDesc = outputDesc.GetTransform(); - transform = &OutputTransforms[i]; + transform = &AllocatedHolder->OutputTransforms[i]; Y_VERIFY(!transform->TransformInput); Y_VERIFY(!transform->TransformOutput); @@ -593,7 +599,7 @@ public: if (outputDesc.HasSink()) { auto sink = CreateDqAsyncOutputBuffer(i, *taskOutputType, memoryLimits.ChannelBufferSize, Settings.CollectProfileStats); - auto [_, inserted] = Sinks.emplace(i, sink); + auto [_, inserted] = AllocatedHolder->Sinks.emplace(i, sink); Y_VERIFY(inserted); outputs.emplace_back(sink); } else { @@ -615,7 +621,7 @@ public: auto outputChannel = CreateDqOutputChannel(channelId, *taskOutputType, typeEnv, holderFactory, settings, LogFunc); - auto ret = OutputChannels.emplace(channelId, outputChannel); + auto ret = AllocatedHolder->OutputChannels.emplace(channelId, outputChannel); YQL_ENSURE(ret.second, "task: " << TaskId << ", duplicated output channelId: " << channelId); outputs.emplace_back(outputChannel); } @@ -638,12 +644,12 @@ public: } if (outputConsumers.empty()) { - Output = nullptr; + AllocatedHolder->Output = nullptr; } else if (outputConsumers.size() == 1) { - Output = std::move(outputConsumers[0]); + AllocatedHolder->Output = std::move(outputConsumers[0]); } else { auto guard = BindAllocator(); - Output = CreateOutputMultiConsumer(std::move(outputConsumers)); + AllocatedHolder->Output = CreateOutputMultiConsumer(std::move(outputConsumers)); } auto prepareTime = TInstant::Now() - startTime; @@ -659,13 +665,13 @@ public: if (Stats) { Stats->BuildCpuTime += prepareTime; - for (auto&[channelId, inputChannel] : InputChannels) { + for (auto&[channelId, inputChannel] : AllocatedHolder->InputChannels) { Stats->InputChannels.emplace(channelId, inputChannel->GetStats()); } - for (auto&[inputIndex, source] : Sources) { + for (auto&[inputIndex, source] : AllocatedHolder->Sources) { Stats->Sources.emplace(inputIndex, source->GetStats()); } - for (auto&[channelId, outputChannel] : OutputChannels) { + for (auto&[channelId, outputChannel] : AllocatedHolder->OutputChannels) { Stats->OutputChannels.emplace(channelId, outputChannel->GetStats()); } } @@ -673,10 +679,10 @@ public: ERunStatus Run() final { LOG(TStringBuilder() << "Run task: " << TaskId); - if (!ResultStream) { + if (!AllocatedHolder->ResultStream) { auto guard = BindAllocator(); - TBindTerminator term(ProgramParsed.CompGraph->GetTerminator()); - ResultStream = ProgramParsed.CompGraph->GetValue(); + TBindTerminator term(AllocatedHolder->ProgramParsed.CompGraph->GetTerminator()); + AllocatedHolder->ResultStream = AllocatedHolder->ProgramParsed.CompGraph->GetValue(); } RunComputeTime = TDuration::Zero(); @@ -689,9 +695,9 @@ public: if (Y_UNLIKELY(CollectProfileStats)) { Stats->ComputeCpuTimeByRun->Collect(RunComputeTime.MilliSeconds()); - if (ProgramParsed.StatsRegistry) { + if (AllocatedHolder->ProgramParsed.StatsRegistry) { Stats->MkqlStats.clear(); - ProgramParsed.StatsRegistry->ForEachStat([this](const TStatKey& key, i64 value) { + AllocatedHolder->ProgramParsed.StatsRegistry->ForEachStat([this](const TStatKey& key, i64 value) { Stats->MkqlStats.emplace_back(TMkqlStat{key, value}); }); } @@ -732,43 +738,43 @@ public: } IDqInputChannel::TPtr GetInputChannel(ui64 channelId) override { - auto ptr = InputChannels.FindPtr(channelId); + auto ptr = AllocatedHolder->InputChannels.FindPtr(channelId); YQL_ENSURE(ptr, "task: " << TaskId << " does not have input channelId: " << channelId); return *ptr; } IDqAsyncInputBuffer::TPtr GetSource(ui64 inputIndex) override { - auto ptr = Sources.FindPtr(inputIndex); + auto ptr = AllocatedHolder->Sources.FindPtr(inputIndex); YQL_ENSURE(ptr, "task: " << TaskId << " does not have input index: " << inputIndex); return *ptr; } IDqOutputChannel::TPtr GetOutputChannel(ui64 channelId) override { - auto ptr = OutputChannels.FindPtr(channelId); + auto ptr = AllocatedHolder->OutputChannels.FindPtr(channelId); YQL_ENSURE(ptr, "task: " << TaskId << " does not have output channelId: " << channelId); return *ptr; } IDqAsyncOutputBuffer::TPtr GetSink(ui64 outputIndex) override { - auto ptr = Sinks.FindPtr(outputIndex); + auto ptr = AllocatedHolder->Sinks.FindPtr(outputIndex); YQL_ENSURE(ptr, "task: " << TaskId << " does not have output index: " << outputIndex); return *ptr; } std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr> GetInputTransform(ui64 inputIndex) override { - auto ptr = InputTransforms.FindPtr(inputIndex); + auto ptr = AllocatedHolder->InputTransforms.FindPtr(inputIndex); YQL_ENSURE(ptr, "task: " << TaskId << " does not have input index: " << inputIndex << " or such transform"); return {ptr->TransformInput, ptr->TransformOutput}; } std::pair<IDqAsyncOutputBuffer::TPtr, IDqOutputConsumer::TPtr> GetOutputTransform(ui64 outputIndex) override { - auto ptr = OutputTransforms.FindPtr(outputIndex); + auto ptr = AllocatedHolder->OutputTransforms.FindPtr(outputIndex); YQL_ENSURE(ptr, "task: " << TaskId << " does not have output index: " << outputIndex << " or such transform"); return {ptr->TransformInput, ptr->TransformOutput}; } TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit = {}) override { - auto guard = Context.TypeEnv ? Context.TypeEnv->BindAllocator() : SelfTypeEnv->BindAllocator(); + auto guard = Context.TypeEnv ? Context.TypeEnv->BindAllocator() : AllocatedHolder->SelfTypeEnv->BindAllocator(); if (memoryLimit) { guard.GetMutex()->SetLimit(*memoryLimit); } @@ -776,15 +782,19 @@ public: } bool IsAllocatorAttached() override { - return Context.TypeEnv ? Context.TypeEnv->GetAllocator().IsAttached() : SelfTypeEnv->GetAllocator().IsAttached(); + return Context.TypeEnv ? Context.TypeEnv->GetAllocator().IsAttached() : AllocatedHolder->SelfTypeEnv->GetAllocator().IsAttached(); } const NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnv() const override { - return Context.TypeEnv ? *Context.TypeEnv : *SelfTypeEnv; + return Context.TypeEnv ? *Context.TypeEnv : *AllocatedHolder->SelfTypeEnv; } const NKikimr::NMiniKQL::THolderFactory& GetHolderFactory() const override { - return ProgramParsed.CompGraph->GetHolderFactory(); + return AllocatedHolder->ProgramParsed.CompGraph->GetHolderFactory(); + } + + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() const override { + return SelfAlloc; } const THashMap<TString, TString>& GetSecureParams() const override { @@ -810,17 +820,17 @@ public: } TString Save() const override { - return ProgramParsed.CompGraph->SaveGraphState(); + return AllocatedHolder->ProgramParsed.CompGraph->SaveGraphState(); } void Load(TStringBuf in) override { - Y_VERIFY(!ResultStream); - ProgramParsed.CompGraph->LoadGraphState(in); + Y_VERIFY(!AllocatedHolder->ResultStream); + AllocatedHolder->ProgramParsed.CompGraph->LoadGraphState(in); } private: NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv() { - return Context.TypeEnv ? *Context.TypeEnv : *SelfTypeEnv; + return Context.TypeEnv ? *Context.TypeEnv : *AllocatedHolder->SelfTypeEnv; } NKikimr::NMiniKQL::TScopedAlloc& Alloc() { @@ -829,16 +839,16 @@ private: void FinishImpl() { LOG(TStringBuilder() << "task" << TaskId << ", execution finished, finish consumers"); - Output->Finish(); + AllocatedHolder->Output->Finish(); } ERunStatus FetchAndDispatch() { - if (!Output) { + if (!AllocatedHolder->Output) { LOG("no consumers, Finish execution"); return ERunStatus::Finished; } - TBindTerminator term(ProgramParsed.CompGraph->GetTerminator()); + TBindTerminator term(AllocatedHolder->ProgramParsed.CompGraph->GetTerminator()); auto startComputeTime = TInstant::Now(); Y_DEFER { @@ -852,15 +862,15 @@ private: }; auto guard = BindAllocator(); - if (Output->IsFinishing()) { - if (Output->TryFinish()) { + if (AllocatedHolder->Output->IsFinishing()) { + if (AllocatedHolder->Output->TryFinish()) { FinishImpl(); return ERunStatus::Finished; } else { return ERunStatus::PendingOutput; } } - while (!Output->IsFull()) { + while (!AllocatedHolder->Output->IsFull()) { if (Y_UNLIKELY(CollectProfileStats)) { auto now = TInstant::Now(); StopWaitingOutput(now); @@ -868,15 +878,15 @@ private: } NUdf::TUnboxedValue value; - auto fetchStatus = ResultStream.Fetch(value); + auto fetchStatus = AllocatedHolder->ResultStream.Fetch(value); switch (fetchStatus) { case NUdf::EFetchStatus::Ok: { - Output->Consume(std::move(value)); + AllocatedHolder->Output->Consume(std::move(value)); break; } case NUdf::EFetchStatus::Finish: { - if (!Output->TryFinish()) { + if (!AllocatedHolder->Output->TryFinish()) { break; } FinishImpl(); @@ -898,8 +908,7 @@ private: TLogFunc LogFunc; std::unique_ptr<NUdf::ISecureParamsProvider> SecureParamsProvider; - std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> SelfAlloc; // if not set -> use Context.Alloc - std::unique_ptr<NKikimr::NMiniKQL::TTypeEnvironment> SelfTypeEnv; // if not set -> use Context.TypeEnv + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> SelfAlloc; // if not set -> use Context.Alloc struct TInputTransformInfo { NUdf::TUnboxedValue TransformInput; @@ -922,16 +931,24 @@ private: return PatternCacheEntry->Pattern.Get(); } }; - TProgramParsed ProgramParsed; - - THashMap<ui64, IDqInputChannel::TPtr> InputChannels; // Channel id -> Channel - THashMap<ui64, IDqAsyncInputBuffer::TPtr> Sources; // Input index -> Source - THashMap<ui64, TInputTransformInfo> InputTransforms; // Output index -> Transform - THashMap<ui64, IDqOutputChannel::TPtr> OutputChannels; // Channel id -> Channel - THashMap<ui64, IDqAsyncOutputBuffer::TPtr> Sinks; // Output index -> Sink - THashMap<ui64, TOutputTransformInfo> OutputTransforms; // Output index -> Transform - IDqOutputConsumer::TPtr Output; - NUdf::TUnboxedValue ResultStream; + + struct TAllocatedHolder { + std::unique_ptr<NKikimr::NMiniKQL::TTypeEnvironment> SelfTypeEnv; // if not set -> use Context.TypeEnv + + TProgramParsed ProgramParsed; + + THashMap<ui64, IDqInputChannel::TPtr> InputChannels; // Channel id -> Channel + THashMap<ui64, IDqAsyncInputBuffer::TPtr> Sources; // Input index -> Source + THashMap<ui64, TInputTransformInfo> InputTransforms; // Output index -> Transform + THashMap<ui64, IDqOutputChannel::TPtr> OutputChannels; // Channel id -> Channel + THashMap<ui64, IDqAsyncOutputBuffer::TPtr> Sinks; // Output index -> Sink + THashMap<ui64, TOutputTransformInfo> OutputTransforms; // Output index -> Transform + + IDqOutputConsumer::TPtr Output; + NUdf::TUnboxedValue ResultStream; + }; + + std::optional<TAllocatedHolder> AllocatedHolder; NKikimr::NMiniKQL::TWatermark Watermark; bool TaskHasEffects = false; diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index 262fe77413e..0e6960b126c 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -311,6 +311,7 @@ public: virtual bool IsAllocatorAttached() = 0; virtual const NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnv() const = 0; virtual const NKikimr::NMiniKQL::THolderFactory& GetHolderFactory() const = 0; + virtual std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() const = 0; virtual const THashMap<TString, TString>& GetSecureParams() const = 0; virtual const THashMap<TString, TString>& GetTaskParams() const = 0; diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index fda87142148..f8efe2d3d36 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -1244,10 +1244,9 @@ public: const TString& traceId) : TraceId(traceId) , Task(task) - , Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true) - , TypeEnv(Alloc) - , MemInfo("TDqTaskRunnerProxy") - , HolderFactory(Alloc.Ref(), MemInfo) + , Alloc(new NKikimr::NMiniKQL::TScopedAlloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true), + [](NKikimr::NMiniKQL::TScopedAlloc* ptr) { ptr->Acquire(); delete ptr; }) + , AllocatedHolder(std::make_optional<TAllocatedHolder>(*Alloc, "TDqTaskRunnerProxy")) , Running(true) , Command(std::move(command)) , StderrReader(MakeHolder<TThread>([this] () { ReadStderr(); })) @@ -1256,13 +1255,15 @@ public: , TaskId(Task.GetId()) , StageId(stageId) { - Alloc.Release(); + Alloc->Release(); StderrReader->Start(); InitTaskMeta(); } ~TTaskRunner() { - Alloc.Acquire(); + Alloc->Acquire(); + AllocatedHolder.reset(); + Alloc->Release(); Command->Kill(); Command->Wait(TDuration::Seconds(0)); } @@ -1343,11 +1344,15 @@ public: } const NMiniKQL::TTypeEnvironment& GetTypeEnv() const override { - return TypeEnv; + return AllocatedHolder->TypeEnv; } const NMiniKQL::THolderFactory& GetHolderFactory() const override { - return HolderFactory; + return AllocatedHolder->HolderFactory; + } + + std::shared_ptr<NMiniKQL::TScopedAlloc> GetAllocatorPtr() const { + return Alloc; } const THashMap<TString, TString>& GetSecureParams() const override { @@ -1359,7 +1364,7 @@ public: } TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit) override { - auto guard = TypeEnv.BindAllocator(); + auto guard = AllocatedHolder->TypeEnv.BindAllocator(); if (memoryLimit) { guard.GetMutex()->SetLimit(*memoryLimit); } @@ -1367,7 +1372,7 @@ public: } bool IsAllocatorAttached() override { - return TypeEnv.GetAllocator().IsAttached(); + return AllocatedHolder->TypeEnv.GetAllocator().IsAttached(); } void Kill() override { @@ -1435,10 +1440,20 @@ private: THashMap<TString, TString> SecureParams; THashMap<TString, TString> TaskParams; - NKikimr::NMiniKQL::TScopedAlloc Alloc; - NKikimr::NMiniKQL::TTypeEnvironment TypeEnv; - NKikimr::NMiniKQL::TMemoryUsageInfo MemInfo; - NKikimr::NMiniKQL::THolderFactory HolderFactory; + std::shared_ptr <NKikimr::NMiniKQL::TScopedAlloc> Alloc; + + struct TAllocatedHolder { + TAllocatedHolder(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TStringBuf& memInfoTitle) + : TypeEnv(alloc) + , MemInfo(memInfoTitle) + , HolderFactory(alloc.Ref(), MemInfo) {} + + NKikimr::NMiniKQL::TTypeEnvironment TypeEnv; + NKikimr::NMiniKQL::TMemoryUsageInfo MemInfo; + NKikimr::NMiniKQL::THolderFactory HolderFactory; + }; + + std::optional<TAllocatedHolder> AllocatedHolder; std::atomic<bool> Running; int Code = -1; @@ -1565,6 +1580,10 @@ public: return Delegate->GetHolderFactory(); } + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() const override { + return Delegate->GetAllocatorPtr(); + } + const THashMap<TString, TString>& GetSecureParams() const override { return Delegate->GetSecureParams(); } |