aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraozeritsky <aozeritsky@ydb.tech>2023-11-20 22:34:04 +0300
committeraozeritsky <aozeritsky@ydb.tech>2023-11-21 00:05:50 +0300
commita3fd9aa6b812dc5e1eb456b2cfc101417ade4040 (patch)
treea05e139e2ae098fcfbfceab51464c8369992bab5
parent48a2655edf2e92be3712cd10d74327fd6e1ef77c (diff)
downloadydb-a3fd9aa6b812dc5e1eb456b2cfc101417ade4040.tar.gz
Pass memory limits to piped task runner
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp4
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp6
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h6
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp6
4 files changed, 12 insertions, 10 deletions
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
index de612ac4d4..1188231a1b 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
@@ -114,10 +114,10 @@ public:
return Task.GetId();
}
- NYql::NDqProto::TPrepareResponse Prepare() override {
+ NYql::NDqProto::TPrepareResponse Prepare(const NDq::TDqTaskRunnerMemoryLimits& limits) override {
NYql::NDqProto::TPrepareResponse ret;
TDqTaskRunnerExecutionContextDefault ctx;
- Runner->Prepare(Task, DefaultMemoryLimits(), ctx);
+ Runner->Prepare(Task, limits, ctx);
return ret;
}
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 749df5783f..3990ee8676 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1284,7 +1284,8 @@ public:
return ProtocolVersion;
}
- NYql::NDqProto::TPrepareResponse Prepare() override {
+ NYql::NDqProto::TPrepareResponse Prepare(const NDq::TDqTaskRunnerMemoryLimits& limits) override {
+ Y_UNUSED(limits);
NDqProto::TCommandHeader header;
header.SetVersion(1);
header.SetCommand(NDqProto::TCommandHeader::PREPARE);
@@ -1538,11 +1539,10 @@ public:
void Prepare(const TDqTaskSettings& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
const IDqTaskRunnerExecutionContext& execCtx) override
{
- Y_UNUSED(memoryLimits);
Y_UNUSED(execCtx);
Y_ABORT_UNLESS(Task.GetId() == task.GetId());
try {
- auto result = Delegate->Prepare();
+ auto result = Delegate->Prepare(memoryLimits);
Y_UNUSED(result);
} catch (...) {
Delegate->RaiseException();
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
index c23ac98da5..ec3b9dd728 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
@@ -14,6 +14,7 @@ extern const TString UseMetaParamName; // COMPAT(aozeritsky)
void SaveRopeToPipe(IOutputStream& output, const TRope& rope);
void LoadRopeFromPipe(IInputStream& input, TRope& rope);
+NDq::TDqTaskRunnerMemoryLimits DefaultMemoryLimits();
class IInputChannel : public TThrRefBase, private TNonCopyable {
public:
@@ -48,7 +49,7 @@ public:
virtual ui64 GetTaskId() const = 0;
- virtual NYql::NDqProto::TPrepareResponse Prepare() = 0;
+ virtual NYql::NDqProto::TPrepareResponse Prepare(const NDq::TDqTaskRunnerMemoryLimits& limits = DefaultMemoryLimits()) = 0;
virtual NYql::NDqProto::TRunResponse Run() = 0;
virtual IInputChannel::TPtr GetInputChannel(ui64 channelId) = 0;
@@ -87,7 +88,4 @@ public:
virtual TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TString& traceId = "TODO") = 0;
};
-
-NDq::TDqTaskRunnerMemoryLimits DefaultMemoryLimits();
-
} // namespace NYql::NTaskRunnerProxy
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index 0047308089..c16c670f53 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -471,7 +471,11 @@ private:
Invoker->Invoke([taskRunner=TaskRunner, replyTo, selfId, cookie, actorSystem, settings=Settings, stageId=StageId, startTime, clusterName = ClusterName](){
try {
//auto guard = taskRunner->BindAllocator(); // only for local mode
- auto result = taskRunner->Prepare();
+ NDq::TDqTaskRunnerMemoryLimits limits;
+ limits.ChannelBufferSize = settings->ChannelBufferSize.Get().GetOrElse(TDqSettings::TDefault::ChannelBufferSize);
+ limits.OutputChunkMaxSize = settings->OutputChunkMaxSize.Get().GetOrElse(TDqSettings::TDefault::OutputChunkMaxSize);
+ limits.ChunkSizeLimit = settings->ChunkSizeLimit.Get().GetOrElse(TDqSettings::TDefault::ChunkSizeLimit);
+ auto result = taskRunner->Prepare(limits);
auto sensors = GetSensors(result);
auto sensorName = TCounters::GetCounterName(
"Actor",