diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2024-01-31 11:14:47 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-31 11:14:47 +0300 |
commit | 90d11102bc60465d4d581372bd1a3a5c114dbae0 (patch) | |
tree | 7b21d9244de44d8a05f12709607c95b5de0d2e03 | |
parent | d224e9843d2bd10b29b0eb2067b1bc13c2a6040c (diff) | |
download | ydb-90d11102bc60465d4d581372bd1a3a5c114dbae0.tar.gz |
YQL-17542 pass allocator from ComputeActor to TaskRunnerActor (#1445)
11 files changed, 36 insertions, 44 deletions
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index b2158db7df..0714425a6e 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -258,7 +258,6 @@ void Init( lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit(); lwmOptions.MkqlMinAllocSize = mkqlAllocSize; lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory( - *appData->FunctionRegistry, [=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) { return lwmOptions.Factory->Get(alloc, task, statsMode); }); diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 50ead996d9..4c2b2beedf 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -113,7 +113,7 @@ public: } } std::tie(TaskRunnerActor, actor) = TaskRunnerActorFactory->Create( - this, GetTxId(), Task.GetId(), std::move(inputWithDisabledCheckpointing), InitMemoryQuota()); + this, TBase::GetAllocatorPtr(), GetTxId(), Task.GetId(), std::move(inputWithDisabledCheckpointing), InitMemoryQuota()); TaskRunnerActorId = RegisterWithSameMailbox(actor); TDqTaskRunnerMemoryLimits limits; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 94aaa3811e..5b4a3fa597 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1299,6 +1299,9 @@ protected: } protected: + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() { + return Alloc; + } NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() { return *Alloc.get(); } diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h index ebd3dd0838..295e5ce693 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h @@ -43,13 +43,16 @@ struct ITaskRunnerActorFactory { virtual std::tuple<ITaskRunnerActor*, NActors::IActor*> Create( ITaskRunnerActor::ICallbacks* parent, + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints = {}, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota = {}) = 0; }; -ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory); +// temporary for YQL-17542 +#define Y_YQL_DQ_TASK_RUNNER_ACTOR_FACTORY_COMPATIBILITY_1 +ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory); } // namespace NTaskRunnerActor diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 9cf25192b6..2cb8e72eeb 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -38,9 +38,9 @@ class TLocalTaskRunnerActor public: static constexpr char ActorName[] = "YQL_DQ_TASK_RUNNER"; - TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota) + TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota) : TActor<TLocalTaskRunnerActor>(&TLocalTaskRunnerActor::Handler) - , FuncRegistry(funcRegistry) + , Alloc(alloc) , Parent(parent) , Factory(factory) , TxId(txId) @@ -48,12 +48,6 @@ public: , InputChannelsWithDisabledCheckpoints(std::move(inputChannelsWithDisabledCheckpoints)) , MemoryQuota(std::move(memoryQuota)) { - Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>( - __LOCATION__, - NKikimr::TAlignedPagePoolCounters(), - FuncRegistry.SupportsSizedAllocators(), - false - ); } ~TLocalTaskRunnerActor() @@ -471,8 +465,7 @@ private: THolder<TEvDq::TEvAbortExecution> GetError(const TString& message) { return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::BAD_REQUEST, TVector<TIssue>{TIssue(message).SetCode(TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR)}); } - const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry; - std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; NActors::TActorId ParentId; ITaskRunnerActor::ICallbacks* Parent; @@ -487,19 +480,19 @@ private: }; struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory { - TLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory) + TLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory) : Factory(factory) - , FuncRegistry(funcRegistry) { } std::tuple<ITaskRunnerActor*, NActors::IActor*> Create( ITaskRunnerActor::ICallbacks* parent, + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota) override { - auto* actor = new TLocalTaskRunnerActor(parent, Factory, FuncRegistry, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota)); + auto* actor = new TLocalTaskRunnerActor(parent, Factory, alloc, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota)); return std::make_tuple( static_cast<ITaskRunnerActor*>(actor), static_cast<NActors::IActor*>(actor) @@ -507,12 +500,11 @@ struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory { } TTaskRunnerFactory Factory; - const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry; }; -ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory) +ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory) { - return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(funcRegistry, factory)); + return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(factory)); } } // namespace NTaskRunnerActor diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 0d1e760b2e..a4588de0a1 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -253,7 +253,16 @@ private: } NActors::IActor* actor; - std::tie(Actor, actor) = TaskRunnerActorFactory->Create(this, TraceId, Task.GetId()); + std::tie(Actor, actor) = TaskRunnerActorFactory->Create( + this, + std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>( + __LOCATION__, + NKikimr::TAlignedPagePoolCounters(), + true, + false + ), + TraceId, + Task.GetId()); TaskRunnerActor = RegisterLocalChild(actor); TDqTaskRunnerMemoryLimits limits; // used for local mode only limits.ChannelBufferSize = 20_MB; diff --git a/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp b/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp index 2f4f4ed032..07438e01c6 100644 --- a/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp +++ b/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp @@ -182,7 +182,7 @@ public: NYql::NDqs::TLocalWorkerManagerOptions lwmOptions; lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory(); lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateTaskRunnerActorFactory( - lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory, nullptr); + lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory); auto localWM = CreateLocalWorkerManager(lwmOptions); ActorRuntime_->AddLocalService(MakeWorkerManagerActorID(NodeId(i)), TActorSetupCmd{localWM, TMailboxType::Simple, 0}, i); diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index 06b76ae212..ac5552080c 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -67,7 +67,6 @@ public: lwmOptions.FunctionRegistry = functionRegistry; lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory(); lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory( - *functionRegistry, [factory=lwmOptions.Factory](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& ) { return factory->Get(alloc, task, statsMode); diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index 771e27e764..55f844a076 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -191,18 +191,18 @@ public: TTaskRunnerActor( ITaskRunnerActor::ICallbacks* parent, + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const NTaskRunnerProxy::IProxyFactory::TPtr& factory, - const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, const ITaskRunnerInvoker::TPtr& invoker, const TTxId& txId, ui64 taskId, TWorkerRuntimeData* runtimeData) : TActor<TTaskRunnerActor>(&TTaskRunnerActor::Handler) , Parent(parent) + , Alloc(alloc) , TraceId(TStringBuilder() << txId) , TaskId(taskId) , Factory(factory) - , FuncRegistry(funcRegistry) , Invoker(invoker) , Local(Invoker->IsLocal()) , Settings(MakeIntrusive<TDqConfiguration>()) @@ -608,14 +608,6 @@ private: StageId = taskMeta.GetStageId(); NDq::TDqTaskSettings settings(&ev->Get()->Task); - YQL_ENSURE(!Alloc); - YQL_ENSURE(FuncRegistry); - Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>( - __LOCATION__, - NKikimr::TAlignedPagePoolCounters(), - FuncRegistry->SupportsSizedAllocators(), - false - ); TaskRunner = Factory->GetOld(*Alloc.get(), settings, TraceId); } catch (...) { TString message = "Could not create TaskRunner for " + ToString(taskId) + " on node " + ToString(replyTo.NodeId()) + ", error: " + CurrentExceptionMessage(); @@ -746,13 +738,12 @@ private: } } - std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; NActors::TActorId ParentId; ITaskRunnerActor::ICallbacks* Parent; + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; const TString TraceId; const ui64 TaskId; NTaskRunnerProxy::IProxyFactory::TPtr Factory; - const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry; NTaskRunnerProxy::ITaskRunner::TPtr TaskRunner; ITaskRunnerInvoker::TPtr Invoker; bool Local; @@ -773,22 +764,21 @@ public: TTaskRunnerActorFactory( const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory, const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory, - const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, - TWorkerRuntimeData* runtimeData) + TWorkerRuntimeData* runtimeData) : ProxyFactory(proxyFactory) , InvokerFactory(invokerFactory) - , FuncRegistry(funcRegistry) , RuntimeData(runtimeData) { } std::tuple<ITaskRunnerActor*, NActors::IActor*> Create( ITaskRunnerActor::ICallbacks* parent, + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TTxId& txId, ui64 taskId, THashSet<ui32>&&, THolder<NYql::NDq::TDqMemoryQuota>&&) override { - auto* actor = new TTaskRunnerActor(parent, ProxyFactory, FuncRegistry, InvokerFactory->Create(), txId, taskId, RuntimeData); + auto* actor = new TTaskRunnerActor(parent, alloc, ProxyFactory, InvokerFactory->Create(), txId, taskId, RuntimeData); return std::make_tuple( static_cast<ITaskRunnerActor*>(actor), static_cast<NActors::IActor*>(actor) @@ -798,17 +788,15 @@ public: private: NTaskRunnerProxy::IProxyFactory::TPtr ProxyFactory; NDqs::ITaskRunnerInvokerFactory::TPtr InvokerFactory; - const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry; TWorkerRuntimeData* RuntimeData; }; ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory( const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory, const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory, - const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, TWorkerRuntimeData* runtimeData) { - return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory, funcRegistry, runtimeData)); + return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory, runtimeData)); } } // namespace NTaskRunnerActor diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h index 9439364618..260b2a7e0d 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h @@ -29,7 +29,6 @@ namespace NTaskRunnerActor { ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory( const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory, const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory, - const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, TWorkerRuntimeData* runtimeData = nullptr); } // namespace NTaskRunnerActor diff --git a/ydb/library/yql/tools/dq/worker_node/main.cpp b/ydb/library/yql/tools/dq/worker_node/main.cpp index 16d0e1c49a..9e1a9e56f9 100644 --- a/ydb/library/yql/tools/dq/worker_node/main.cpp +++ b/ydb/library/yql/tools/dq/worker_node/main.cpp @@ -400,11 +400,11 @@ int main(int argc, char** argv) { : TTaskRunnerInvokerFactory::TPtr(new TConcurrentInvokerFactory(2*capacity)); YQL_ENSURE(functionRegistry); lwmOptions.TaskRunnerActorFactory = disablePipe - ? NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(*functionRegistry.Get(), [=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& ) + ? NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory([=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& ) { return lwmOptions.Factory->Get(alloc, task, statsMode); }) - : NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory, functionRegistry.Get()); + : NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory); lwmOptions.ComputeActorOwnsCounters = true; bool enableSpilling = res.Has("enable-spilling"); auto resman = NDqs::CreateLocalWorkerManager(lwmOptions); |