diff options
author | Hor911 <hor911@ydb.tech> | 2025-02-24 18:09:35 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-24 18:09:35 +0300 |
commit | 68cd15551f08a13b4b0f83b6b5c439cd357420dc (patch) | |
tree | 55a564ab5ed9db2cfbec2cc3bfc4736a7bb80c4f | |
parent | 8f70b94bf3d46ec1414716d69ecd70b5b5a42491 (diff) | |
download | ydb-68cd15551f08a13b4b0f83b6b5c439cd357420dc.tar.gz |
Report finished tasks per stage (#14934)
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_stats.cpp | 25 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_stats.h | 11 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_literal_executer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_query_plan.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/protos/dq_stats.proto | 1 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/plan2svg.cpp | 18 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/plan2svg.h | 1 |
8 files changed, 48 insertions, 14 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 0c498d0562..59c4c9623c 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -428,7 +428,7 @@ protected: if (state.HasStats()) { ui64 cycleCount = GetCycleCountFast(); - Stats->UpdateTaskStats(taskId, state.GetStats()); + Stats->UpdateTaskStats(taskId, state.GetStats(), (NYql::NDqProto::EComputeState) state.GetState()); if (Request.ProgressStatsPeriod) { auto now = TInstant::Now(); if (LastProgressStats + Request.ProgressStatsPeriod <= now) { @@ -471,6 +471,7 @@ protected: Stats->AddComputeActorStats( computeActor.NodeId(), std::move(*state.MutableStats()), + (NYql::NDqProto::EComputeState) state.GetState(), TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs()) ); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index 360c39ffde..3cf3dfbf60 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -213,6 +213,7 @@ void TStageExecutionStats::Resize(ui32 taskCount) { for (auto& [_, a] : Aggregations) a.Resize(taskCount); MaxMemoryUsage.Resize(taskCount); + Finished.resize(taskCount); } void TStageExecutionStats::SetHistorySampleCount(ui32 historySampleCount) { @@ -320,7 +321,7 @@ ui64 TStageExecutionStats::UpdateAsyncStats(ui32 index, TAsyncStats& aggrAsyncSt return baseTimeMs; } -ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs) { +ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, NYql::NDqProto::EComputeState state, ui64 maxMemoryUsage, ui64 durationUs) { auto taskId = taskStats.GetTaskId(); auto it = Task2Index.find(taskId); ui64 baseTimeMs = 0; @@ -339,6 +340,13 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS index = it->second; } + if (state == NYql::NDqProto::COMPUTE_STATE_FINISHED) { + if (!Finished[index]) { + Finished[index] = true; + FinishedCount++; + } + } + CpuTimeUs.SetNonZero(index, taskStats.GetCpuTimeUs()); SetNonZero(SourceCpuTimeUs[index], taskStats.GetSourceCpuTimeUs()); @@ -661,11 +669,15 @@ ui64 TQueryExecutionStats::EstimateFinishMem() { void TQueryExecutionStats::AddComputeActorFullStatsByTask( const NYql::NDqProto::TDqTaskStats& task, - const NYql::NDqProto::TDqComputeActorStats& stats + const NYql::NDqProto::TDqComputeActorStats& stats, + NYql::NDqProto::EComputeState state ) { auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result); stageStats->SetTotalTasksCount(stageStats->GetTotalTasksCount() + 1); + if (state == NYql::NDqProto::COMPUTE_STATE_FINISHED) { + stageStats->SetFinishedTasksCount(stageStats->GetFinishedTasksCount() + 1); + } UpdateAggr(stageStats->MutableMaxMemoryUsage(), stats.GetMaxMemoryUsage()); // only 1 task per CA now UpdateAggr(stageStats->MutableCpuTimeUs(), task.GetCpuTimeUs()); UpdateAggr(stageStats->MutableSourceCpuTimeUs(), task.GetSourceCpuTimeUs()); @@ -782,7 +794,7 @@ void TQueryExecutionStats::AddComputeActorProfileStatsByTask( } void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProto::TDqComputeActorStats&& stats, - TDuration collectLongTaskStatsTimeout) { + NYql::NDqProto::EComputeState state, TDuration collectLongTaskStatsTimeout) { // Cerr << (TStringBuilder() << "::AddComputeActorStats " << stats.DebugString() << Endl); Result->SetCpuTimeUs(Result->GetCpuTimeUs() + stats.GetCpuTimeUs()); @@ -847,7 +859,7 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt if (CollectFullStats(StatsMode)) { for (const auto& task : stats.GetTasks()) { - AddComputeActorFullStatsByTask(task, stats); + AddComputeActorFullStatsByTask(task, stats, state); } } @@ -1049,7 +1061,7 @@ void TQueryExecutionStats::AddBufferStats(NYql::NDqProto::TDqTaskStats&& taskSta } } -void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats) { +void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats, NYql::NDqProto::EComputeState state) { Y_ASSERT(stats.GetTasks().size() == 1); const NYql::NDqProto::TDqTaskStats& taskStats = stats.GetTasks(0); Y_ASSERT(taskStats.GetTaskId() == taskId); @@ -1059,7 +1071,7 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD it->second.StageId = TasksGraph->GetTask(taskStats.GetTaskId()).StageId; it->second.SetHistorySampleCount(HistorySampleCount); } - BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage(), stats.GetDurationUs())); + BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, state, stats.GetMaxMemoryUsage(), stats.GetDurationUs())); } void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) { @@ -1219,6 +1231,7 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st for (auto& [stageId, stageStat] : StageStats) { auto& stageStats = *protoStages[stageStat.StageId.StageId]; stageStats.SetTotalTasksCount(stageStat.Task2Index.size()); + stageStats.SetFinishedTasksCount(stageStat.FinishedCount); stageStats.SetBaseTimeMs(BaseTimeMs); stageStat.CpuTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableCpuTimeUs()); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h index 85f6ca778b..72f3df581b 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h @@ -3,6 +3,7 @@ #include "kqp_tasks_graph.h" #include <util/generic/vector.h> #include <ydb/library/yql/dq/actors/protos/dq_stats.pb.h> +#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/core/protos/query_stats.pb.h> #include <ydb/library/yql/dq/runtime/dq_tasks_counters.h> @@ -189,6 +190,8 @@ struct TStageExecutionStats { ui32 HistorySampleCount = 0; ui32 TaskCount = 0; + std::vector<bool> Finished; + ui32 FinishedCount = 0; void Resize(ui32 taskCount); ui32 EstimateMem() { @@ -201,7 +204,7 @@ struct TStageExecutionStats { void SetHistorySampleCount(ui32 historySampleCount); void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStageStats& stageStats); ui64 UpdateAsyncStats(ui32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats); - ui64 UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs); + ui64 UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, NYql::NDqProto::EComputeState state, ui64 maxMemoryUsage, ui64 durationUs); }; struct TExternalPartitionStat { @@ -277,6 +280,7 @@ public: void AddComputeActorStats( ui32 nodeId, NYql::NDqProto::TDqComputeActorStats&& stats, + NYql::NDqProto::EComputeState state, TDuration collectLongTaskStatsTimeout = TDuration::Max() ); void AddNodeShardsCount(const ui32 stageId, const ui32 nodeId, const ui32 shardsCount) { @@ -295,7 +299,7 @@ public: void AddDatashardStats(NKikimrQueryStats::TTxStats&& txStats); void AddBufferStats(NYql::NDqProto::TDqTaskStats&& taskStats); - void UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats); + void UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats, NYql::NDqProto::EComputeState state); void ExportExecStats(NYql::NDqProto::TDqExecutionStats& stats); void FillStageDurationUs(NYql::NDqProto::TDqStageStats& stats); ui64 EstimateCollectMem(); @@ -305,7 +309,8 @@ public: private: void AddComputeActorFullStatsByTask( const NYql::NDqProto::TDqTaskStats& task, - const NYql::NDqProto::TDqComputeActorStats& stats); + const NYql::NDqProto::TDqComputeActorStats& stats, + NYql::NDqProto::EComputeState state); void AddComputeActorProfileStatsByTask( const NYql::NDqProto::TDqTaskStats& task, const NYql::NDqProto::TDqComputeActorStats& stats, diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp index 4cc631ed8a..06cedd02a7 100644 --- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp @@ -251,7 +251,7 @@ public: fakeComputeActorStats.SetDurationUs(elapsedMicros); - Stats->AddComputeActorStats(OwnerActor.NodeId(), std::move(fakeComputeActorStats)); + Stats->AddComputeActorStats(OwnerActor.NodeId(), std::move(fakeComputeActorStats), NYql::NDqProto::COMPUTE_STATE_FINISHED); Stats->ExecuterCpuTime = executerCpuTime; Stats->FinishTs = Stats->StartTs + TDuration::MicroSeconds(elapsedMicros); diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index f915f59c71..2010a95ac3 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -2845,6 +2845,7 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD stats["PhysicalStageId"] = (*stat)->GetStageId(); stats["Tasks"] = (*stat)->GetTotalTasksCount(); + stats["FinishedTasks"] = (*stat)->GetFinishedTasksCount(); stats["StageDurationUs"] = (*stat)->GetStageDurationUs(); diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index 4b8b97d028..fb48712613 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -401,6 +401,7 @@ message TDqStageStats { uint32 TotalTasksCount = 5; uint32 FailedTasksCount = 6; + uint32 FinishedTasksCount = 50; TDqStatsAggr CpuTimeUs = 8; TDqStatsAggr SourceCpuTimeUs = 25; diff --git a/ydb/public/lib/ydb_cli/common/plan2svg.cpp b/ydb/public/lib/ydb_cli/common/plan2svg.cpp index 04b77e9f64..91994a72d9 100644 --- a/ydb/public/lib/ydb_cli/common/plan2svg.cpp +++ b/ydb/public/lib/ydb_cli/common/plan2svg.cpp @@ -850,6 +850,9 @@ void TPlan::LoadStage(std::shared_ptr<TStage> stage, const NJson::TJsonValue& no stage->Tasks = tasksNode->GetIntegerSafe(); Tasks += stage->Tasks; } + if (auto* finishedTasksNode = stage->StatsNode->GetValueByPath("FinishedTasks")) { + stage->FinishedTasks = finishedTasksNode->GetIntegerSafe(); + } if (auto* physicalStageIdNode = stage->StatsNode->GetValueByPath("PhysicalStageId")) { stage->PhysicalStageId = physicalStageIdNode->GetIntegerSafe(); @@ -1554,11 +1557,20 @@ void TPlan::PrintSvg(ui64 maxTime, ui32& offsetY, TStringBuilder& background, TS << "' y='" << s->OffsetY + s->Height / 2 + offsetY + INTERNAL_TEXT_HEIGHT / 2 << "'>" << s->Tasks << "</text>" << Endl << "</g>" << Endl; } else { - canvas - << "<g><title>Stage " << s->PhysicalStageId << ", tasks: " << taskCount << "</title>" << Endl + canvas + << "<g><title>Stage " << s->PhysicalStageId << ", tasks: " << s->Tasks << ", finished: " << s->FinishedTasks << "</title>" << Endl; + if (s->FinishedTasks && s->FinishedTasks <= s->Tasks) { + auto finishedHeight = s->Height * s->FinishedTasks / s->Tasks; + auto xx = Config.TaskLeft + Config.TaskWidth - Config.TaskWidth / 8; + canvas + << "<line x1='" << xx << "' y1='" << s->OffsetY + offsetY + s->Height - finishedHeight + << "' x2='" << xx << "' y2='" << s->OffsetY + offsetY + s->Height + << "' stroke-width='" << Config.TaskWidth / 4 << "' stroke='" << Config.Palette.StageClone << "' stroke-dasharray='1,1' />" << Endl; + } + canvas << " <text text-anchor='end' font-family='Verdana' font-size='" << INTERNAL_TEXT_HEIGHT << "px' fill='" << Config.Palette.StageText << "' x='" << Config.TaskLeft + Config.TaskWidth - 2 - << "' y='" << s->OffsetY + s->Height / 2 + offsetY + INTERNAL_TEXT_HEIGHT / 2 << "'>" << taskCount << "</text>" << Endl + << "' y='" << s->OffsetY + s->Height / 2 + offsetY + INTERNAL_TEXT_HEIGHT / 2 << "'>" << s->Tasks << "</text>" << Endl << "</g>" << Endl; } diff --git a/ydb/public/lib/ydb_cli/common/plan2svg.h b/ydb/public/lib/ydb_cli/common/plan2svg.h index 1ca30760c7..d452c1e492 100644 --- a/ydb/public/lib/ydb_cli/common/plan2svg.h +++ b/ydb/public/lib/ydb_cli/common/plan2svg.h @@ -155,6 +155,7 @@ public: ui32 PlanNodeId = 0; ui32 PhysicalStageId = 0; ui32 Tasks = 0; + ui32 FinishedTasks = 0; const NJson::TJsonValue* StatsNode = nullptr; ui64 MinTime = 0; ui64 MaxTime = 0; |