diff options
author | Ivan <ilezhankin@yandex-team.ru> | 2025-07-24 22:27:31 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-07-24 22:27:31 +0300 |
commit | a5cd972162d55fccb611128a64aa8ece364167a9 (patch) | |
tree | 12380aeca0b5bede474413b7d1ed493a5910a3ae | |
parent | 6c6075a53d2f391f2fef51e946e6ea057988dee6 (diff) | |
download | ydb-a5cd972162d55fccb611128a64aa8ece364167a9.tar.gz |
Add introspection for predictor of tasks count in graph (#21336)
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 141 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_stats.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_query_plan.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_predictor.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_predictor.h | 5 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/protos/dq_stats.proto | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/tasks/dq_tasks_graph.h | 2 |
7 files changed, 122 insertions, 49 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index aa5e0abfffe..4f1431a4454 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1031,6 +1031,7 @@ protected: } void BuildReadTasksFromSource(TStageInfo& stageInfo, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot, ui32 scheduledTaskCount) { + auto& intros = stageInfo.Introspections; const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); YQL_ENSURE(stage.GetSources(0).HasExternalSource()); @@ -1040,20 +1041,28 @@ protected: const auto& externalSource = stageSource.GetExternalSource(); ui32 taskCount = externalSource.GetPartitionedTaskParams().size(); + intros.push_back("Using number of PartitionedTaskParams from external source - " + ToString(taskCount)); auto taskCountHint = stage.GetTaskCount(); + TString introHint; if (taskCountHint == 0) { taskCountHint = scheduledTaskCount; + introHint = "(Using scheduled task count as hint for override - " + ToString(taskCountHint) + ")"; } if (taskCountHint) { if (taskCount > taskCountHint) { taskCount = taskCountHint; + if (!introHint.empty()) { + intros.push_back(introHint); + } + intros.push_back("Manually overridden - " + ToString(taskCount)); } } else if (!resourceSnapshot.empty()) { ui32 maxTaskcount = resourceSnapshot.size() * 2; if (taskCount > maxTaskcount) { taskCount = maxTaskcount; + intros.push_back("Using less tasks because of resource snapshot size - " + ToString(taskCount)); } } @@ -1151,8 +1160,11 @@ protected: } TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo, bool limitTasksPerNode) { + auto& intros = stageInfo.Introspections; + if (EnableReadsMerge) { limitTasksPerNode = true; + intros.push_back("Using tasks count limit because of enabled reads merge"); } THashMap<ui64, std::vector<ui64>> nodeTasks; @@ -1298,8 +1310,15 @@ protected: THashMap<ui64, TVector<ui64>> nodeIdToTasks; THashMap<ui64, TVector<TShardRangesWithShardId>> nodeIdToShardKeyRanges; + Y_DEFER { + intros.push_back("Built scan tasks from source and shards to read for node"); + for (const auto& [nodeId, tasks] : nodeIdToTasks) { + intros.push_back(ToString(nodeId) + " - " + ToString(tasks.size()) + ", " + ToString(nodeIdToShardKeyRanges.at(nodeId).size())); + } + intros.push_back("Total built scan tasks from source - " + ToString(createdTasksIds.size())); + }; - auto addPartiton = [&]( + auto addPartition = [&]( ui64 taskLocation, TMaybe<ui64> nodeId, TMaybe<ui64> shardId, @@ -1405,7 +1424,7 @@ protected: if (shardInfo.KeyReadRanges) { const TMaybe<ui64> nodeId = (isParallelPointRead) ? TMaybe<ui64>{SelfId().NodeId()} : Nothing(); - addPartiton(startShard, nodeId, {}, shardInfo, inFlightShards); + addPartition(startShard, nodeId, {}, shardInfo, inFlightShards); fillRangesForTasks(); buildSinks(); return (isParallelPointRead) ? TMaybe<size_t>(partitions.size()) : Nothing(); @@ -1414,7 +1433,7 @@ protected: } } else { for (auto& [shardId, shardInfo] : partitions) { - addPartiton(shardId, {}, shardId, shardInfo, {}); + addPartition(shardId, {}, shardId, shardInfo, {}); } fillRangesForTasks(); buildSinks(); @@ -1423,17 +1442,23 @@ protected: } ui32 GetMaxTasksAggregation(TStageInfo& stageInfo, const ui32 previousTasksCount, const ui32 nodesCount) const { + auto& intros = stageInfo.Introspections; if (AggregationSettings.HasAggregationComputeThreads()) { + intros.push_back("Considering AggregationComputeThreads value - " + ToString(AggregationSettings.GetAggregationComputeThreads())); return std::max<ui32>(1, AggregationSettings.GetAggregationComputeThreads()); } else if (nodesCount) { const TStagePredictor& predictor = stageInfo.Meta.Tx.Body->GetCalculationPredictor(stageInfo.Id.StageId); - return predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), previousTasksCount / nodesCount) * nodesCount; + auto result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), previousTasksCount / nodesCount, intros) * nodesCount; + intros.push_back("Predicted value for aggregation - " + ToString(result)); + return result; } else { + intros.push_back("Unknown nodes count for aggregation - using value 1"); return 1; } } void BuildComputeTasks(TStageInfo& stageInfo, const ui32 nodesCount) { + auto& intros = stageInfo.Introspections; auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); ui32 partitionsCount = 1; @@ -1476,11 +1501,13 @@ protected: case NKqpProto::TKqpPhyConnection::kStreamLookup: partitionsCount = originStageInfo.Tasks.size(); UnknownAffectedShardCount = true; + intros.push_back("Resetting compute tasks count because input " + ToString(inputIndex) + " is StreamLookup - " + ToString(partitionsCount)); break; case NKqpProto::TKqpPhyConnection::kMap: partitionsCount = originStageInfo.Tasks.size(); forceMapTasks = true; ++mapCnt; + intros.push_back("Resetting compute tasks count because input " + ToString(inputIndex) + " is Map - " + ToString(partitionsCount)); break; default: break; @@ -1493,11 +1520,14 @@ protected: if (isShuffle && !forceMapTasks) { if (stage.GetTaskCount()) { partitionsCount = stage.GetTaskCount(); + intros.push_back("Manually overridden - " + ToString(partitionsCount)); } else { partitionsCount = std::max(partitionsCount, GetMaxTasksAggregation(stageInfo, inputTasks, nodesCount)); } } + intros.push_back("Actual number of compute tasks - " + ToString(partitionsCount)); + for (ui32 i = 0; i < partitionsCount; ++i) { auto& task = TasksGraph.AddTask(stageInfo); task.Meta.Type = TTaskMeta::TTaskType::Compute; @@ -1582,6 +1612,7 @@ protected: const NKqpProto::TKqpPhyTableOperation& op, bool isPersistentScan) const { TTaskMeta::TShardReadInfo readInfo = { + .Ranges = {}, .Columns = columns, }; if (keyReadRanges) { @@ -1607,77 +1638,50 @@ protected: ui32 GetScanTasksPerNode( TStageInfo& stageInfo, const bool isOlapScan, - const ui64 /*nodeId*/, + const ui64 nodeId, bool enableShuffleElimination = false ) const { + auto& intros = stageInfo.Introspections; const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); if (const auto taskCount = stage.GetTaskCount()) { + intros.push_back("Manually overridden - " + ToString(taskCount)); return taskCount; } ui32 result = 0; if (isOlapScan) { + intros.push_back("This is OLAP scan"); if (AggregationSettings.HasCSScanThreadsPerNode()) { result = AggregationSettings.GetCSScanThreadsPerNode(); + intros.push_back("Using the CSScanThreadsPerNode value - " + ToString(result)); } else { const TStagePredictor& predictor = stageInfo.Meta.Tx.Body->GetCalculationPredictor(stageInfo.Id.StageId); - result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {}); + result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {}, intros); + intros.push_back("Predicted value for OLAP scan - " + ToString(result)); } } else { result = AggregationSettings.GetDSScanMinimalThreads(); + intros.push_back("Using the DSScanMinimalThreads value - " + ToString(result)); if (stage.GetProgram().GetSettings().GetHasSort()) { result = std::max(result, AggregationSettings.GetDSBaseSortScanThreads()); + intros.push_back("Considering DSBaseSortScanThreads value because program has sort - " + ToString(result)); } if (stage.GetProgram().GetSettings().GetHasMapJoin()) { result = std::max(result, AggregationSettings.GetDSBaseJoinScanThreads()); + intros.push_back("Considering DSBaseJoinScanThreads value because program has MapJoin - " + ToString(result)); } } result = Max<ui32>(1, result); if (enableShuffleElimination) { result *= 2; + intros.push_back("Multiply by 2 because of Shuffle Elimination - " + ToString(result)); } + intros.push_back("Predicted number of scan tasks per node " + ToString(nodeId) + " - " + ToString(result)); return result; } - TTask& AssignScanTaskToShard( - TStageInfo& stageInfo, const ui64 shardId, - THashMap<ui64, std::vector<ui64>>& nodeTasks, - THashMap<ui64, ui64>& assignedShardsCount, - const bool sorted, const bool isOlapScan) - { - const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); - ui64 nodeId = ShardIdToNodeId.at(shardId); - if (stageInfo.Meta.IsOlap() && sorted) { - auto& task = TasksGraph.AddTask(stageInfo); - task.Meta.ExecuterId = SelfId(); - task.Meta.NodeId = nodeId; - task.Meta.ScanTask = true; - task.Meta.Type = TTaskMeta::TTaskType::Scan; - FillSecureParamsFromStage(task.Meta.SecureParams, stage); - return task; - } - - auto& tasks = nodeTasks[nodeId]; - auto& cnt = assignedShardsCount[nodeId]; - const ui32 maxScansPerNode = GetScanTasksPerNode(stageInfo, isOlapScan, nodeId); - if (cnt < maxScansPerNode) { - auto& task = TasksGraph.AddTask(stageInfo); - task.Meta.NodeId = nodeId; - task.Meta.ScanTask = true; - task.Meta.Type = TTaskMeta::TTaskType::Scan; - FillSecureParamsFromStage(task.Meta.SecureParams, stage); - tasks.push_back(task.Id); - ++cnt; - return task; - } else { - ui64 taskIdx = cnt % maxScansPerNode; - ++cnt; - return TasksGraph.GetTask(tasks[taskIdx]); - } - } - bool BuildPlannerAndSubmitTasks() { Planner = CreateKqpPlanner({ .TasksGraph = TasksGraph, @@ -1719,6 +1723,7 @@ protected: THashMap<ui64, std::vector<ui64>> nodeTasks; THashMap<ui64, std::vector<TShardInfoWithId>> nodeShards; THashMap<ui64, ui64> assignedShardsCount; + auto& intros = stageInfo.Introspections; auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); auto& columnShardHashV1Params = stageInfo.Meta.ColumnShardHashV1Params; @@ -1770,10 +1775,45 @@ protected: } if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead() && !shuffleEliminated || (!isOlapScan && readSettings.IsSorted())) { + THashMap<ui64 /* nodeId */, ui64 /* tasks count */> olapAndSortedTasksCount; + + auto AssignScanTaskToShard = [&](const ui64 shardId, const bool sorted) -> TTask& { + ui64 nodeId = ShardIdToNodeId.at(shardId); + if (stageInfo.Meta.IsOlap() && sorted) { + auto& task = TasksGraph.AddTask(stageInfo); + task.Meta.ExecuterId = SelfId(); + task.Meta.NodeId = nodeId; + task.Meta.ScanTask = true; + task.Meta.Type = TTaskMeta::TTaskType::Scan; + FillSecureParamsFromStage(task.Meta.SecureParams, stage); + ++olapAndSortedTasksCount[nodeId]; + return task; + } + + auto& tasks = nodeTasks[nodeId]; + auto& cnt = assignedShardsCount[nodeId]; + const ui32 maxScansPerNode = GetScanTasksPerNode(stageInfo, isOlapScan, nodeId); + if (cnt < maxScansPerNode) { + auto& task = TasksGraph.AddTask(stageInfo); + task.Meta.NodeId = nodeId; + task.Meta.ScanTask = true; + task.Meta.Type = TTaskMeta::TTaskType::Scan; + FillSecureParamsFromStage(task.Meta.SecureParams, stage); + tasks.push_back(task.Id); + ++cnt; + return task; + } else { + ui64 taskIdx = cnt % maxScansPerNode; + ++cnt; + intros.push_back("Scan task for node " + ToString(nodeId) + " not created"); + return TasksGraph.GetTask(tasks[taskIdx]); + } + }; + for (auto&& pair : nodeShards) { auto& shardsInfo = pair.second; for (auto&& shardInfo : shardsInfo) { - auto& task = AssignScanTaskToShard(stageInfo, shardInfo.ShardId, nodeTasks, assignedShardsCount, readSettings.IsSorted(), isOlapScan); + auto& task = AssignScanTaskToShard(shardInfo.ShardId, readSettings.IsSorted()); MergeReadInfoToTaskMeta(task.Meta, shardInfo.ShardId, shardInfo.KeyReadRanges, readSettings, columns, op, /*isPersistentScan*/ true); } @@ -1786,8 +1826,13 @@ protected: PrepareScanMetaForUsage(task.Meta, keyTypes); BuildSinks(stage, stageInfo, task); } + + intros.push_back("Actual number of scan tasks for node " + ToString(pair.first) + " - " + ToString(pair.second.size())); } + for (const auto& [nodeId, count] : olapAndSortedTasksCount) { + intros.push_back("Actual number of scan tasks (olap+sorted) for node " + ToString(nodeId) + " - " + ToString(count)); + } } else if (shuffleEliminated /* save partitioning for shuffle elimination */) { std::size_t stageInternalTaskId = 0; columnShardHashV1Params.TaskIndexByHash = std::make_shared<TVector<ui64>>(); @@ -1828,6 +1873,8 @@ protected: hashByShardId.insert({sharding.GetColumnShards(i), i}); } + intros.push_back("Actual number of scan tasks from shards with shuffle elimination for node " + ToString(nodeId) + " - " + ToString(maxTasksPerNode)); + for (ui32 t = 0; t < maxTasksPerNode; ++t, ++stageInternalTaskId) { auto& task = TasksGraph.AddTask(stageInfo); task.Meta = metas[t]; @@ -1869,7 +1916,11 @@ protected: LOG_D("Stage " << stageInfo.Id << " create scan task meta for node: " << nodeId << ", meta: " << meta.ToString(keyTypes, *AppData()->TypeRegistry)); } - for (ui32 t = 0; t < GetScanTasksPerNode(stageInfo, isOlapScan, nodeId); ++t) { + + const auto maxTasksPerNode = GetScanTasksPerNode(stageInfo, isOlapScan, nodeId); + intros.push_back("Actual number of scan tasks from shards without shuffle elimination for node " + ToString(nodeId) + " - " + ToString(maxTasksPerNode)); + + for (ui32 t = 0; t < maxTasksPerNode; ++t) { auto& task = TasksGraph.AddTask(stageInfo); task.Meta = meta; task.Meta.SetEnableShardsSequentialScan(false); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index 6aea9cc18bd..98278433100 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -791,6 +791,9 @@ NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDq::TStageId& stageI newStage->SetStageId(stageId.StageId); newStage->SetStageGuid(stageProto.GetStageGuid()); newStage->SetProgram(stageProto.GetProgramAst()); + for (const auto& intro: stageInfo.Introspections) { + newStage->AddIntrospections(intro); + } return newStage; } diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index 1ddcbbebb23..0873e26653b 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -2963,6 +2963,11 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD stats["UpdateTimeMs"] = updateTimeUs; } + auto& introspections = stats.InsertValue("Introspections", NJson::JSON_ARRAY); + for (const auto& intro : (*stat)->GetIntrospections()) { + introspections.AppendValue(intro); + } + stats["StageDurationUs"] = (*stat)->GetStageDurationUs(); if ((*stat)->GetBaseTimeMs()) { diff --git a/ydb/core/kqp/query_data/kqp_predictor.cpp b/ydb/core/kqp/query_data/kqp_predictor.cpp index d2face9ec0e..ac1650f9a0e 100644 --- a/ydb/core/kqp/query_data/kqp_predictor.cpp +++ b/ydb/core/kqp/query_data/kqp_predictor.cpp @@ -131,16 +131,26 @@ ui32 TStagePredictor::GetUsableThreads() { return Max<ui32>(1, *userPoolSize); } -ui32 TStagePredictor::CalcTasksOptimalCount(const ui32 availableThreadsCount, const std::optional<ui32> previousStageTasksCount) const { +ui32 TStagePredictor::CalcTasksOptimalCount( + const ui32 availableThreadsCount, const std::optional<ui32> previousStageTasksCount, + TVector<TString>& intros) const +{ ui32 result = 0; if (!LevelDataPrediction || *LevelDataPrediction == 0) { - ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "level difficult not defined for correct calculation"; + ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "level difficulty not defined for correct calculation"; result = availableThreadsCount; + intros.push_back("Level difficulty not defined for correct calculation"); } else { result = (availableThreadsCount - previousStageTasksCount.value_or(0) * 0.25) * (InputDataPrediction / *LevelDataPrediction); + intros.push_back("Using level data prediction: (availableThreadsCount - previousStageTasksCount * 0.25) * (InputDataPrediction / LevelDataPrediction) = " + ToString(result)); + intros.push_back("(availableThreadsCount = " + ToString(availableThreadsCount) + ")"); + intros.push_back("(previousStageTasksCount = " + ToString(previousStageTasksCount.value_or(0)) + ")"); + intros.push_back("(InputDataPrediction = " + ToString(InputDataPrediction) + ")"); + intros.push_back("(LevelDataPrediction = " + ToString(*LevelDataPrediction) + ")"); } if (previousStageTasksCount) { result = std::min<ui32>(result, *previousStageTasksCount); + intros.push_back("Shrinking result to previous stage tasks count - " + ToString(result)); } return std::max<ui32>(1, result); } diff --git a/ydb/core/kqp/query_data/kqp_predictor.h b/ydb/core/kqp/query_data/kqp_predictor.h index d142ed267f8..b3b16d59586 100644 --- a/ydb/core/kqp/query_data/kqp_predictor.h +++ b/ydb/core/kqp/query_data/kqp_predictor.h @@ -22,7 +22,7 @@ private: YDB_READONLY_FLAG(HasCondense, false); YDB_READONLY(ui32, NodesCount, 0); - + YDB_READONLY_FLAG(HasSort, false); YDB_READONLY_FLAG(HasMapJoin, false); YDB_READONLY_FLAG(HasUdf, false); @@ -45,7 +45,8 @@ public: bool DeserializeFromKqpSettings(const NYql::NDqProto::TProgram::TSettings& kqpProto); static ui32 GetUsableThreads(); bool NeedLLVM() const; - ui32 CalcTasksOptimalCount(const ui32 availableThreadsCount, const std::optional<ui32> previousStageTasksCount) const; + ui32 CalcTasksOptimalCount(const ui32 availableThreadsCount, const std::optional<ui32> previousStageTasksCount, + TVector<TString>& introspections) const; }; } diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index 2137df7bb31..e249f2b0369 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -407,6 +407,7 @@ message TDqStageStats { uint32 TotalTasksCount = 5; uint32 FailedTasksCount = 6; uint32 FinishedTasksCount = 50; + repeated string Introspections = 52; // the human-readable reasons for choosing this exact number of tasks TDqStatsAggr CpuTimeUs = 8; TDqStatsAggr SourceCpuTimeUs = 25; diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h index 1b8de608d5c..668e8f4391d 100644 --- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h +++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h @@ -66,6 +66,8 @@ struct TStageInfo : private TMoveOnly { ui32 InputsCount = 0; ui32 OutputsCount = 0; + // Describes step-by-step the decisions leading to this exact number of tasks - in human-readable way. + TVector<TString> Introspections; // TODO(#20120): maybe find a better place? TVector<ui64> Tasks; TStageInfoMeta Meta; |