aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzverevgeny <zverevgeny@ydb.tech>2024-01-26 18:14:35 +0300
committerGitHub <noreply@github.com>2024-01-26 18:14:35 +0300
commitbc843db89bbfff632aaa66932069ae506ecdbfe6 (patch)
tree2e301583c19a0e1bc249bd3807532116b9d27cc6
parent9f3844c68cef0369e5b2daae02e19c21b23ee65e (diff)
downloadydb-bc843db89bbfff632aaa66932069ae506ecdbfe6.tar.gz
YQL-17542 move allocator ownership from TDqTaskRunner to actors (#1335)
-rw-r--r--ydb/core/fq/libs/init/init.cpp6
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp3
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp3
-rw-r--r--ydb/core/kqp/executer_actor/kqp_literal_executer.cpp13
-rw-r--r--ydb/core/kqp/runtime/kqp_tasks_runner.cpp13
-rw-r--r--ydb/core/kqp/runtime/kqp_tasks_runner.h6
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp5
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.h5
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h21
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor.h2
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp26
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp48
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h11
-rw-r--r--ydb/library/yql/providers/dq/actors/compute_actor.cpp4
-rw-r--r--ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp2
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp5
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp7
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp14
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp8
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp14
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h4
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp22
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h1
-rw-r--r--ydb/library/yql/tools/dq/worker_node/main.cpp8
25 files changed, 152 insertions, 101 deletions
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp
index 3204f11ec7..b2158db7df 100644
--- a/ydb/core/fq/libs/init/init.cpp
+++ b/ydb/core/fq/libs/init/init.cpp
@@ -246,6 +246,7 @@ void Init(
if (!mkqlAllocSize) {
mkqlAllocSize = 30_MB;
}
+ Y_ABORT_UNLESS(appData->FunctionRegistry);
NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.Counters = workerManagerCounters;
lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, nullptr, false);
@@ -257,8 +258,9 @@ void Init(
lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit();
lwmOptions.MkqlMinAllocSize = mkqlAllocSize;
lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
- [=](const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) {
- return lwmOptions.Factory->Get(task, statsMode);
+ *appData->FunctionRegistry,
+ [=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) {
+ return lwmOptions.Factory->Get(alloc, task, statsMode);
});
if (protoConfig.GetRateLimiter().GetDataPlaneEnabled()) {
lwmOptions.QuoterServiceActorId = NFq::YqQuoterServiceActorId();
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index 1dfe1fbab6..cc9a3dcae6 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -46,7 +46,6 @@ void TKqpComputeActor::DoBootstrap() {
execCtx.ComputeCtx = &ComputeCtx;
execCtx.ComputationFactory = NMiniKQL::GetKqpActorComputeFactory(&ComputeCtx);
execCtx.ApplyCtx = nullptr;
- execCtx.Alloc = nullptr;
execCtx.TypeEnv = nullptr;
execCtx.PatternCache = GetKqpResourceManager()->GetPatternCache();
@@ -68,7 +67,7 @@ void TKqpComputeActor::DoBootstrap() {
settings.ReadRanges.push_back(readRange);
}
- auto taskRunner = MakeDqTaskRunner(execCtx, settings, logger);
+ auto taskRunner = MakeDqTaskRunner(TBase::GetAllocator(), execCtx, settings, logger);
SetTaskRunner(taskRunner);
auto wakeup = [this]{ ContinueExecute(); };
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index c60a029b7d..f6a48b208c 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -188,7 +188,6 @@ void TKqpScanComputeActor::DoBootstrap() {
execCtx.RandomProvider = TAppData::RandomProvider.Get();
execCtx.TimeProvider = TAppData::TimeProvider.Get();
execCtx.ApplyCtx = nullptr;
- execCtx.Alloc = nullptr;
execCtx.TypeEnv = nullptr;
execCtx.PatternCache = GetKqpResourceManager()->GetPatternCache();
@@ -219,7 +218,7 @@ void TKqpScanComputeActor::DoBootstrap() {
};
}
- auto taskRunner = MakeDqTaskRunner(execCtx, settings, logger);
+ auto taskRunner = MakeDqTaskRunner(GetAllocator(), execCtx, settings, logger);
TBase::SetTaskRunner(taskRunner);
auto wakeup = [this] { ContinueExecute(); };
diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
index 5156ed69e9..3d1383bf08 100644
--- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
@@ -19,7 +19,7 @@ using namespace NYql::NDq;
namespace {
-std::unique_ptr<TDqTaskRunnerContext> CreateTaskRunnerContext(NMiniKQL::TKqpComputeContextBase* computeCtx, NMiniKQL::TScopedAlloc* alloc,
+std::unique_ptr<TDqTaskRunnerContext> CreateTaskRunnerContext(NMiniKQL::TKqpComputeContextBase* computeCtx,
NMiniKQL::TTypeEnvironment* typeEnv)
{
std::unique_ptr<TDqTaskRunnerContext> context = std::make_unique<TDqTaskRunnerContext>();
@@ -44,7 +44,6 @@ std::unique_ptr<TDqTaskRunnerContext> CreateTaskRunnerContext(NMiniKQL::TKqpComp
return nullptr;
};
- context->Alloc = alloc;
context->TypeEnv = typeEnv;
context->ApplyCtx = nullptr;
return context;
@@ -167,12 +166,12 @@ public:
// task runner settings
ComputeCtx = std::make_unique<NMiniKQL::TKqpComputeContextBase>();
- RunnerContext = CreateTaskRunnerContext(ComputeCtx.get(), &Request.TxAlloc->Alloc, &Request.TxAlloc->TypeEnv);
+ RunnerContext = CreateTaskRunnerContext(ComputeCtx.get(), &Request.TxAlloc->TypeEnv);
RunnerContext->PatternCache = GetKqpResourceManager()->GetPatternCache();
TDqTaskRunnerSettings settings = CreateTaskRunnerSettings(Request.StatsMode);
for (auto& task : TasksGraph.GetTasks()) {
- RunTask(task, *RunnerContext, settings);
+ RunTask(Request.TxAlloc->Alloc, task, *RunnerContext, settings);
if (TerminateIfTimeout()) {
return;
@@ -183,7 +182,7 @@ public:
UpdateCounters();
}
- void RunTask(TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
+ void RunTask(NMiniKQL::TScopedAlloc& alloc, TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
@@ -218,7 +217,7 @@ public:
<< message);
};
- auto taskRunner = MakeDqTaskRunner(context, settings, log);
+ auto taskRunner = MakeDqTaskRunner(alloc, context, settings, log);
TaskRunners.emplace_back(taskRunner);
auto taskSettings = NDq::TDqTaskSettings(&protoTask);
@@ -228,7 +227,7 @@ public:
auto status = taskRunner->Run();
YQL_ENSURE(status == ERunStatus::Finished);
- with_lock (*context.Alloc) { // allocator is used only by outputChannel->PopAll()
+ with_lock (alloc) { // allocator is used only by outputChannel->PopAll()
for (auto& taskOutput : task.Outputs) {
for (ui64 outputChannelId : taskOutput.Channels) {
auto outputChannel = taskRunner->GetOutputChannel(outputChannelId);
diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
index ce0f9fc211..d9c4aa41bf 100644
--- a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
+++ b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
@@ -70,11 +70,11 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp
TKqpTasksRunner::TKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
+ NKikimr::NMiniKQL::TScopedAlloc& alloc,
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
: LogFunc(logFunc)
- , Alloc(execCtx.Alloc)
+ , Alloc(alloc)
{
- YQL_ENSURE(execCtx.Alloc);
YQL_ENSURE(execCtx.TypeEnv);
ApplyCtx = dynamic_cast<NMiniKQL::TKqpDatashardApplyContext *>(execCtx.ApplyCtx);
@@ -86,7 +86,7 @@ TKqpTasksRunner::TKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TD
try {
for (auto&& task : tasks) {
ui64 taskId = task.GetId();
- auto runner = MakeDqTaskRunner(execCtx, settings, logFunc);
+ auto runner = MakeDqTaskRunner(alloc, execCtx, settings, logFunc);
if (auto* stats = runner->GetStats()) {
Stats.emplace(taskId, stats);
}
@@ -230,15 +230,16 @@ const NYql::NDq::TDqTaskSettings& TKqpTasksRunner::GetTask(ui64 taskId) const {
TGuard<NMiniKQL::TScopedAlloc> TKqpTasksRunner::BindAllocator(TMaybe<ui64> memoryLimit) {
if (memoryLimit) {
- Alloc->SetLimit(*memoryLimit);
+ Alloc.SetLimit(*memoryLimit);
}
- return TGuard(*Alloc);
+ return TGuard(Alloc);
}
TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
+ NKikimr::NMiniKQL::TScopedAlloc& alloc,
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
{
- return new TKqpTasksRunner(std::move(tasks), execCtx, settings, logFunc);
+ return new TKqpTasksRunner(std::move(tasks), alloc, execCtx, settings, logFunc);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.h b/ydb/core/kqp/runtime/kqp_tasks_runner.h
index 9835aeb4b2..ec84181050 100644
--- a/ydb/core/kqp/runtime/kqp_tasks_runner.h
+++ b/ydb/core/kqp/runtime/kqp_tasks_runner.h
@@ -16,6 +16,7 @@ NYql::NDq::IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NYql::NDqProto::
class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCopyable {
public:
TKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks,
+ NKikimr::NMiniKQL::TScopedAlloc& alloc,
const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings,
const NYql::NDq::TLogFunc& logFunc);
@@ -50,7 +51,7 @@ public:
// otherwise use particular memory limit
TGuard<NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit = Nothing());
- ui64 GetAllocatedMemory() const { return Alloc->GetAllocated(); }
+ ui64 GetAllocatedMemory() const { return Alloc.GetAllocated(); }
const TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> GetTasksStats() const { return Stats; }
private:
@@ -58,7 +59,7 @@ private:
TMap<ui64, NYql::NDq::TDqTaskSettings> Tasks;
TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> Stats;
NYql::NDq::TLogFunc LogFunc;
- NMiniKQL::TScopedAlloc* Alloc;
+ NMiniKQL::TScopedAlloc& Alloc;
NMiniKQL::TKqpComputeContextBase* ComputeCtx;
NMiniKQL::TKqpDatashardApplyContext* ApplyCtx;
@@ -72,6 +73,7 @@ private:
TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks,
+ NKikimr::NMiniKQL::TScopedAlloc& alloc,
const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings,
const NYql::NDq::TLogFunc& logFunc);
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 086dcffd3a..8e2d2cbc47 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -521,7 +521,6 @@ TEngineBay::TEngineBay(TDataShard* self, TTransactionContext& txc, const TActorC
KqpExecCtx.RandomProvider = TAppData::RandomProvider.Get();
KqpExecCtx.TimeProvider = TAppData::TimeProvider.Get();
KqpExecCtx.ApplyCtx = KqpApplyCtx.Get();
- KqpExecCtx.Alloc = KqpAlloc.Get();
KqpExecCtx.TypeEnv = KqpTypeEnv.Get();
if (auto rm = NKqp::TryGetKqpResourceManager()) {
KqpExecCtx.PatternCache = rm->GetPatternCache();
@@ -701,9 +700,9 @@ NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(NKikimrTxDataShard::TKqpTra
settings.OptLLVM = "OFF";
settings.TerminateOnError = false;
-
+ Y_ABORT_UNLESS(KqpAlloc);
KqpAlloc->SetLimit(10_MB);
- KqpTasksRunner = NKqp::CreateKqpTasksRunner(std::move(*tx.MutableTasks()), KqpExecCtx, settings, KqpLogFunc);
+ KqpTasksRunner = NKqp::CreateKqpTasksRunner(std::move(*tx.MutableTasks()), *KqpAlloc.Get(), KqpExecCtx, settings, KqpLogFunc);
}
return *KqpTasksRunner;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
index 1811bb4b17..ccb6b638e6 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
@@ -54,7 +54,7 @@ public:
};
}
- auto taskRunner = TaskRunnerFactory(Task, RuntimeSettings.StatsMode, logger);
+ auto taskRunner = TaskRunnerFactory(GetAllocator(), Task, RuntimeSettings.StatsMode, logger);
SetTaskRunner(taskRunner);
auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
TDqTaskRunnerExecutionContext execCtx(TxId, RuntimeSettings.UseSpilling, std::move(wakeup));
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index 5aac450998..f2ec5e2d9f 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -364,8 +364,11 @@ struct TComputeMemoryLimits {
IMemoryQuotaManager::TPtr MemoryQuotaManager;
};
+//temporary flag to integarate changes in interface
+#define Y_YQL_DQ_TASK_RUNNER_REQUIRES_ALLOCATOR 1
+
using TTaskRunnerFactory = std::function<
- TIntrusivePtr<IDqTaskRunner>(const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc)
+ TIntrusivePtr<IDqTaskRunner>(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc)
>;
void FillAsyncStats(NDqProto::TDqAsyncBufferStats& proto, TDqAsyncStats stats);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index bfd081e4cc..854f63e523 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -206,6 +206,12 @@ protected:
, Running(!Task.GetCreateSuspended())
, PassExceptions(passExceptions)
{
+ Alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(
+ __LOCATION__,
+ NKikimr::TAlignedPagePoolCounters(),
+ FunctionRegistry->SupportsSizedAllocators(),
+ false
+ );
InitMonCounters(taskCounters);
InitializeTask();
if (ownMemoryQuota) {
@@ -626,8 +632,8 @@ protected:
void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssues issues) {
CA_LOG_E(InternalErrorLogString(statusCode, issues));
if (TaskRunner) {
- TaskRunner->GetAllocatorPtr()->InvalidateMemInfo();
- TaskRunner->GetAllocatorPtr()->DisableStrictAllocationCheck();
+ TaskRunner->GetAllocator().InvalidateMemInfo();
+ TaskRunner->GetAllocator().DisableStrictAllocationCheck();
}
State = NDqProto::COMPUTE_STATE_FAILURE;
ReportStateAndMaybeDie(statusCode, issues);
@@ -1365,6 +1371,10 @@ protected:
}
}
+protected:
+ NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() {
+ return *Alloc.get();
+ }
private:
virtual const TDqMemoryQuota::TProfileStats* GetMemoryProfileStats() const {
Y_ABORT_UNLESS(MemoryQuota);
@@ -1586,7 +1596,7 @@ protected:
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
.TaskCounters = TaskCounters,
- .Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr,
+ .Alloc = TaskRunner ? Alloc : nullptr,
.MemoryQuotaManager = MemoryLimits.MemoryQuotaManager,
.SourceSettings = (!settings.empty() ? settings.at(inputIndex) : nullptr),
.Arena = Task.GetArena(),
@@ -1619,7 +1629,7 @@ protected:
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
.ProgramBuilder = *transform.ProgramBuilder,
- .Alloc = TaskRunner->GetAllocatorPtr(),
+ .Alloc = Alloc,
.TraceId = ComputeActorSpan.GetTraceId()
});
} catch (const std::exception& ex) {
@@ -2222,7 +2232,8 @@ protected:
LastSendStatsTime = now;
}
-
+private:
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; //must be declared on top to be destroyed after all the rest
protected:
const NActors::TActorId ExecuterId;
const TTxId TxId;
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h
index 9ae31976c1..ebd3dd0838 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h
@@ -49,7 +49,7 @@ struct ITaskRunnerActorFactory {
THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota = {}) = 0;
};
-ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory);
+ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory);
} // namespace NTaskRunnerActor
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index f0bb500984..9cf25192b6 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -38,15 +38,23 @@ class TLocalTaskRunnerActor
public:
static constexpr char ActorName[] = "YQL_DQ_TASK_RUNNER";
- TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota)
+ TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota)
: TActor<TLocalTaskRunnerActor>(&TLocalTaskRunnerActor::Handler)
+ , FuncRegistry(funcRegistry)
, Parent(parent)
, Factory(factory)
, TxId(txId)
, TaskId(taskId)
, InputChannelsWithDisabledCheckpoints(std::move(inputChannelsWithDisabledCheckpoints))
, MemoryQuota(std::move(memoryQuota))
- { }
+ {
+ Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>(
+ __LOCATION__,
+ NKikimr::TAlignedPagePoolCounters(),
+ FuncRegistry.SupportsSizedAllocators(),
+ false
+ );
+ }
~TLocalTaskRunnerActor()
{ }
@@ -407,7 +415,7 @@ private:
void OnDqTask(TEvTaskRunnerCreate::TPtr& ev) {
ParentId = ev->Sender;
auto settings = NDq::TDqTaskSettings(&ev->Get()->Task);
- TaskRunner = Factory(settings, ev->Get()->StatsMode, [this](const TString& message) {
+ TaskRunner = Factory(*Alloc.get(), settings, ev->Get()->StatsMode, [this](const TString& message) {
LOG_D(message);
});
@@ -463,6 +471,8 @@ private:
THolder<TEvDq::TEvAbortExecution> GetError(const TString& message) {
return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::BAD_REQUEST, TVector<TIssue>{TIssue(message).SetCode(TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR)});
}
+ const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry;
+ std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NActors::TActorId ParentId;
ITaskRunnerActor::ICallbacks* Parent;
@@ -477,8 +487,9 @@ private:
};
struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory {
- TLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory)
+ TLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory)
: Factory(factory)
+ , FuncRegistry(funcRegistry)
{ }
std::tuple<ITaskRunnerActor*, NActors::IActor*> Create(
@@ -488,7 +499,7 @@ struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory {
THashSet<ui32>&& inputChannelsWithDisabledCheckpoints,
THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota) override
{
- auto* actor = new TLocalTaskRunnerActor(parent, Factory, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota));
+ auto* actor = new TLocalTaskRunnerActor(parent, Factory, FuncRegistry, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota));
return std::make_tuple(
static_cast<ITaskRunnerActor*>(actor),
static_cast<NActors::IActor*>(actor)
@@ -496,11 +507,12 @@ struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory {
}
TTaskRunnerFactory Factory;
+ const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry;
};
-ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory)
+ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory)
{
- return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(factory));
+ return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(funcRegistry, factory));
}
} // namespace NTaskRunnerActor
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 4fa8b13063..07c85ab6e6 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -223,7 +223,7 @@ inline TCollectStatsLevel StatsModeToCollectStatsLevel(NDqProto::EDqStatsMode st
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class TDqTaskRunner : public IDqTaskRunner {
public:
- TDqTaskRunner(const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
+ TDqTaskRunner(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
: Context(context)
, Settings(settings)
, LogFunc(logFunc)
@@ -237,27 +237,18 @@ public:
}
}
- if (!Context.Alloc) {
- SelfAlloc = std::shared_ptr<TScopedAlloc>(new TScopedAlloc(
- __LOCATION__,
- TAlignedPagePoolCounters(),
- Context.FuncRegistry->SupportsSizedAllocators(),
- false
- ));
+ if (Context.TypeEnv) {
+ YQL_ENSURE(std::addressof(alloc) == std::addressof(TypeEnv().GetAllocator()));
+ } else {
+ AllocatedHolder->SelfTypeEnv = std::make_unique<TTypeEnvironment>(alloc);
}
-
- if (!Context.TypeEnv) {
- AllocatedHolder->SelfTypeEnv = std::make_unique<TTypeEnvironment>(Alloc());
- }
- YQL_ENSURE(std::addressof(Alloc()) == std::addressof(TypeEnv().GetAllocator()));
+
}
~TDqTaskRunner() {
- if (SelfAlloc) {
- auto guard = Guard(*SelfAlloc.get());
- Stats.reset();
- AllocatedHolder.reset();
- }
+ auto guard = Guard(Alloc());
+ Stats.reset();
+ AllocatedHolder.reset();
}
bool CollectFull() const {
@@ -815,10 +806,9 @@ public:
const NKikimr::NMiniKQL::THolderFactory& GetHolderFactory() const override {
return AllocatedHolder->ProgramParsed.CompGraph->GetHolderFactory();
}
-
- std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() const override {
- YQL_ENSURE(SelfAlloc);
- return SelfAlloc;
+
+ NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() const override {
+ return Alloc();
}
const THashMap<TString, TString>& GetSecureParams() const override {
@@ -851,14 +841,13 @@ public:
}
private:
- NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv() {
+ NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv() const {
return Context.TypeEnv ? *Context.TypeEnv : *AllocatedHolder->SelfTypeEnv;
}
- NKikimr::NMiniKQL::TScopedAlloc& Alloc() {
- return Context.Alloc ? *Context.Alloc : *SelfAlloc;
+ NKikimr::NMiniKQL::TScopedAlloc& Alloc() const {
+ return GetTypeEnv().GetAllocator();
}
-
void FinishImpl() {
LOG(TStringBuilder() << "task" << TaskId << ", execution finished, finish consumers");
AllocatedHolder->Output->Finish();
@@ -938,9 +927,6 @@ private:
TDqTaskRunnerSettings Settings;
TLogFunc LogFunc;
std::unique_ptr<NUdf::ISecureParamsProvider> SecureParamsProvider;
-
- std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> SelfAlloc; // if not set -> use Context.Alloc
-
struct TInputTransformInfo {
NUdf::TUnboxedValue TransformInput;
IDqAsyncInputBuffer::TPtr TransformOutput;
@@ -1019,10 +1005,10 @@ private:
}
};
-TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings,
+TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings,
const TLogFunc& logFunc)
{
- return new TDqTaskRunner(ctx, settings, logFunc);
+ return new TDqTaskRunner(alloc, ctx, settings, logFunc);
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index b50f1ba904..b5632c0005 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -121,7 +121,6 @@ struct TDqTaskRunnerContext {
NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory;
NUdf::IApplyContext* ApplyCtx = nullptr;
NKikimr::NMiniKQL::TCallableVisitFuncProvider FuncProvider;
- NKikimr::NMiniKQL::TScopedAlloc* Alloc = nullptr;
NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv = nullptr;
std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> PatternCache;
};
@@ -380,7 +379,7 @@ public:
virtual bool IsAllocatorAttached() = 0;
virtual const NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnv() const = 0;
virtual const NKikimr::NMiniKQL::THolderFactory& GetHolderFactory() const = 0;
- virtual std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() const = 0;
+ virtual NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() const = 0;
virtual const THashMap<TString, TString>& GetSecureParams() const = 0;
virtual const THashMap<TString, TString>& GetTaskParams() const = 0;
@@ -397,8 +396,12 @@ public:
virtual const NKikimr::NMiniKQL::TWatermark& GetWatermark() const = 0;
};
-TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings,
- const TLogFunc& logFunc);
+TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(
+ NKikimr::NMiniKQL::TScopedAlloc& alloc,
+ const TDqTaskRunnerContext& ctx,
+ const TDqTaskRunnerSettings& settings,
+ const TLogFunc& logFunc
+);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
index ea95feb7da..80d5310d8e 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
@@ -58,9 +58,9 @@ IActor* CreateComputeActor(
}
}
- auto taskRunnerFactory = [factory = options.Factory](const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& logger) {
+ auto taskRunnerFactory = [factory = options.Factory](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& logger) {
Y_UNUSED(logger);
- return factory->Get(task, statsMode, {});
+ return factory->Get(alloc, task, statsMode, {});
};
if (computeActorType.empty() || computeActorType == "old" || computeActorType == "sync") {
diff --git a/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp b/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp
index 07438e01c6..2f4f4ed032 100644
--- a/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp
+++ b/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp
@@ -182,7 +182,7 @@ public:
NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory();
lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateTaskRunnerActorFactory(
- lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory);
+ lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory, nullptr);
auto localWM = CreateLocalWorkerManager(lwmOptions);
ActorRuntime_->AddLocalService(MakeWorkerManagerActorID(NodeId(i)),
TActorSetupCmd{localWM, TMailboxType::Simple, 0}, i);
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
index 372b7be9fc..70ca324ce3 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
@@ -67,9 +67,10 @@ public:
lwmOptions.FunctionRegistry = functionRegistry;
lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory();
lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
- [factory=lwmOptions.Factory](const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& )
+ *functionRegistry,
+ [factory=lwmOptions.Factory](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& )
{
- return factory->Get(task, statsMode);
+ return factory->Get(alloc, task, statsMode);
});
lwmOptions.Counters = NDqs::TWorkerManagerCounters(lwmGroup);
lwmOptions.DropTaskCountersOnFinish = false;
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 939a7faafd..da56086027 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -122,6 +122,11 @@ public:
? CreateDeterministicRandomProvider(1)
: State->RandomProvider;
+ TScopedAlloc alloc(
+ __LOCATION__,
+ NKikimr::TAlignedPagePoolCounters(),
+ State->FunctionRegistry->SupportsSizedAllocators(),
+ false);
NDq::TDqTaskRunnerContext executionContext;
executionContext.FuncRegistry = State->FunctionRegistry;
@@ -138,7 +143,7 @@ public:
settings.OptLLVM = "OFF"; // Don't use LLVM for local execution
settings.SecureParams = secureParams;
settings.StatsMode = NDqProto::DQ_STATS_MODE_BASIC;
- auto runner = NDq::MakeDqTaskRunner(executionContext, settings, {});
+ auto runner = NDq::MakeDqTaskRunner(alloc, executionContext, settings, {});
auto runnerSettings = NDq::TDqTaskSettings(&task);
{
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index 2445da2e8a..e92dc952a3 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -724,7 +724,16 @@ public:
Ctx.FuncProvider = TaskTransformFactory(taskParams, Ctx.FuncRegistry);
- Runner = MakeDqTaskRunner(Ctx, settings, nullptr);
+ Y_ABORT_UNLESS(!Alloc);
+ Y_ABORT_UNLESS(FunctionRegistry);
+ Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>(
+ __LOCATION__,
+ NKikimr::TAlignedPagePoolCounters(),
+ FunctionRegistry->SupportsSizedAllocators(),
+ false
+ );
+
+ Runner = MakeDqTaskRunner(*Alloc.get(), Ctx, settings, nullptr);
});
auto guard = Runner->BindAllocator(DqConfiguration->MemoryLimit.Get().GetOrElse(0));
@@ -753,7 +762,8 @@ public:
result.Save(&output);
}
-
+private:
+ std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory;
TTaskTransformFactory TaskTransformFactory;
THashMap<TString, i64> CurrentJobStats;
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
index 69395978b4..2d633c5399 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
@@ -220,11 +220,11 @@ public:
ExecutionContext.PatternCache = patternCache;
}
- ITaskRunner::TPtr GetOld(const TDqTaskSettings& task, const TString& traceId) override {
- return new TLocalTaskRunner(task, Get(task, NDqProto::DQ_STATS_MODE_BASIC, traceId));
+ ITaskRunner::TPtr GetOld(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, const TString& traceId) override {
+ return new TLocalTaskRunner(task, Get(alloc, task, NDqProto::DQ_STATS_MODE_BASIC, traceId));
}
- TIntrusivePtr<NDq::IDqTaskRunner> Get(const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TString& traceId) override {
+ TIntrusivePtr<NDq::IDqTaskRunner> Get(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TString& traceId) override {
Y_UNUSED(traceId);
NDq::TDqTaskRunnerSettings settings;
settings.TerminateOnError = TerminateOnError;
@@ -262,7 +262,7 @@ public:
}
auto ctx = ExecutionContext;
ctx.FuncProvider = TaskTransformFactory(settings.TaskParams, ctx.FuncRegistry);
- return MakeDqTaskRunner(ctx, settings, { });
+ return MakeDqTaskRunner(alloc, ctx, settings, { });
}
private:
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 2df5b962bd..a52c0e5256 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1394,8 +1394,8 @@ public:
return AllocatedHolder->HolderFactory;
}
- std::shared_ptr<NMiniKQL::TScopedAlloc> GetAllocatorPtr() const {
- return Alloc;
+ NMiniKQL::TScopedAlloc& GetAllocator() const {
+ return *Alloc.get();
}
const THashMap<TString, TString>& GetSecureParams() const override {
@@ -1697,8 +1697,8 @@ public:
return Delegate->GetHolderFactory();
}
- std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() const override {
- return Delegate->GetAllocatorPtr();
+ NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() const override {
+ return Delegate->GetAllocator();
}
const THashMap<TString, TString>& GetSecureParams() const override {
@@ -1881,7 +1881,8 @@ public:
TaskScheduler.Start();
}
- ITaskRunner::TPtr GetOld(const NDq::TDqTaskSettings& tmp, const TString& traceId) override {
+ ITaskRunner::TPtr GetOld(NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& tmp, const TString& traceId) override {
+ Y_UNUSED(alloc);
Yql::DqsProto::TTaskMeta taskMeta;
tmp.GetMeta().UnpackTo(&taskMeta);
ui64 stageId = taskMeta.GetStageId();
@@ -1890,9 +1891,10 @@ public:
return new TTaskRunner(task, std::move(result), stageId, traceId);
}
- TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDq::TDqTaskSettings& tmp, NDqProto::EDqStatsMode statsMode, const TString& traceId) override
+ TIntrusivePtr<NDq::IDqTaskRunner> Get(NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& tmp, NDqProto::EDqStatsMode statsMode, const TString& traceId) override
{
Y_UNUSED(statsMode);
+ Y_UNUSED(alloc);
Yql::DqsProto::TTaskMeta taskMeta;
tmp.GetMeta().UnpackTo(&taskMeta);
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
index 011d9e196e..ec9fcec176 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
@@ -83,9 +83,9 @@ class IProxyFactory: public TThrRefBase, private TNonCopyable {
public:
using TPtr = TIntrusivePtr<IProxyFactory>;
- virtual ITaskRunner::TPtr GetOld(const NDq::TDqTaskSettings& task, const TString& traceId = "") = 0;
+ virtual ITaskRunner::TPtr GetOld(NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, const TString& traceId = "") = 0;
- virtual TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TString& traceId = "TODO") = 0;
+ virtual TIntrusivePtr<NDq::IDqTaskRunner> Get(NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TString& traceId = "TODO") = 0;
};
} // namespace NYql::NTaskRunnerProxy
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index 093ffbca84..0b33b9c75a 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -192,6 +192,7 @@ public:
TTaskRunnerActor(
ITaskRunnerActor::ICallbacks* parent,
const NTaskRunnerProxy::IProxyFactory::TPtr& factory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry,
const ITaskRunnerInvoker::TPtr& invoker,
const TTxId& txId,
ui64 taskId,
@@ -201,6 +202,7 @@ public:
, TraceId(TStringBuilder() << txId)
, TaskId(taskId)
, Factory(factory)
+ , FuncRegistry(funcRegistry)
, Invoker(invoker)
, Local(Invoker->IsLocal())
, Settings(MakeIntrusive<TDqConfiguration>())
@@ -596,7 +598,15 @@ private:
StageId = taskMeta.GetStageId();
NDq::TDqTaskSettings settings(&ev->Get()->Task);
- TaskRunner = Factory->GetOld(settings, TraceId);
+ YQL_ENSURE(!Alloc);
+ YQL_ENSURE(FuncRegistry);
+ Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>(
+ __LOCATION__,
+ NKikimr::TAlignedPagePoolCounters(),
+ FuncRegistry->SupportsSizedAllocators(),
+ false
+ );
+ TaskRunner = Factory->GetOld(*Alloc.get(), settings, TraceId);
} catch (...) {
TString message = "Could not create TaskRunner for " + ToString(taskId) + " on node " + ToString(replyTo.NodeId()) + ", error: " + CurrentExceptionMessage();
Send(replyTo, TEvDq::TEvAbortExecution::InternalError(message), 0, cookie);
@@ -721,11 +731,13 @@ private:
return spillingStorageInfo;
}
+ std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NActors::TActorId ParentId;
ITaskRunnerActor::ICallbacks* Parent;
const TString TraceId;
const ui64 TaskId;
NTaskRunnerProxy::IProxyFactory::TPtr Factory;
+ const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry;
NTaskRunnerProxy::ITaskRunner::TPtr TaskRunner;
ITaskRunnerInvoker::TPtr Invoker;
bool Local;
@@ -746,9 +758,11 @@ public:
TTaskRunnerActorFactory(
const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory,
const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry,
TWorkerRuntimeData* runtimeData)
: ProxyFactory(proxyFactory)
, InvokerFactory(invokerFactory)
+ , FuncRegistry(funcRegistry)
, RuntimeData(runtimeData)
{ }
@@ -759,7 +773,7 @@ public:
THashSet<ui32>&&,
THolder<NYql::NDq::TDqMemoryQuota>&&) override
{
- auto* actor = new TTaskRunnerActor(parent, ProxyFactory, InvokerFactory->Create(), txId, taskId, RuntimeData);
+ auto* actor = new TTaskRunnerActor(parent, ProxyFactory, FuncRegistry, InvokerFactory->Create(), txId, taskId, RuntimeData);
return std::make_tuple(
static_cast<ITaskRunnerActor*>(actor),
static_cast<NActors::IActor*>(actor)
@@ -769,15 +783,17 @@ public:
private:
NTaskRunnerProxy::IProxyFactory::TPtr ProxyFactory;
NDqs::ITaskRunnerInvokerFactory::TPtr InvokerFactory;
+ const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry;
TWorkerRuntimeData* RuntimeData;
};
ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory(
const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory,
const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry,
TWorkerRuntimeData* runtimeData)
{
- return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory, runtimeData));
+ return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory, funcRegistry, runtimeData));
}
} // namespace NTaskRunnerActor
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h
index 260b2a7e0d..9439364618 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h
@@ -29,6 +29,7 @@ namespace NTaskRunnerActor {
ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory(
const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory,
const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry,
TWorkerRuntimeData* runtimeData = nullptr);
} // namespace NTaskRunnerActor
diff --git a/ydb/library/yql/tools/dq/worker_node/main.cpp b/ydb/library/yql/tools/dq/worker_node/main.cpp
index 900eae7e2c..72720ddcc9 100644
--- a/ydb/library/yql/tools/dq/worker_node/main.cpp
+++ b/ydb/library/yql/tools/dq/worker_node/main.cpp
@@ -389,7 +389,6 @@ int main(int argc, char** argv) {
NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
bool disablePipe = res.Has("disable_pipe");
NKikimr::NMiniKQL::IStatsRegistryPtr statsRegistry = NKikimr::NMiniKQL::CreateDefaultStatsRegistry();
-
lwmOptions.Factory = disablePipe
? NTaskRunnerProxy::CreateFactory(functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, patternCache, true)
: NTaskRunnerProxy::CreatePipeFactory(pfOptions);
@@ -399,12 +398,13 @@ int main(int argc, char** argv) {
lwmOptions.TaskRunnerInvokerFactory = disablePipe
? TTaskRunnerInvokerFactory::TPtr(new NDqs::TTaskRunnerInvokerFactory())
: TTaskRunnerInvokerFactory::TPtr(new TConcurrentInvokerFactory(2*capacity));
+ YQL_ENSURE(functionRegistry);
lwmOptions.TaskRunnerActorFactory = disablePipe
- ? NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory([=](const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& )
+ ? NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(*functionRegistry.Get(), [=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& )
{
- return lwmOptions.Factory->Get(task, statsMode);
+ return lwmOptions.Factory->Get(alloc, task, statsMode);
})
- : NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory);
+ : NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory, functionRegistry.Get());
lwmOptions.ComputeActorOwnsCounters = true;
lwmOptions.UseSpilling = res.Has("enable-spilling");
auto resman = NDqs::CreateLocalWorkerManager(lwmOptions);