aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzverevgeny <zverevgeny@ydb.tech>2024-01-31 11:14:47 +0300
committerGitHub <noreply@github.com>2024-01-31 11:14:47 +0300
commit90d11102bc60465d4d581372bd1a3a5c114dbae0 (patch)
tree7b21d9244de44d8a05f12709607c95b5de0d2e03
parentd224e9843d2bd10b29b0eb2067b1bc13c2a6040c (diff)
downloadydb-90d11102bc60465d4d581372bd1a3a5c114dbae0.tar.gz
YQL-17542 pass allocator from ComputeActor to TaskRunnerActor (#1445)
-rw-r--r--ydb/core/fq/libs/init/init.cpp1
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h3
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor.h5
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp24
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp11
-rw-r--r--ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp2
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp1
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp26
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h1
-rw-r--r--ydb/library/yql/tools/dq/worker_node/main.cpp4
11 files changed, 36 insertions, 44 deletions
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp
index b2158db7df..0714425a6e 100644
--- a/ydb/core/fq/libs/init/init.cpp
+++ b/ydb/core/fq/libs/init/init.cpp
@@ -258,7 +258,6 @@ void Init(
lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit();
lwmOptions.MkqlMinAllocSize = mkqlAllocSize;
lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
- *appData->FunctionRegistry,
[=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) {
return lwmOptions.Factory->Get(alloc, task, statsMode);
});
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 50ead996d9..4c2b2beedf 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -113,7 +113,7 @@ public:
}
}
std::tie(TaskRunnerActor, actor) = TaskRunnerActorFactory->Create(
- this, GetTxId(), Task.GetId(), std::move(inputWithDisabledCheckpointing), InitMemoryQuota());
+ this, TBase::GetAllocatorPtr(), GetTxId(), Task.GetId(), std::move(inputWithDisabledCheckpointing), InitMemoryQuota());
TaskRunnerActorId = RegisterWithSameMailbox(actor);
TDqTaskRunnerMemoryLimits limits;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 94aaa3811e..5b4a3fa597 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1299,6 +1299,9 @@ protected:
}
protected:
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() {
+ return Alloc;
+ }
NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() {
return *Alloc.get();
}
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h
index ebd3dd0838..295e5ce693 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h
@@ -43,13 +43,16 @@ struct ITaskRunnerActorFactory {
virtual std::tuple<ITaskRunnerActor*, NActors::IActor*> Create(
ITaskRunnerActor::ICallbacks* parent,
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TTxId& txId,
ui64 taskId,
THashSet<ui32>&& inputChannelsWithDisabledCheckpoints = {},
THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota = {}) = 0;
};
-ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory);
+// temporary for YQL-17542
+#define Y_YQL_DQ_TASK_RUNNER_ACTOR_FACTORY_COMPATIBILITY_1
+ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory);
} // namespace NTaskRunnerActor
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index 9cf25192b6..2cb8e72eeb 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -38,9 +38,9 @@ class TLocalTaskRunnerActor
public:
static constexpr char ActorName[] = "YQL_DQ_TASK_RUNNER";
- TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota)
+ TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota)
: TActor<TLocalTaskRunnerActor>(&TLocalTaskRunnerActor::Handler)
- , FuncRegistry(funcRegistry)
+ , Alloc(alloc)
, Parent(parent)
, Factory(factory)
, TxId(txId)
@@ -48,12 +48,6 @@ public:
, InputChannelsWithDisabledCheckpoints(std::move(inputChannelsWithDisabledCheckpoints))
, MemoryQuota(std::move(memoryQuota))
{
- Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>(
- __LOCATION__,
- NKikimr::TAlignedPagePoolCounters(),
- FuncRegistry.SupportsSizedAllocators(),
- false
- );
}
~TLocalTaskRunnerActor()
@@ -471,8 +465,7 @@ private:
THolder<TEvDq::TEvAbortExecution> GetError(const TString& message) {
return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::BAD_REQUEST, TVector<TIssue>{TIssue(message).SetCode(TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR)});
}
- const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry;
- std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NActors::TActorId ParentId;
ITaskRunnerActor::ICallbacks* Parent;
@@ -487,19 +480,19 @@ private:
};
struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory {
- TLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory)
+ TLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory)
: Factory(factory)
- , FuncRegistry(funcRegistry)
{ }
std::tuple<ITaskRunnerActor*, NActors::IActor*> Create(
ITaskRunnerActor::ICallbacks* parent,
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TTxId& txId,
ui64 taskId,
THashSet<ui32>&& inputChannelsWithDisabledCheckpoints,
THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota) override
{
- auto* actor = new TLocalTaskRunnerActor(parent, Factory, FuncRegistry, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota));
+ auto* actor = new TLocalTaskRunnerActor(parent, Factory, alloc, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota));
return std::make_tuple(
static_cast<ITaskRunnerActor*>(actor),
static_cast<NActors::IActor*>(actor)
@@ -507,12 +500,11 @@ struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory {
}
TTaskRunnerFactory Factory;
- const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry;
};
-ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory)
+ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory)
{
- return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(funcRegistry, factory));
+ return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(factory));
}
} // namespace NTaskRunnerActor
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index 0d1e760b2e..a4588de0a1 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -253,7 +253,16 @@ private:
}
NActors::IActor* actor;
- std::tie(Actor, actor) = TaskRunnerActorFactory->Create(this, TraceId, Task.GetId());
+ std::tie(Actor, actor) = TaskRunnerActorFactory->Create(
+ this,
+ std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(
+ __LOCATION__,
+ NKikimr::TAlignedPagePoolCounters(),
+ true,
+ false
+ ),
+ TraceId,
+ Task.GetId());
TaskRunnerActor = RegisterLocalChild(actor);
TDqTaskRunnerMemoryLimits limits; // used for local mode only
limits.ChannelBufferSize = 20_MB;
diff --git a/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp b/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp
index 2f4f4ed032..07438e01c6 100644
--- a/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp
+++ b/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp
@@ -182,7 +182,7 @@ public:
NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory();
lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateTaskRunnerActorFactory(
- lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory, nullptr);
+ lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory);
auto localWM = CreateLocalWorkerManager(lwmOptions);
ActorRuntime_->AddLocalService(MakeWorkerManagerActorID(NodeId(i)),
TActorSetupCmd{localWM, TMailboxType::Simple, 0}, i);
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
index 06b76ae212..ac5552080c 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
@@ -67,7 +67,6 @@ public:
lwmOptions.FunctionRegistry = functionRegistry;
lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory();
lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
- *functionRegistry,
[factory=lwmOptions.Factory](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& )
{
return factory->Get(alloc, task, statsMode);
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index 771e27e764..55f844a076 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -191,18 +191,18 @@ public:
TTaskRunnerActor(
ITaskRunnerActor::ICallbacks* parent,
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const NTaskRunnerProxy::IProxyFactory::TPtr& factory,
- const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry,
const ITaskRunnerInvoker::TPtr& invoker,
const TTxId& txId,
ui64 taskId,
TWorkerRuntimeData* runtimeData)
: TActor<TTaskRunnerActor>(&TTaskRunnerActor::Handler)
, Parent(parent)
+ , Alloc(alloc)
, TraceId(TStringBuilder() << txId)
, TaskId(taskId)
, Factory(factory)
- , FuncRegistry(funcRegistry)
, Invoker(invoker)
, Local(Invoker->IsLocal())
, Settings(MakeIntrusive<TDqConfiguration>())
@@ -608,14 +608,6 @@ private:
StageId = taskMeta.GetStageId();
NDq::TDqTaskSettings settings(&ev->Get()->Task);
- YQL_ENSURE(!Alloc);
- YQL_ENSURE(FuncRegistry);
- Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>(
- __LOCATION__,
- NKikimr::TAlignedPagePoolCounters(),
- FuncRegistry->SupportsSizedAllocators(),
- false
- );
TaskRunner = Factory->GetOld(*Alloc.get(), settings, TraceId);
} catch (...) {
TString message = "Could not create TaskRunner for " + ToString(taskId) + " on node " + ToString(replyTo.NodeId()) + ", error: " + CurrentExceptionMessage();
@@ -746,13 +738,12 @@ private:
}
}
- std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NActors::TActorId ParentId;
ITaskRunnerActor::ICallbacks* Parent;
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
const TString TraceId;
const ui64 TaskId;
NTaskRunnerProxy::IProxyFactory::TPtr Factory;
- const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry;
NTaskRunnerProxy::ITaskRunner::TPtr TaskRunner;
ITaskRunnerInvoker::TPtr Invoker;
bool Local;
@@ -773,22 +764,21 @@ public:
TTaskRunnerActorFactory(
const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory,
const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory,
- const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry,
- TWorkerRuntimeData* runtimeData)
+ TWorkerRuntimeData* runtimeData)
: ProxyFactory(proxyFactory)
, InvokerFactory(invokerFactory)
- , FuncRegistry(funcRegistry)
, RuntimeData(runtimeData)
{ }
std::tuple<ITaskRunnerActor*, NActors::IActor*> Create(
ITaskRunnerActor::ICallbacks* parent,
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TTxId& txId,
ui64 taskId,
THashSet<ui32>&&,
THolder<NYql::NDq::TDqMemoryQuota>&&) override
{
- auto* actor = new TTaskRunnerActor(parent, ProxyFactory, FuncRegistry, InvokerFactory->Create(), txId, taskId, RuntimeData);
+ auto* actor = new TTaskRunnerActor(parent, alloc, ProxyFactory, InvokerFactory->Create(), txId, taskId, RuntimeData);
return std::make_tuple(
static_cast<ITaskRunnerActor*>(actor),
static_cast<NActors::IActor*>(actor)
@@ -798,17 +788,15 @@ public:
private:
NTaskRunnerProxy::IProxyFactory::TPtr ProxyFactory;
NDqs::ITaskRunnerInvokerFactory::TPtr InvokerFactory;
- const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry;
TWorkerRuntimeData* RuntimeData;
};
ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory(
const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory,
const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory,
- const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry,
TWorkerRuntimeData* runtimeData)
{
- return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory, funcRegistry, runtimeData));
+ return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory, runtimeData));
}
} // namespace NTaskRunnerActor
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h
index 9439364618..260b2a7e0d 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h
@@ -29,7 +29,6 @@ namespace NTaskRunnerActor {
ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory(
const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory,
const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory,
- const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry,
TWorkerRuntimeData* runtimeData = nullptr);
} // namespace NTaskRunnerActor
diff --git a/ydb/library/yql/tools/dq/worker_node/main.cpp b/ydb/library/yql/tools/dq/worker_node/main.cpp
index 16d0e1c49a..9e1a9e56f9 100644
--- a/ydb/library/yql/tools/dq/worker_node/main.cpp
+++ b/ydb/library/yql/tools/dq/worker_node/main.cpp
@@ -400,11 +400,11 @@ int main(int argc, char** argv) {
: TTaskRunnerInvokerFactory::TPtr(new TConcurrentInvokerFactory(2*capacity));
YQL_ENSURE(functionRegistry);
lwmOptions.TaskRunnerActorFactory = disablePipe
- ? NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(*functionRegistry.Get(), [=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& )
+ ? NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory([=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& )
{
return lwmOptions.Factory->Get(alloc, task, statsMode);
})
- : NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory, functionRegistry.Get());
+ : NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory);
lwmOptions.ComputeActorOwnsCounters = true;
bool enableSpilling = res.Has("enable-spilling");
auto resman = NDqs::CreateLocalWorkerManager(lwmOptions);