aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIvan <ilezhankin@yandex-team.ru>2025-07-24 22:27:31 +0300
committerGitHub <noreply@github.com>2025-07-24 22:27:31 +0300
commita5cd972162d55fccb611128a64aa8ece364167a9 (patch)
tree12380aeca0b5bede474413b7d1ed493a5910a3ae
parent6c6075a53d2f391f2fef51e946e6ea057988dee6 (diff)
downloadydb-a5cd972162d55fccb611128a64aa8ece364167a9.tar.gz
Add introspection for predictor of tasks count in graph (#21336)
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h141
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_stats.cpp3
-rw-r--r--ydb/core/kqp/opt/kqp_query_plan.cpp5
-rw-r--r--ydb/core/kqp/query_data/kqp_predictor.cpp14
-rw-r--r--ydb/core/kqp/query_data/kqp_predictor.h5
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_stats.proto1
-rw-r--r--ydb/library/yql/dq/tasks/dq_tasks_graph.h2
7 files changed, 122 insertions, 49 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index aa5e0abfffe..4f1431a4454 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -1031,6 +1031,7 @@ protected:
}
void BuildReadTasksFromSource(TStageInfo& stageInfo, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot, ui32 scheduledTaskCount) {
+ auto& intros = stageInfo.Introspections;
const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
YQL_ENSURE(stage.GetSources(0).HasExternalSource());
@@ -1040,20 +1041,28 @@ protected:
const auto& externalSource = stageSource.GetExternalSource();
ui32 taskCount = externalSource.GetPartitionedTaskParams().size();
+ intros.push_back("Using number of PartitionedTaskParams from external source - " + ToString(taskCount));
auto taskCountHint = stage.GetTaskCount();
+ TString introHint;
if (taskCountHint == 0) {
taskCountHint = scheduledTaskCount;
+ introHint = "(Using scheduled task count as hint for override - " + ToString(taskCountHint) + ")";
}
if (taskCountHint) {
if (taskCount > taskCountHint) {
taskCount = taskCountHint;
+ if (!introHint.empty()) {
+ intros.push_back(introHint);
+ }
+ intros.push_back("Manually overridden - " + ToString(taskCount));
}
} else if (!resourceSnapshot.empty()) {
ui32 maxTaskcount = resourceSnapshot.size() * 2;
if (taskCount > maxTaskcount) {
taskCount = maxTaskcount;
+ intros.push_back("Using less tasks because of resource snapshot size - " + ToString(taskCount));
}
}
@@ -1151,8 +1160,11 @@ protected:
}
TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo, bool limitTasksPerNode) {
+ auto& intros = stageInfo.Introspections;
+
if (EnableReadsMerge) {
limitTasksPerNode = true;
+ intros.push_back("Using tasks count limit because of enabled reads merge");
}
THashMap<ui64, std::vector<ui64>> nodeTasks;
@@ -1298,8 +1310,15 @@ protected:
THashMap<ui64, TVector<ui64>> nodeIdToTasks;
THashMap<ui64, TVector<TShardRangesWithShardId>> nodeIdToShardKeyRanges;
+ Y_DEFER {
+ intros.push_back("Built scan tasks from source and shards to read for node");
+ for (const auto& [nodeId, tasks] : nodeIdToTasks) {
+ intros.push_back(ToString(nodeId) + " - " + ToString(tasks.size()) + ", " + ToString(nodeIdToShardKeyRanges.at(nodeId).size()));
+ }
+ intros.push_back("Total built scan tasks from source - " + ToString(createdTasksIds.size()));
+ };
- auto addPartiton = [&](
+ auto addPartition = [&](
ui64 taskLocation,
TMaybe<ui64> nodeId,
TMaybe<ui64> shardId,
@@ -1405,7 +1424,7 @@ protected:
if (shardInfo.KeyReadRanges) {
const TMaybe<ui64> nodeId = (isParallelPointRead) ? TMaybe<ui64>{SelfId().NodeId()} : Nothing();
- addPartiton(startShard, nodeId, {}, shardInfo, inFlightShards);
+ addPartition(startShard, nodeId, {}, shardInfo, inFlightShards);
fillRangesForTasks();
buildSinks();
return (isParallelPointRead) ? TMaybe<size_t>(partitions.size()) : Nothing();
@@ -1414,7 +1433,7 @@ protected:
}
} else {
for (auto& [shardId, shardInfo] : partitions) {
- addPartiton(shardId, {}, shardId, shardInfo, {});
+ addPartition(shardId, {}, shardId, shardInfo, {});
}
fillRangesForTasks();
buildSinks();
@@ -1423,17 +1442,23 @@ protected:
}
ui32 GetMaxTasksAggregation(TStageInfo& stageInfo, const ui32 previousTasksCount, const ui32 nodesCount) const {
+ auto& intros = stageInfo.Introspections;
if (AggregationSettings.HasAggregationComputeThreads()) {
+ intros.push_back("Considering AggregationComputeThreads value - " + ToString(AggregationSettings.GetAggregationComputeThreads()));
return std::max<ui32>(1, AggregationSettings.GetAggregationComputeThreads());
} else if (nodesCount) {
const TStagePredictor& predictor = stageInfo.Meta.Tx.Body->GetCalculationPredictor(stageInfo.Id.StageId);
- return predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), previousTasksCount / nodesCount) * nodesCount;
+ auto result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), previousTasksCount / nodesCount, intros) * nodesCount;
+ intros.push_back("Predicted value for aggregation - " + ToString(result));
+ return result;
} else {
+ intros.push_back("Unknown nodes count for aggregation - using value 1");
return 1;
}
}
void BuildComputeTasks(TStageInfo& stageInfo, const ui32 nodesCount) {
+ auto& intros = stageInfo.Introspections;
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
ui32 partitionsCount = 1;
@@ -1476,11 +1501,13 @@ protected:
case NKqpProto::TKqpPhyConnection::kStreamLookup:
partitionsCount = originStageInfo.Tasks.size();
UnknownAffectedShardCount = true;
+ intros.push_back("Resetting compute tasks count because input " + ToString(inputIndex) + " is StreamLookup - " + ToString(partitionsCount));
break;
case NKqpProto::TKqpPhyConnection::kMap:
partitionsCount = originStageInfo.Tasks.size();
forceMapTasks = true;
++mapCnt;
+ intros.push_back("Resetting compute tasks count because input " + ToString(inputIndex) + " is Map - " + ToString(partitionsCount));
break;
default:
break;
@@ -1493,11 +1520,14 @@ protected:
if (isShuffle && !forceMapTasks) {
if (stage.GetTaskCount()) {
partitionsCount = stage.GetTaskCount();
+ intros.push_back("Manually overridden - " + ToString(partitionsCount));
} else {
partitionsCount = std::max(partitionsCount, GetMaxTasksAggregation(stageInfo, inputTasks, nodesCount));
}
}
+ intros.push_back("Actual number of compute tasks - " + ToString(partitionsCount));
+
for (ui32 i = 0; i < partitionsCount; ++i) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.Type = TTaskMeta::TTaskType::Compute;
@@ -1582,6 +1612,7 @@ protected:
const NKqpProto::TKqpPhyTableOperation& op, bool isPersistentScan) const
{
TTaskMeta::TShardReadInfo readInfo = {
+ .Ranges = {},
.Columns = columns,
};
if (keyReadRanges) {
@@ -1607,77 +1638,50 @@ protected:
ui32 GetScanTasksPerNode(
TStageInfo& stageInfo,
const bool isOlapScan,
- const ui64 /*nodeId*/,
+ const ui64 nodeId,
bool enableShuffleElimination = false
) const {
+ auto& intros = stageInfo.Introspections;
const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
if (const auto taskCount = stage.GetTaskCount()) {
+ intros.push_back("Manually overridden - " + ToString(taskCount));
return taskCount;
}
ui32 result = 0;
if (isOlapScan) {
+ intros.push_back("This is OLAP scan");
if (AggregationSettings.HasCSScanThreadsPerNode()) {
result = AggregationSettings.GetCSScanThreadsPerNode();
+ intros.push_back("Using the CSScanThreadsPerNode value - " + ToString(result));
} else {
const TStagePredictor& predictor = stageInfo.Meta.Tx.Body->GetCalculationPredictor(stageInfo.Id.StageId);
- result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {});
+ result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {}, intros);
+ intros.push_back("Predicted value for OLAP scan - " + ToString(result));
}
} else {
result = AggregationSettings.GetDSScanMinimalThreads();
+ intros.push_back("Using the DSScanMinimalThreads value - " + ToString(result));
if (stage.GetProgram().GetSettings().GetHasSort()) {
result = std::max(result, AggregationSettings.GetDSBaseSortScanThreads());
+ intros.push_back("Considering DSBaseSortScanThreads value because program has sort - " + ToString(result));
}
if (stage.GetProgram().GetSettings().GetHasMapJoin()) {
result = std::max(result, AggregationSettings.GetDSBaseJoinScanThreads());
+ intros.push_back("Considering DSBaseJoinScanThreads value because program has MapJoin - " + ToString(result));
}
}
result = Max<ui32>(1, result);
if (enableShuffleElimination) {
result *= 2;
+ intros.push_back("Multiply by 2 because of Shuffle Elimination - " + ToString(result));
}
+ intros.push_back("Predicted number of scan tasks per node " + ToString(nodeId) + " - " + ToString(result));
return result;
}
- TTask& AssignScanTaskToShard(
- TStageInfo& stageInfo, const ui64 shardId,
- THashMap<ui64, std::vector<ui64>>& nodeTasks,
- THashMap<ui64, ui64>& assignedShardsCount,
- const bool sorted, const bool isOlapScan)
- {
- const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
- ui64 nodeId = ShardIdToNodeId.at(shardId);
- if (stageInfo.Meta.IsOlap() && sorted) {
- auto& task = TasksGraph.AddTask(stageInfo);
- task.Meta.ExecuterId = SelfId();
- task.Meta.NodeId = nodeId;
- task.Meta.ScanTask = true;
- task.Meta.Type = TTaskMeta::TTaskType::Scan;
- FillSecureParamsFromStage(task.Meta.SecureParams, stage);
- return task;
- }
-
- auto& tasks = nodeTasks[nodeId];
- auto& cnt = assignedShardsCount[nodeId];
- const ui32 maxScansPerNode = GetScanTasksPerNode(stageInfo, isOlapScan, nodeId);
- if (cnt < maxScansPerNode) {
- auto& task = TasksGraph.AddTask(stageInfo);
- task.Meta.NodeId = nodeId;
- task.Meta.ScanTask = true;
- task.Meta.Type = TTaskMeta::TTaskType::Scan;
- FillSecureParamsFromStage(task.Meta.SecureParams, stage);
- tasks.push_back(task.Id);
- ++cnt;
- return task;
- } else {
- ui64 taskIdx = cnt % maxScansPerNode;
- ++cnt;
- return TasksGraph.GetTask(tasks[taskIdx]);
- }
- }
-
bool BuildPlannerAndSubmitTasks() {
Planner = CreateKqpPlanner({
.TasksGraph = TasksGraph,
@@ -1719,6 +1723,7 @@ protected:
THashMap<ui64, std::vector<ui64>> nodeTasks;
THashMap<ui64, std::vector<TShardInfoWithId>> nodeShards;
THashMap<ui64, ui64> assignedShardsCount;
+ auto& intros = stageInfo.Introspections;
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
auto& columnShardHashV1Params = stageInfo.Meta.ColumnShardHashV1Params;
@@ -1770,10 +1775,45 @@ protected:
}
if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead() && !shuffleEliminated || (!isOlapScan && readSettings.IsSorted())) {
+ THashMap<ui64 /* nodeId */, ui64 /* tasks count */> olapAndSortedTasksCount;
+
+ auto AssignScanTaskToShard = [&](const ui64 shardId, const bool sorted) -> TTask& {
+ ui64 nodeId = ShardIdToNodeId.at(shardId);
+ if (stageInfo.Meta.IsOlap() && sorted) {
+ auto& task = TasksGraph.AddTask(stageInfo);
+ task.Meta.ExecuterId = SelfId();
+ task.Meta.NodeId = nodeId;
+ task.Meta.ScanTask = true;
+ task.Meta.Type = TTaskMeta::TTaskType::Scan;
+ FillSecureParamsFromStage(task.Meta.SecureParams, stage);
+ ++olapAndSortedTasksCount[nodeId];
+ return task;
+ }
+
+ auto& tasks = nodeTasks[nodeId];
+ auto& cnt = assignedShardsCount[nodeId];
+ const ui32 maxScansPerNode = GetScanTasksPerNode(stageInfo, isOlapScan, nodeId);
+ if (cnt < maxScansPerNode) {
+ auto& task = TasksGraph.AddTask(stageInfo);
+ task.Meta.NodeId = nodeId;
+ task.Meta.ScanTask = true;
+ task.Meta.Type = TTaskMeta::TTaskType::Scan;
+ FillSecureParamsFromStage(task.Meta.SecureParams, stage);
+ tasks.push_back(task.Id);
+ ++cnt;
+ return task;
+ } else {
+ ui64 taskIdx = cnt % maxScansPerNode;
+ ++cnt;
+ intros.push_back("Scan task for node " + ToString(nodeId) + " not created");
+ return TasksGraph.GetTask(tasks[taskIdx]);
+ }
+ };
+
for (auto&& pair : nodeShards) {
auto& shardsInfo = pair.second;
for (auto&& shardInfo : shardsInfo) {
- auto& task = AssignScanTaskToShard(stageInfo, shardInfo.ShardId, nodeTasks, assignedShardsCount, readSettings.IsSorted(), isOlapScan);
+ auto& task = AssignScanTaskToShard(shardInfo.ShardId, readSettings.IsSorted());
MergeReadInfoToTaskMeta(task.Meta, shardInfo.ShardId, shardInfo.KeyReadRanges, readSettings,
columns, op, /*isPersistentScan*/ true);
}
@@ -1786,8 +1826,13 @@ protected:
PrepareScanMetaForUsage(task.Meta, keyTypes);
BuildSinks(stage, stageInfo, task);
}
+
+ intros.push_back("Actual number of scan tasks for node " + ToString(pair.first) + " - " + ToString(pair.second.size()));
}
+ for (const auto& [nodeId, count] : olapAndSortedTasksCount) {
+ intros.push_back("Actual number of scan tasks (olap+sorted) for node " + ToString(nodeId) + " - " + ToString(count));
+ }
} else if (shuffleEliminated /* save partitioning for shuffle elimination */) {
std::size_t stageInternalTaskId = 0;
columnShardHashV1Params.TaskIndexByHash = std::make_shared<TVector<ui64>>();
@@ -1828,6 +1873,8 @@ protected:
hashByShardId.insert({sharding.GetColumnShards(i), i});
}
+ intros.push_back("Actual number of scan tasks from shards with shuffle elimination for node " + ToString(nodeId) + " - " + ToString(maxTasksPerNode));
+
for (ui32 t = 0; t < maxTasksPerNode; ++t, ++stageInternalTaskId) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta = metas[t];
@@ -1869,7 +1916,11 @@ protected:
LOG_D("Stage " << stageInfo.Id << " create scan task meta for node: " << nodeId
<< ", meta: " << meta.ToString(keyTypes, *AppData()->TypeRegistry));
}
- for (ui32 t = 0; t < GetScanTasksPerNode(stageInfo, isOlapScan, nodeId); ++t) {
+
+ const auto maxTasksPerNode = GetScanTasksPerNode(stageInfo, isOlapScan, nodeId);
+ intros.push_back("Actual number of scan tasks from shards without shuffle elimination for node " + ToString(nodeId) + " - " + ToString(maxTasksPerNode));
+
+ for (ui32 t = 0; t < maxTasksPerNode; ++t) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta = meta;
task.Meta.SetEnableShardsSequentialScan(false);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
index 6aea9cc18bd..98278433100 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
@@ -791,6 +791,9 @@ NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDq::TStageId& stageI
newStage->SetStageId(stageId.StageId);
newStage->SetStageGuid(stageProto.GetStageGuid());
newStage->SetProgram(stageProto.GetProgramAst());
+ for (const auto& intro: stageInfo.Introspections) {
+ newStage->AddIntrospections(intro);
+ }
return newStage;
}
diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp
index 1ddcbbebb23..0873e26653b 100644
--- a/ydb/core/kqp/opt/kqp_query_plan.cpp
+++ b/ydb/core/kqp/opt/kqp_query_plan.cpp
@@ -2963,6 +2963,11 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
stats["UpdateTimeMs"] = updateTimeUs;
}
+ auto& introspections = stats.InsertValue("Introspections", NJson::JSON_ARRAY);
+ for (const auto& intro : (*stat)->GetIntrospections()) {
+ introspections.AppendValue(intro);
+ }
+
stats["StageDurationUs"] = (*stat)->GetStageDurationUs();
if ((*stat)->GetBaseTimeMs()) {
diff --git a/ydb/core/kqp/query_data/kqp_predictor.cpp b/ydb/core/kqp/query_data/kqp_predictor.cpp
index d2face9ec0e..ac1650f9a0e 100644
--- a/ydb/core/kqp/query_data/kqp_predictor.cpp
+++ b/ydb/core/kqp/query_data/kqp_predictor.cpp
@@ -131,16 +131,26 @@ ui32 TStagePredictor::GetUsableThreads() {
return Max<ui32>(1, *userPoolSize);
}
-ui32 TStagePredictor::CalcTasksOptimalCount(const ui32 availableThreadsCount, const std::optional<ui32> previousStageTasksCount) const {
+ui32 TStagePredictor::CalcTasksOptimalCount(
+ const ui32 availableThreadsCount, const std::optional<ui32> previousStageTasksCount,
+ TVector<TString>& intros) const
+{
ui32 result = 0;
if (!LevelDataPrediction || *LevelDataPrediction == 0) {
- ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "level difficult not defined for correct calculation";
+ ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "level difficulty not defined for correct calculation";
result = availableThreadsCount;
+ intros.push_back("Level difficulty not defined for correct calculation");
} else {
result = (availableThreadsCount - previousStageTasksCount.value_or(0) * 0.25) * (InputDataPrediction / *LevelDataPrediction);
+ intros.push_back("Using level data prediction: (availableThreadsCount - previousStageTasksCount * 0.25) * (InputDataPrediction / LevelDataPrediction) = " + ToString(result));
+ intros.push_back("(availableThreadsCount = " + ToString(availableThreadsCount) + ")");
+ intros.push_back("(previousStageTasksCount = " + ToString(previousStageTasksCount.value_or(0)) + ")");
+ intros.push_back("(InputDataPrediction = " + ToString(InputDataPrediction) + ")");
+ intros.push_back("(LevelDataPrediction = " + ToString(*LevelDataPrediction) + ")");
}
if (previousStageTasksCount) {
result = std::min<ui32>(result, *previousStageTasksCount);
+ intros.push_back("Shrinking result to previous stage tasks count - " + ToString(result));
}
return std::max<ui32>(1, result);
}
diff --git a/ydb/core/kqp/query_data/kqp_predictor.h b/ydb/core/kqp/query_data/kqp_predictor.h
index d142ed267f8..b3b16d59586 100644
--- a/ydb/core/kqp/query_data/kqp_predictor.h
+++ b/ydb/core/kqp/query_data/kqp_predictor.h
@@ -22,7 +22,7 @@ private:
YDB_READONLY_FLAG(HasCondense, false);
YDB_READONLY(ui32, NodesCount, 0);
-
+
YDB_READONLY_FLAG(HasSort, false);
YDB_READONLY_FLAG(HasMapJoin, false);
YDB_READONLY_FLAG(HasUdf, false);
@@ -45,7 +45,8 @@ public:
bool DeserializeFromKqpSettings(const NYql::NDqProto::TProgram::TSettings& kqpProto);
static ui32 GetUsableThreads();
bool NeedLLVM() const;
- ui32 CalcTasksOptimalCount(const ui32 availableThreadsCount, const std::optional<ui32> previousStageTasksCount) const;
+ ui32 CalcTasksOptimalCount(const ui32 availableThreadsCount, const std::optional<ui32> previousStageTasksCount,
+ TVector<TString>& introspections) const;
};
}
diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto
index 2137df7bb31..e249f2b0369 100644
--- a/ydb/library/yql/dq/actors/protos/dq_stats.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto
@@ -407,6 +407,7 @@ message TDqStageStats {
uint32 TotalTasksCount = 5;
uint32 FailedTasksCount = 6;
uint32 FinishedTasksCount = 50;
+ repeated string Introspections = 52; // the human-readable reasons for choosing this exact number of tasks
TDqStatsAggr CpuTimeUs = 8;
TDqStatsAggr SourceCpuTimeUs = 25;
diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
index 1b8de608d5c..668e8f4391d 100644
--- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h
+++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
@@ -66,6 +66,8 @@ struct TStageInfo : private TMoveOnly {
ui32 InputsCount = 0;
ui32 OutputsCount = 0;
+ // Describes step-by-step the decisions leading to this exact number of tasks - in human-readable way.
+ TVector<TString> Introspections; // TODO(#20120): maybe find a better place?
TVector<ui64> Tasks;
TStageInfoMeta Meta;