aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-03-14 16:18:00 +0300
committerhor911 <hor911@ydb.tech>2023-03-14 16:18:00 +0300
commitffd2bcafb632a9b2551e17809519d159c4c0775d (patch)
tree973d02f59abf3102c21ffc54eae6513f6dd18723
parent4e8f8398c4ed7aa1faf5228e6a401a90e11e2085 (diff)
downloadydb-ffd2bcafb632a9b2551e17809519d159c4c0775d.tar.gz
Adjust task mem limits (precomputes) for precise accounting
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp23
1 files changed, 13 insertions, 10 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 714a696f2ce..ac7b89eec37 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -702,7 +702,7 @@ private:
}
}
- void FillMemoryInfo() {
+ void FillGraphMemoryInfo(NYq::NProto::TGraphParams& graphParams) {
auto mkqlDefaultLimit = Params.Config.GetResourceManager().GetMkqlInitialMemoryLimit();
if (mkqlDefaultLimit == 0) {
mkqlDefaultLimit = 8_GB;
@@ -713,17 +713,15 @@ private:
s3ReadDefaultInflightLimit = 200_MB;
}
- for (auto& graphParams : DqGraphParams) {
- for (NYql::NDqProto::TDqTask& task : *graphParams.MutableTasks()) {
- if (task.GetInitialTaskMemoryLimit() == 0) {
- ui64 limitTotal = mkqlDefaultLimit;
- for (auto& input : *task.MutableInputs()) {
- if (input.HasSource() && input.GetSource().GetType() == "S3Source") {
- limitTotal += s3ReadDefaultInflightLimit;
- }
+ for (NYql::NDqProto::TDqTask& task : *graphParams.MutableTasks()) {
+ if (task.GetInitialTaskMemoryLimit() == 0) {
+ ui64 limitTotal = mkqlDefaultLimit;
+ for (auto& input : *task.MutableInputs()) {
+ if (input.HasSource() && input.GetSource().GetType() == "S3Source") {
+ limitTotal += s3ReadDefaultInflightLimit;
}
- task.SetInitialTaskMemoryLimit(limitTotal);
}
+ task.SetInitialTaskMemoryLimit(limitTotal);
}
}
}
@@ -1310,6 +1308,8 @@ private:
LOG_D("RunEvalDqGraph");
+ FillGraphMemoryInfo(dqGraphParams);
+
TDqConfiguration::TPtr dqConfiguration = MakeIntrusive<TDqConfiguration>();
dqConfiguration->Dispatch(dqGraphParams.GetSettings());
dqConfiguration->FreezeDefaults();
@@ -1798,6 +1798,9 @@ private:
SendTransientIssues(issues);
}
PrepareGraphs();
+ for (auto& graphParams : DqGraphParams) {
+ FillGraphMemoryInfo(graphParams);
+ }
} else {
TString abortMessage = message;
if (abortMessage == "") {