diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2024-01-26 18:14:35 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-26 18:14:35 +0300 |
commit | bc843db89bbfff632aaa66932069ae506ecdbfe6 (patch) | |
tree | 2e301583c19a0e1bc249bd3807532116b9d27cc6 | |
parent | 9f3844c68cef0369e5b2daae02e19c21b23ee65e (diff) | |
download | ydb-bc843db89bbfff632aaa66932069ae506ecdbfe6.tar.gz |
YQL-17542 move allocator ownership from TDqTaskRunner to actors (#1335)
25 files changed, 152 insertions, 101 deletions
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 3204f11ec7..b2158db7df 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -246,6 +246,7 @@ void Init( if (!mkqlAllocSize) { mkqlAllocSize = 30_MB; } + Y_ABORT_UNLESS(appData->FunctionRegistry); NYql::NDqs::TLocalWorkerManagerOptions lwmOptions; lwmOptions.Counters = workerManagerCounters; lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, nullptr, false); @@ -257,8 +258,9 @@ void Init( lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit(); lwmOptions.MkqlMinAllocSize = mkqlAllocSize; lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory( - [=](const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) { - return lwmOptions.Factory->Get(task, statsMode); + *appData->FunctionRegistry, + [=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) { + return lwmOptions.Factory->Get(alloc, task, statsMode); }); if (protoConfig.GetRateLimiter().GetDataPlaneEnabled()) { lwmOptions.QuoterServiceActorId = NFq::YqQuoterServiceActorId(); diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 1dfe1fbab6..cc9a3dcae6 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -46,7 +46,6 @@ void TKqpComputeActor::DoBootstrap() { execCtx.ComputeCtx = &ComputeCtx; execCtx.ComputationFactory = NMiniKQL::GetKqpActorComputeFactory(&ComputeCtx); execCtx.ApplyCtx = nullptr; - execCtx.Alloc = nullptr; execCtx.TypeEnv = nullptr; execCtx.PatternCache = GetKqpResourceManager()->GetPatternCache(); @@ -68,7 +67,7 @@ void TKqpComputeActor::DoBootstrap() { settings.ReadRanges.push_back(readRange); } - auto taskRunner = MakeDqTaskRunner(execCtx, settings, logger); + auto taskRunner = MakeDqTaskRunner(TBase::GetAllocator(), execCtx, settings, logger); SetTaskRunner(taskRunner); auto wakeup = [this]{ ContinueExecute(); }; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index c60a029b7d..f6a48b208c 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -188,7 +188,6 @@ void TKqpScanComputeActor::DoBootstrap() { execCtx.RandomProvider = TAppData::RandomProvider.Get(); execCtx.TimeProvider = TAppData::TimeProvider.Get(); execCtx.ApplyCtx = nullptr; - execCtx.Alloc = nullptr; execCtx.TypeEnv = nullptr; execCtx.PatternCache = GetKqpResourceManager()->GetPatternCache(); @@ -219,7 +218,7 @@ void TKqpScanComputeActor::DoBootstrap() { }; } - auto taskRunner = MakeDqTaskRunner(execCtx, settings, logger); + auto taskRunner = MakeDqTaskRunner(GetAllocator(), execCtx, settings, logger); TBase::SetTaskRunner(taskRunner); auto wakeup = [this] { ContinueExecute(); }; diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp index 5156ed69e9..3d1383bf08 100644 --- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp @@ -19,7 +19,7 @@ using namespace NYql::NDq; namespace { -std::unique_ptr<TDqTaskRunnerContext> CreateTaskRunnerContext(NMiniKQL::TKqpComputeContextBase* computeCtx, NMiniKQL::TScopedAlloc* alloc, +std::unique_ptr<TDqTaskRunnerContext> CreateTaskRunnerContext(NMiniKQL::TKqpComputeContextBase* computeCtx, NMiniKQL::TTypeEnvironment* typeEnv) { std::unique_ptr<TDqTaskRunnerContext> context = std::make_unique<TDqTaskRunnerContext>(); @@ -44,7 +44,6 @@ std::unique_ptr<TDqTaskRunnerContext> CreateTaskRunnerContext(NMiniKQL::TKqpComp return nullptr; }; - context->Alloc = alloc; context->TypeEnv = typeEnv; context->ApplyCtx = nullptr; return context; @@ -167,12 +166,12 @@ public: // task runner settings ComputeCtx = std::make_unique<NMiniKQL::TKqpComputeContextBase>(); - RunnerContext = CreateTaskRunnerContext(ComputeCtx.get(), &Request.TxAlloc->Alloc, &Request.TxAlloc->TypeEnv); + RunnerContext = CreateTaskRunnerContext(ComputeCtx.get(), &Request.TxAlloc->TypeEnv); RunnerContext->PatternCache = GetKqpResourceManager()->GetPatternCache(); TDqTaskRunnerSettings settings = CreateTaskRunnerSettings(Request.StatsMode); for (auto& task : TasksGraph.GetTasks()) { - RunTask(task, *RunnerContext, settings); + RunTask(Request.TxAlloc->Alloc, task, *RunnerContext, settings); if (TerminateIfTimeout()) { return; @@ -183,7 +182,7 @@ public: UpdateCounters(); } - void RunTask(TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) { + void RunTask(NMiniKQL::TScopedAlloc& alloc, TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) { auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); @@ -218,7 +217,7 @@ public: << message); }; - auto taskRunner = MakeDqTaskRunner(context, settings, log); + auto taskRunner = MakeDqTaskRunner(alloc, context, settings, log); TaskRunners.emplace_back(taskRunner); auto taskSettings = NDq::TDqTaskSettings(&protoTask); @@ -228,7 +227,7 @@ public: auto status = taskRunner->Run(); YQL_ENSURE(status == ERunStatus::Finished); - with_lock (*context.Alloc) { // allocator is used only by outputChannel->PopAll() + with_lock (alloc) { // allocator is used only by outputChannel->PopAll() for (auto& taskOutput : task.Outputs) { for (ui64 outputChannelId : taskOutput.Channels) { auto outputChannel = taskRunner->GetOutputChannel(outputChannelId); diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp index ce0f9fc211..d9c4aa41bf 100644 --- a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp +++ b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp @@ -70,11 +70,11 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp TKqpTasksRunner::TKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks, + NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc) : LogFunc(logFunc) - , Alloc(execCtx.Alloc) + , Alloc(alloc) { - YQL_ENSURE(execCtx.Alloc); YQL_ENSURE(execCtx.TypeEnv); ApplyCtx = dynamic_cast<NMiniKQL::TKqpDatashardApplyContext *>(execCtx.ApplyCtx); @@ -86,7 +86,7 @@ TKqpTasksRunner::TKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TD try { for (auto&& task : tasks) { ui64 taskId = task.GetId(); - auto runner = MakeDqTaskRunner(execCtx, settings, logFunc); + auto runner = MakeDqTaskRunner(alloc, execCtx, settings, logFunc); if (auto* stats = runner->GetStats()) { Stats.emplace(taskId, stats); } @@ -230,15 +230,16 @@ const NYql::NDq::TDqTaskSettings& TKqpTasksRunner::GetTask(ui64 taskId) const { TGuard<NMiniKQL::TScopedAlloc> TKqpTasksRunner::BindAllocator(TMaybe<ui64> memoryLimit) { if (memoryLimit) { - Alloc->SetLimit(*memoryLimit); + Alloc.SetLimit(*memoryLimit); } - return TGuard(*Alloc); + return TGuard(Alloc); } TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks, + NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc) { - return new TKqpTasksRunner(std::move(tasks), execCtx, settings, logFunc); + return new TKqpTasksRunner(std::move(tasks), alloc, execCtx, settings, logFunc); } } // namespace NKqp diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.h b/ydb/core/kqp/runtime/kqp_tasks_runner.h index 9835aeb4b2..ec84181050 100644 --- a/ydb/core/kqp/runtime/kqp_tasks_runner.h +++ b/ydb/core/kqp/runtime/kqp_tasks_runner.h @@ -16,6 +16,7 @@ NYql::NDq::IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NYql::NDqProto:: class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCopyable { public: TKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks, + NKikimr::NMiniKQL::TScopedAlloc& alloc, const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings, const NYql::NDq::TLogFunc& logFunc); @@ -50,7 +51,7 @@ public: // otherwise use particular memory limit TGuard<NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit = Nothing()); - ui64 GetAllocatedMemory() const { return Alloc->GetAllocated(); } + ui64 GetAllocatedMemory() const { return Alloc.GetAllocated(); } const TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> GetTasksStats() const { return Stats; } private: @@ -58,7 +59,7 @@ private: TMap<ui64, NYql::NDq::TDqTaskSettings> Tasks; TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> Stats; NYql::NDq::TLogFunc LogFunc; - NMiniKQL::TScopedAlloc* Alloc; + NMiniKQL::TScopedAlloc& Alloc; NMiniKQL::TKqpComputeContextBase* ComputeCtx; NMiniKQL::TKqpDatashardApplyContext* ApplyCtx; @@ -72,6 +73,7 @@ private: TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks, + NKikimr::NMiniKQL::TScopedAlloc& alloc, const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings, const NYql::NDq::TLogFunc& logFunc); diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 086dcffd3a..8e2d2cbc47 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -521,7 +521,6 @@ TEngineBay::TEngineBay(TDataShard* self, TTransactionContext& txc, const TActorC KqpExecCtx.RandomProvider = TAppData::RandomProvider.Get(); KqpExecCtx.TimeProvider = TAppData::TimeProvider.Get(); KqpExecCtx.ApplyCtx = KqpApplyCtx.Get(); - KqpExecCtx.Alloc = KqpAlloc.Get(); KqpExecCtx.TypeEnv = KqpTypeEnv.Get(); if (auto rm = NKqp::TryGetKqpResourceManager()) { KqpExecCtx.PatternCache = rm->GetPatternCache(); @@ -701,9 +700,9 @@ NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(NKikimrTxDataShard::TKqpTra settings.OptLLVM = "OFF"; settings.TerminateOnError = false; - + Y_ABORT_UNLESS(KqpAlloc); KqpAlloc->SetLimit(10_MB); - KqpTasksRunner = NKqp::CreateKqpTasksRunner(std::move(*tx.MutableTasks()), KqpExecCtx, settings, KqpLogFunc); + KqpTasksRunner = NKqp::CreateKqpTasksRunner(std::move(*tx.MutableTasks()), *KqpAlloc.Get(), KqpExecCtx, settings, KqpLogFunc); } return *KqpTasksRunner; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp index 1811bb4b17..ccb6b638e6 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp @@ -54,7 +54,7 @@ public: }; } - auto taskRunner = TaskRunnerFactory(Task, RuntimeSettings.StatsMode, logger); + auto taskRunner = TaskRunnerFactory(GetAllocator(), Task, RuntimeSettings.StatsMode, logger); SetTaskRunner(taskRunner); auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); }; TDqTaskRunnerExecutionContext execCtx(TxId, RuntimeSettings.UseSpilling, std::move(wakeup)); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index 5aac450998..f2ec5e2d9f 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -364,8 +364,11 @@ struct TComputeMemoryLimits { IMemoryQuotaManager::TPtr MemoryQuotaManager; }; +//temporary flag to integarate changes in interface +#define Y_YQL_DQ_TASK_RUNNER_REQUIRES_ALLOCATOR 1 + using TTaskRunnerFactory = std::function< - TIntrusivePtr<IDqTaskRunner>(const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc) + TIntrusivePtr<IDqTaskRunner>(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc) >; void FillAsyncStats(NDqProto::TDqAsyncBufferStats& proto, TDqAsyncStats stats); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index bfd081e4cc..854f63e523 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -206,6 +206,12 @@ protected: , Running(!Task.GetCreateSuspended()) , PassExceptions(passExceptions) { + Alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>( + __LOCATION__, + NKikimr::TAlignedPagePoolCounters(), + FunctionRegistry->SupportsSizedAllocators(), + false + ); InitMonCounters(taskCounters); InitializeTask(); if (ownMemoryQuota) { @@ -626,8 +632,8 @@ protected: void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssues issues) { CA_LOG_E(InternalErrorLogString(statusCode, issues)); if (TaskRunner) { - TaskRunner->GetAllocatorPtr()->InvalidateMemInfo(); - TaskRunner->GetAllocatorPtr()->DisableStrictAllocationCheck(); + TaskRunner->GetAllocator().InvalidateMemInfo(); + TaskRunner->GetAllocator().DisableStrictAllocationCheck(); } State = NDqProto::COMPUTE_STATE_FAILURE; ReportStateAndMaybeDie(statusCode, issues); @@ -1365,6 +1371,10 @@ protected: } } +protected: + NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() { + return *Alloc.get(); + } private: virtual const TDqMemoryQuota::TProfileStats* GetMemoryProfileStats() const { Y_ABORT_UNLESS(MemoryQuota); @@ -1586,7 +1596,7 @@ protected: .TypeEnv = typeEnv, .HolderFactory = holderFactory, .TaskCounters = TaskCounters, - .Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr, + .Alloc = TaskRunner ? Alloc : nullptr, .MemoryQuotaManager = MemoryLimits.MemoryQuotaManager, .SourceSettings = (!settings.empty() ? settings.at(inputIndex) : nullptr), .Arena = Task.GetArena(), @@ -1619,7 +1629,7 @@ protected: .TypeEnv = typeEnv, .HolderFactory = holderFactory, .ProgramBuilder = *transform.ProgramBuilder, - .Alloc = TaskRunner->GetAllocatorPtr(), + .Alloc = Alloc, .TraceId = ComputeActorSpan.GetTraceId() }); } catch (const std::exception& ex) { @@ -2222,7 +2232,8 @@ protected: LastSendStatsTime = now; } - +private: + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; //must be declared on top to be destroyed after all the rest protected: const NActors::TActorId ExecuterId; const TTxId TxId; diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h index 9ae31976c1..ebd3dd0838 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h @@ -49,7 +49,7 @@ struct ITaskRunnerActorFactory { THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota = {}) = 0; }; -ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory); +ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory); } // namespace NTaskRunnerActor diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index f0bb500984..9cf25192b6 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -38,15 +38,23 @@ class TLocalTaskRunnerActor public: static constexpr char ActorName[] = "YQL_DQ_TASK_RUNNER"; - TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota) + TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota) : TActor<TLocalTaskRunnerActor>(&TLocalTaskRunnerActor::Handler) + , FuncRegistry(funcRegistry) , Parent(parent) , Factory(factory) , TxId(txId) , TaskId(taskId) , InputChannelsWithDisabledCheckpoints(std::move(inputChannelsWithDisabledCheckpoints)) , MemoryQuota(std::move(memoryQuota)) - { } + { + Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>( + __LOCATION__, + NKikimr::TAlignedPagePoolCounters(), + FuncRegistry.SupportsSizedAllocators(), + false + ); + } ~TLocalTaskRunnerActor() { } @@ -407,7 +415,7 @@ private: void OnDqTask(TEvTaskRunnerCreate::TPtr& ev) { ParentId = ev->Sender; auto settings = NDq::TDqTaskSettings(&ev->Get()->Task); - TaskRunner = Factory(settings, ev->Get()->StatsMode, [this](const TString& message) { + TaskRunner = Factory(*Alloc.get(), settings, ev->Get()->StatsMode, [this](const TString& message) { LOG_D(message); }); @@ -463,6 +471,8 @@ private: THolder<TEvDq::TEvAbortExecution> GetError(const TString& message) { return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::BAD_REQUEST, TVector<TIssue>{TIssue(message).SetCode(TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR)}); } + const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry; + std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; NActors::TActorId ParentId; ITaskRunnerActor::ICallbacks* Parent; @@ -477,8 +487,9 @@ private: }; struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory { - TLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory) + TLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory) : Factory(factory) + , FuncRegistry(funcRegistry) { } std::tuple<ITaskRunnerActor*, NActors::IActor*> Create( @@ -488,7 +499,7 @@ struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory { THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota) override { - auto* actor = new TLocalTaskRunnerActor(parent, Factory, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota)); + auto* actor = new TLocalTaskRunnerActor(parent, Factory, FuncRegistry, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota)); return std::make_tuple( static_cast<ITaskRunnerActor*>(actor), static_cast<NActors::IActor*>(actor) @@ -496,11 +507,12 @@ struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory { } TTaskRunnerFactory Factory; + const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry; }; -ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory) +ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory) { - return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(factory)); + return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(funcRegistry, factory)); } } // namespace NTaskRunnerActor diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 4fa8b13063..07c85ab6e6 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -223,7 +223,7 @@ inline TCollectStatsLevel StatsModeToCollectStatsLevel(NDqProto::EDqStatsMode st //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class TDqTaskRunner : public IDqTaskRunner { public: - TDqTaskRunner(const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc) + TDqTaskRunner(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc) : Context(context) , Settings(settings) , LogFunc(logFunc) @@ -237,27 +237,18 @@ public: } } - if (!Context.Alloc) { - SelfAlloc = std::shared_ptr<TScopedAlloc>(new TScopedAlloc( - __LOCATION__, - TAlignedPagePoolCounters(), - Context.FuncRegistry->SupportsSizedAllocators(), - false - )); + if (Context.TypeEnv) { + YQL_ENSURE(std::addressof(alloc) == std::addressof(TypeEnv().GetAllocator())); + } else { + AllocatedHolder->SelfTypeEnv = std::make_unique<TTypeEnvironment>(alloc); } - - if (!Context.TypeEnv) { - AllocatedHolder->SelfTypeEnv = std::make_unique<TTypeEnvironment>(Alloc()); - } - YQL_ENSURE(std::addressof(Alloc()) == std::addressof(TypeEnv().GetAllocator())); + } ~TDqTaskRunner() { - if (SelfAlloc) { - auto guard = Guard(*SelfAlloc.get()); - Stats.reset(); - AllocatedHolder.reset(); - } + auto guard = Guard(Alloc()); + Stats.reset(); + AllocatedHolder.reset(); } bool CollectFull() const { @@ -815,10 +806,9 @@ public: const NKikimr::NMiniKQL::THolderFactory& GetHolderFactory() const override { return AllocatedHolder->ProgramParsed.CompGraph->GetHolderFactory(); } - - std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() const override { - YQL_ENSURE(SelfAlloc); - return SelfAlloc; + + NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() const override { + return Alloc(); } const THashMap<TString, TString>& GetSecureParams() const override { @@ -851,14 +841,13 @@ public: } private: - NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv() { + NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv() const { return Context.TypeEnv ? *Context.TypeEnv : *AllocatedHolder->SelfTypeEnv; } - NKikimr::NMiniKQL::TScopedAlloc& Alloc() { - return Context.Alloc ? *Context.Alloc : *SelfAlloc; + NKikimr::NMiniKQL::TScopedAlloc& Alloc() const { + return GetTypeEnv().GetAllocator(); } - void FinishImpl() { LOG(TStringBuilder() << "task" << TaskId << ", execution finished, finish consumers"); AllocatedHolder->Output->Finish(); @@ -938,9 +927,6 @@ private: TDqTaskRunnerSettings Settings; TLogFunc LogFunc; std::unique_ptr<NUdf::ISecureParamsProvider> SecureParamsProvider; - - std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> SelfAlloc; // if not set -> use Context.Alloc - struct TInputTransformInfo { NUdf::TUnboxedValue TransformInput; IDqAsyncInputBuffer::TPtr TransformOutput; @@ -1019,10 +1005,10 @@ private: } }; -TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings, +TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc) { - return new TDqTaskRunner(ctx, settings, logFunc); + return new TDqTaskRunner(alloc, ctx, settings, logFunc); } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index b50f1ba904..b5632c0005 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -121,7 +121,6 @@ struct TDqTaskRunnerContext { NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory; NUdf::IApplyContext* ApplyCtx = nullptr; NKikimr::NMiniKQL::TCallableVisitFuncProvider FuncProvider; - NKikimr::NMiniKQL::TScopedAlloc* Alloc = nullptr; NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv = nullptr; std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> PatternCache; }; @@ -380,7 +379,7 @@ public: virtual bool IsAllocatorAttached() = 0; virtual const NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnv() const = 0; virtual const NKikimr::NMiniKQL::THolderFactory& GetHolderFactory() const = 0; - virtual std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() const = 0; + virtual NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() const = 0; virtual const THashMap<TString, TString>& GetSecureParams() const = 0; virtual const THashMap<TString, TString>& GetTaskParams() const = 0; @@ -397,8 +396,12 @@ public: virtual const NKikimr::NMiniKQL::TWatermark& GetWatermark() const = 0; }; -TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings, - const TLogFunc& logFunc); +TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner( + NKikimr::NMiniKQL::TScopedAlloc& alloc, + const TDqTaskRunnerContext& ctx, + const TDqTaskRunnerSettings& settings, + const TLogFunc& logFunc +); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index ea95feb7da..80d5310d8e 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -58,9 +58,9 @@ IActor* CreateComputeActor( } } - auto taskRunnerFactory = [factory = options.Factory](const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& logger) { + auto taskRunnerFactory = [factory = options.Factory](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& logger) { Y_UNUSED(logger); - return factory->Get(task, statsMode, {}); + return factory->Get(alloc, task, statsMode, {}); }; if (computeActorType.empty() || computeActorType == "old" || computeActorType == "sync") { diff --git a/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp b/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp index 07438e01c6..2f4f4ed032 100644 --- a/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp +++ b/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp @@ -182,7 +182,7 @@ public: NYql::NDqs::TLocalWorkerManagerOptions lwmOptions; lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory(); lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateTaskRunnerActorFactory( - lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory); + lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory, nullptr); auto localWM = CreateLocalWorkerManager(lwmOptions); ActorRuntime_->AddLocalService(MakeWorkerManagerActorID(NodeId(i)), TActorSetupCmd{localWM, TMailboxType::Simple, 0}, i); diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index 372b7be9fc..70ca324ce3 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -67,9 +67,10 @@ public: lwmOptions.FunctionRegistry = functionRegistry; lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory(); lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory( - [factory=lwmOptions.Factory](const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& ) + *functionRegistry, + [factory=lwmOptions.Factory](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& ) { - return factory->Get(task, statsMode); + return factory->Get(alloc, task, statsMode); }); lwmOptions.Counters = NDqs::TWorkerManagerCounters(lwmGroup); lwmOptions.DropTaskCountersOnFinish = false; diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 939a7faafd..da56086027 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -122,6 +122,11 @@ public: ? CreateDeterministicRandomProvider(1) : State->RandomProvider; + TScopedAlloc alloc( + __LOCATION__, + NKikimr::TAlignedPagePoolCounters(), + State->FunctionRegistry->SupportsSizedAllocators(), + false); NDq::TDqTaskRunnerContext executionContext; executionContext.FuncRegistry = State->FunctionRegistry; @@ -138,7 +143,7 @@ public: settings.OptLLVM = "OFF"; // Don't use LLVM for local execution settings.SecureParams = secureParams; settings.StatsMode = NDqProto::DQ_STATS_MODE_BASIC; - auto runner = NDq::MakeDqTaskRunner(executionContext, settings, {}); + auto runner = NDq::MakeDqTaskRunner(alloc, executionContext, settings, {}); auto runnerSettings = NDq::TDqTaskSettings(&task); { diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index 2445da2e8a..e92dc952a3 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -724,7 +724,16 @@ public: Ctx.FuncProvider = TaskTransformFactory(taskParams, Ctx.FuncRegistry); - Runner = MakeDqTaskRunner(Ctx, settings, nullptr); + Y_ABORT_UNLESS(!Alloc); + Y_ABORT_UNLESS(FunctionRegistry); + Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>( + __LOCATION__, + NKikimr::TAlignedPagePoolCounters(), + FunctionRegistry->SupportsSizedAllocators(), + false + ); + + Runner = MakeDqTaskRunner(*Alloc.get(), Ctx, settings, nullptr); }); auto guard = Runner->BindAllocator(DqConfiguration->MemoryLimit.Get().GetOrElse(0)); @@ -753,7 +762,8 @@ public: result.Save(&output); } - +private: + std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory; TTaskTransformFactory TaskTransformFactory; THashMap<TString, i64> CurrentJobStats; diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp index 69395978b4..2d633c5399 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp @@ -220,11 +220,11 @@ public: ExecutionContext.PatternCache = patternCache; } - ITaskRunner::TPtr GetOld(const TDqTaskSettings& task, const TString& traceId) override { - return new TLocalTaskRunner(task, Get(task, NDqProto::DQ_STATS_MODE_BASIC, traceId)); + ITaskRunner::TPtr GetOld(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, const TString& traceId) override { + return new TLocalTaskRunner(task, Get(alloc, task, NDqProto::DQ_STATS_MODE_BASIC, traceId)); } - TIntrusivePtr<NDq::IDqTaskRunner> Get(const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TString& traceId) override { + TIntrusivePtr<NDq::IDqTaskRunner> Get(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TString& traceId) override { Y_UNUSED(traceId); NDq::TDqTaskRunnerSettings settings; settings.TerminateOnError = TerminateOnError; @@ -262,7 +262,7 @@ public: } auto ctx = ExecutionContext; ctx.FuncProvider = TaskTransformFactory(settings.TaskParams, ctx.FuncRegistry); - return MakeDqTaskRunner(ctx, settings, { }); + return MakeDqTaskRunner(alloc, ctx, settings, { }); } private: diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 2df5b962bd..a52c0e5256 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -1394,8 +1394,8 @@ public: return AllocatedHolder->HolderFactory; } - std::shared_ptr<NMiniKQL::TScopedAlloc> GetAllocatorPtr() const { - return Alloc; + NMiniKQL::TScopedAlloc& GetAllocator() const { + return *Alloc.get(); } const THashMap<TString, TString>& GetSecureParams() const override { @@ -1697,8 +1697,8 @@ public: return Delegate->GetHolderFactory(); } - std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() const override { - return Delegate->GetAllocatorPtr(); + NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() const override { + return Delegate->GetAllocator(); } const THashMap<TString, TString>& GetSecureParams() const override { @@ -1881,7 +1881,8 @@ public: TaskScheduler.Start(); } - ITaskRunner::TPtr GetOld(const NDq::TDqTaskSettings& tmp, const TString& traceId) override { + ITaskRunner::TPtr GetOld(NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& tmp, const TString& traceId) override { + Y_UNUSED(alloc); Yql::DqsProto::TTaskMeta taskMeta; tmp.GetMeta().UnpackTo(&taskMeta); ui64 stageId = taskMeta.GetStageId(); @@ -1890,9 +1891,10 @@ public: return new TTaskRunner(task, std::move(result), stageId, traceId); } - TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDq::TDqTaskSettings& tmp, NDqProto::EDqStatsMode statsMode, const TString& traceId) override + TIntrusivePtr<NDq::IDqTaskRunner> Get(NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& tmp, NDqProto::EDqStatsMode statsMode, const TString& traceId) override { Y_UNUSED(statsMode); + Y_UNUSED(alloc); Yql::DqsProto::TTaskMeta taskMeta; tmp.GetMeta().UnpackTo(&taskMeta); diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h index 011d9e196e..ec9fcec176 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h @@ -83,9 +83,9 @@ class IProxyFactory: public TThrRefBase, private TNonCopyable { public: using TPtr = TIntrusivePtr<IProxyFactory>; - virtual ITaskRunner::TPtr GetOld(const NDq::TDqTaskSettings& task, const TString& traceId = "") = 0; + virtual ITaskRunner::TPtr GetOld(NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, const TString& traceId = "") = 0; - virtual TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TString& traceId = "TODO") = 0; + virtual TIntrusivePtr<NDq::IDqTaskRunner> Get(NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TString& traceId = "TODO") = 0; }; } // namespace NYql::NTaskRunnerProxy diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index 093ffbca84..0b33b9c75a 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -192,6 +192,7 @@ public: TTaskRunnerActor( ITaskRunnerActor::ICallbacks* parent, const NTaskRunnerProxy::IProxyFactory::TPtr& factory, + const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, const ITaskRunnerInvoker::TPtr& invoker, const TTxId& txId, ui64 taskId, @@ -201,6 +202,7 @@ public: , TraceId(TStringBuilder() << txId) , TaskId(taskId) , Factory(factory) + , FuncRegistry(funcRegistry) , Invoker(invoker) , Local(Invoker->IsLocal()) , Settings(MakeIntrusive<TDqConfiguration>()) @@ -596,7 +598,15 @@ private: StageId = taskMeta.GetStageId(); NDq::TDqTaskSettings settings(&ev->Get()->Task); - TaskRunner = Factory->GetOld(settings, TraceId); + YQL_ENSURE(!Alloc); + YQL_ENSURE(FuncRegistry); + Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>( + __LOCATION__, + NKikimr::TAlignedPagePoolCounters(), + FuncRegistry->SupportsSizedAllocators(), + false + ); + TaskRunner = Factory->GetOld(*Alloc.get(), settings, TraceId); } catch (...) { TString message = "Could not create TaskRunner for " + ToString(taskId) + " on node " + ToString(replyTo.NodeId()) + ", error: " + CurrentExceptionMessage(); Send(replyTo, TEvDq::TEvAbortExecution::InternalError(message), 0, cookie); @@ -721,11 +731,13 @@ private: return spillingStorageInfo; } + std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; NActors::TActorId ParentId; ITaskRunnerActor::ICallbacks* Parent; const TString TraceId; const ui64 TaskId; NTaskRunnerProxy::IProxyFactory::TPtr Factory; + const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry; NTaskRunnerProxy::ITaskRunner::TPtr TaskRunner; ITaskRunnerInvoker::TPtr Invoker; bool Local; @@ -746,9 +758,11 @@ public: TTaskRunnerActorFactory( const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory, const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory, + const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, TWorkerRuntimeData* runtimeData) : ProxyFactory(proxyFactory) , InvokerFactory(invokerFactory) + , FuncRegistry(funcRegistry) , RuntimeData(runtimeData) { } @@ -759,7 +773,7 @@ public: THashSet<ui32>&&, THolder<NYql::NDq::TDqMemoryQuota>&&) override { - auto* actor = new TTaskRunnerActor(parent, ProxyFactory, InvokerFactory->Create(), txId, taskId, RuntimeData); + auto* actor = new TTaskRunnerActor(parent, ProxyFactory, FuncRegistry, InvokerFactory->Create(), txId, taskId, RuntimeData); return std::make_tuple( static_cast<ITaskRunnerActor*>(actor), static_cast<NActors::IActor*>(actor) @@ -769,15 +783,17 @@ public: private: NTaskRunnerProxy::IProxyFactory::TPtr ProxyFactory; NDqs::ITaskRunnerInvokerFactory::TPtr InvokerFactory; + const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry; TWorkerRuntimeData* RuntimeData; }; ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory( const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory, const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory, + const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, TWorkerRuntimeData* runtimeData) { - return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory, runtimeData)); + return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory, funcRegistry, runtimeData)); } } // namespace NTaskRunnerActor diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h index 260b2a7e0d..9439364618 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h @@ -29,6 +29,7 @@ namespace NTaskRunnerActor { ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory( const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory, const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory, + const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, TWorkerRuntimeData* runtimeData = nullptr); } // namespace NTaskRunnerActor diff --git a/ydb/library/yql/tools/dq/worker_node/main.cpp b/ydb/library/yql/tools/dq/worker_node/main.cpp index 900eae7e2c..72720ddcc9 100644 --- a/ydb/library/yql/tools/dq/worker_node/main.cpp +++ b/ydb/library/yql/tools/dq/worker_node/main.cpp @@ -389,7 +389,6 @@ int main(int argc, char** argv) { NYql::NDqs::TLocalWorkerManagerOptions lwmOptions; bool disablePipe = res.Has("disable_pipe"); NKikimr::NMiniKQL::IStatsRegistryPtr statsRegistry = NKikimr::NMiniKQL::CreateDefaultStatsRegistry(); - lwmOptions.Factory = disablePipe ? NTaskRunnerProxy::CreateFactory(functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, patternCache, true) : NTaskRunnerProxy::CreatePipeFactory(pfOptions); @@ -399,12 +398,13 @@ int main(int argc, char** argv) { lwmOptions.TaskRunnerInvokerFactory = disablePipe ? TTaskRunnerInvokerFactory::TPtr(new NDqs::TTaskRunnerInvokerFactory()) : TTaskRunnerInvokerFactory::TPtr(new TConcurrentInvokerFactory(2*capacity)); + YQL_ENSURE(functionRegistry); lwmOptions.TaskRunnerActorFactory = disablePipe - ? NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory([=](const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& ) + ? NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(*functionRegistry.Get(), [=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& ) { - return lwmOptions.Factory->Get(task, statsMode); + return lwmOptions.Factory->Get(alloc, task, statsMode); }) - : NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory); + : NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory, functionRegistry.Get()); lwmOptions.ComputeActorOwnsCounters = true; lwmOptions.UseSpilling = res.Has("enable-spilling"); auto resman = NDqs::CreateLocalWorkerManager(lwmOptions); |