diff options
author | aozeritsky <aozeritsky@ydb.tech> | 2023-11-20 22:34:04 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@ydb.tech> | 2023-11-21 00:05:50 +0300 |
commit | a3fd9aa6b812dc5e1eb456b2cfc101417ade4040 (patch) | |
tree | a05e139e2ae098fcfbfceab51464c8369992bab5 | |
parent | 48a2655edf2e92be3712cd10d74327fd6e1ef77c (diff) | |
download | ydb-a3fd9aa6b812dc5e1eb456b2cfc101417ade4040.tar.gz |
Pass memory limits to piped task runner
4 files changed, 12 insertions, 10 deletions
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp index de612ac4d4..1188231a1b 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp @@ -114,10 +114,10 @@ public: return Task.GetId(); } - NYql::NDqProto::TPrepareResponse Prepare() override { + NYql::NDqProto::TPrepareResponse Prepare(const NDq::TDqTaskRunnerMemoryLimits& limits) override { NYql::NDqProto::TPrepareResponse ret; TDqTaskRunnerExecutionContextDefault ctx; - Runner->Prepare(Task, DefaultMemoryLimits(), ctx); + Runner->Prepare(Task, limits, ctx); return ret; } diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 749df5783f..3990ee8676 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -1284,7 +1284,8 @@ public: return ProtocolVersion; } - NYql::NDqProto::TPrepareResponse Prepare() override { + NYql::NDqProto::TPrepareResponse Prepare(const NDq::TDqTaskRunnerMemoryLimits& limits) override { + Y_UNUSED(limits); NDqProto::TCommandHeader header; header.SetVersion(1); header.SetCommand(NDqProto::TCommandHeader::PREPARE); @@ -1538,11 +1539,10 @@ public: void Prepare(const TDqTaskSettings& task, const TDqTaskRunnerMemoryLimits& memoryLimits, const IDqTaskRunnerExecutionContext& execCtx) override { - Y_UNUSED(memoryLimits); Y_UNUSED(execCtx); Y_ABORT_UNLESS(Task.GetId() == task.GetId()); try { - auto result = Delegate->Prepare(); + auto result = Delegate->Prepare(memoryLimits); Y_UNUSED(result); } catch (...) { Delegate->RaiseException(); diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h index c23ac98da5..ec3b9dd728 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h @@ -14,6 +14,7 @@ extern const TString UseMetaParamName; // COMPAT(aozeritsky) void SaveRopeToPipe(IOutputStream& output, const TRope& rope); void LoadRopeFromPipe(IInputStream& input, TRope& rope); +NDq::TDqTaskRunnerMemoryLimits DefaultMemoryLimits(); class IInputChannel : public TThrRefBase, private TNonCopyable { public: @@ -48,7 +49,7 @@ public: virtual ui64 GetTaskId() const = 0; - virtual NYql::NDqProto::TPrepareResponse Prepare() = 0; + virtual NYql::NDqProto::TPrepareResponse Prepare(const NDq::TDqTaskRunnerMemoryLimits& limits = DefaultMemoryLimits()) = 0; virtual NYql::NDqProto::TRunResponse Run() = 0; virtual IInputChannel::TPtr GetInputChannel(ui64 channelId) = 0; @@ -87,7 +88,4 @@ public: virtual TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TString& traceId = "TODO") = 0; }; - -NDq::TDqTaskRunnerMemoryLimits DefaultMemoryLimits(); - } // namespace NYql::NTaskRunnerProxy diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index 0047308089..c16c670f53 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -471,7 +471,11 @@ private: Invoker->Invoke([taskRunner=TaskRunner, replyTo, selfId, cookie, actorSystem, settings=Settings, stageId=StageId, startTime, clusterName = ClusterName](){ try { //auto guard = taskRunner->BindAllocator(); // only for local mode - auto result = taskRunner->Prepare(); + NDq::TDqTaskRunnerMemoryLimits limits; + limits.ChannelBufferSize = settings->ChannelBufferSize.Get().GetOrElse(TDqSettings::TDefault::ChannelBufferSize); + limits.OutputChunkMaxSize = settings->OutputChunkMaxSize.Get().GetOrElse(TDqSettings::TDefault::OutputChunkMaxSize); + limits.ChunkSizeLimit = settings->ChunkSizeLimit.Get().GetOrElse(TDqSettings::TDefault::ChunkSizeLimit); + auto result = taskRunner->Prepare(limits); auto sensors = GetSensors(result); auto sensorName = TCounters::GetCounterName( "Actor", |