about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Hor911 <hor911@ydb.tech> 2025-02-24 18:09:35 +0300
committer GitHub <noreply@github.com> 2025-02-24 18:09:35 +0300
commit 68cd15551f08a13b4b0f83b6b5c439cd357420dc (patch)
tree 55a564ab5ed9db2cfbec2cc3bfc4736a7bb80c4f
parent 8f70b94bf3d46ec1414716d69ecd70b5b5a42491 (diff)
download ydb-68cd15551f08a13b4b0f83b6b5c439cd357420dc.tar.gz
Report finished tasks per stage (#14934)
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_impl.h 3
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_stats.cpp 25
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_stats.h 11
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_literal_executer.cpp 2
-rw-r--r-- ydb/core/kqp/opt/kqp_query_plan.cpp 1
-rw-r--r-- ydb/library/yql/dq/actors/protos/dq_stats.proto 1
-rw-r--r-- ydb/public/lib/ydb_cli/common/plan2svg.cpp 18
-rw-r--r-- ydb/public/lib/ydb_cli/common/plan2svg.h 1
8 files changed, 48 insertions, 14 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 0c498d0562..59c4c9623c 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -428,7 +428,7 @@ protected:
if (state.HasStats()) {
ui64 cycleCount = GetCycleCountFast();
- Stats->UpdateTaskStats(taskId, state.GetStats());
+ Stats->UpdateTaskStats(taskId, state.GetStats(), (NYql::NDqProto::EComputeState) state.GetState());
if (Request.ProgressStatsPeriod) {
auto now = TInstant::Now();
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
@@ -471,6 +471,7 @@ protected:
Stats->AddComputeActorStats(
computeActor.NodeId(),
std::move(*state.MutableStats()),
+ (NYql::NDqProto::EComputeState) state.GetState(),
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
index 360c39ffde..3cf3dfbf60 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
@@ -213,6 +213,7 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
for (auto& [_, a] : Aggregations) a.Resize(taskCount);
MaxMemoryUsage.Resize(taskCount);
+ Finished.resize(taskCount);
}
void TStageExecutionStats::SetHistorySampleCount(ui32 historySampleCount) {
@@ -320,7 +321,7 @@ ui64 TStageExecutionStats::UpdateAsyncStats(ui32 index, TAsyncStats& aggrAsyncSt
return baseTimeMs;
}
-ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs) {
+ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, NYql::NDqProto::EComputeState state, ui64 maxMemoryUsage, ui64 durationUs) {
auto taskId = taskStats.GetTaskId();
auto it = Task2Index.find(taskId);
ui64 baseTimeMs = 0;
@@ -339,6 +340,13 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
index = it->second;
}
+ if (state == NYql::NDqProto::COMPUTE_STATE_FINISHED) {
+ if (!Finished[index]) {
+ Finished[index] = true;
+ FinishedCount++;
+ }
+ }
+
CpuTimeUs.SetNonZero(index, taskStats.GetCpuTimeUs());
SetNonZero(SourceCpuTimeUs[index], taskStats.GetSourceCpuTimeUs());
@@ -661,11 +669,15 @@ ui64 TQueryExecutionStats::EstimateFinishMem() {
void TQueryExecutionStats::AddComputeActorFullStatsByTask(
const NYql::NDqProto::TDqTaskStats& task,
- const NYql::NDqProto::TDqComputeActorStats& stats
+ const NYql::NDqProto::TDqComputeActorStats& stats,
+ NYql::NDqProto::EComputeState state
) {
auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result);
stageStats->SetTotalTasksCount(stageStats->GetTotalTasksCount() + 1);
+ if (state == NYql::NDqProto::COMPUTE_STATE_FINISHED) {
+ stageStats->SetFinishedTasksCount(stageStats->GetFinishedTasksCount() + 1);
+ }
UpdateAggr(stageStats->MutableMaxMemoryUsage(), stats.GetMaxMemoryUsage()); // only 1 task per CA now
UpdateAggr(stageStats->MutableCpuTimeUs(), task.GetCpuTimeUs());
UpdateAggr(stageStats->MutableSourceCpuTimeUs(), task.GetSourceCpuTimeUs());
@@ -782,7 +794,7 @@ void TQueryExecutionStats::AddComputeActorProfileStatsByTask(
}
void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProto::TDqComputeActorStats&& stats,
- TDuration collectLongTaskStatsTimeout) {
+ NYql::NDqProto::EComputeState state, TDuration collectLongTaskStatsTimeout) {
// Cerr << (TStringBuilder() << "::AddComputeActorStats " << stats.DebugString() << Endl);
Result->SetCpuTimeUs(Result->GetCpuTimeUs() + stats.GetCpuTimeUs());
@@ -847,7 +859,7 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt
if (CollectFullStats(StatsMode)) {
for (const auto& task : stats.GetTasks()) {
- AddComputeActorFullStatsByTask(task, stats);
+ AddComputeActorFullStatsByTask(task, stats, state);
}
}
@@ -1049,7 +1061,7 @@ void TQueryExecutionStats::AddBufferStats(NYql::NDqProto::TDqTaskStats&& taskSta
}
}
-void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats) {
+void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats, NYql::NDqProto::EComputeState state) {
Y_ASSERT(stats.GetTasks().size() == 1);
const NYql::NDqProto::TDqTaskStats& taskStats = stats.GetTasks(0);
Y_ASSERT(taskStats.GetTaskId() == taskId);
@@ -1059,7 +1071,7 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
it->second.StageId = TasksGraph->GetTask(taskStats.GetTaskId()).StageId;
it->second.SetHistorySampleCount(HistorySampleCount);
}
- BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage(), stats.GetDurationUs()));
+ BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, state, stats.GetMaxMemoryUsage(), stats.GetDurationUs()));
}
void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {
@@ -1219,6 +1231,7 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
for (auto& [stageId, stageStat] : StageStats) {
auto& stageStats = *protoStages[stageStat.StageId.StageId];
stageStats.SetTotalTasksCount(stageStat.Task2Index.size());
+ stageStats.SetFinishedTasksCount(stageStat.FinishedCount);
stageStats.SetBaseTimeMs(BaseTimeMs);
stageStat.CpuTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableCpuTimeUs());
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h
index 85f6ca778b..72f3df581b 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h
@@ -3,6 +3,7 @@
#include "kqp_tasks_graph.h"
#include <util/generic/vector.h>
#include <ydb/library/yql/dq/actors/protos/dq_stats.pb.h>
+#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/core/protos/query_stats.pb.h>
#include <ydb/library/yql/dq/runtime/dq_tasks_counters.h>
@@ -189,6 +190,8 @@ struct TStageExecutionStats {
ui32 HistorySampleCount = 0;
ui32 TaskCount = 0;
+ std::vector<bool> Finished;
+ ui32 FinishedCount = 0;
void Resize(ui32 taskCount);
ui32 EstimateMem() {
@@ -201,7 +204,7 @@ struct TStageExecutionStats {
void SetHistorySampleCount(ui32 historySampleCount);
void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStageStats& stageStats);
ui64 UpdateAsyncStats(ui32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats);
- ui64 UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs);
+ ui64 UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, NYql::NDqProto::EComputeState state, ui64 maxMemoryUsage, ui64 durationUs);
};
struct TExternalPartitionStat {
@@ -277,6 +280,7 @@ public:
void AddComputeActorStats(
ui32 nodeId,
NYql::NDqProto::TDqComputeActorStats&& stats,
+ NYql::NDqProto::EComputeState state,
TDuration collectLongTaskStatsTimeout = TDuration::Max()
);
void AddNodeShardsCount(const ui32 stageId, const ui32 nodeId, const ui32 shardsCount) {
@@ -295,7 +299,7 @@ public:
void AddDatashardStats(NKikimrQueryStats::TTxStats&& txStats);
void AddBufferStats(NYql::NDqProto::TDqTaskStats&& taskStats);
- void UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats);
+ void UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats, NYql::NDqProto::EComputeState state);
void ExportExecStats(NYql::NDqProto::TDqExecutionStats& stats);
void FillStageDurationUs(NYql::NDqProto::TDqStageStats& stats);
ui64 EstimateCollectMem();
@@ -305,7 +309,8 @@ public:
private:
void AddComputeActorFullStatsByTask(
const NYql::NDqProto::TDqTaskStats& task,
- const NYql::NDqProto::TDqComputeActorStats& stats);
+ const NYql::NDqProto::TDqComputeActorStats& stats,
+ NYql::NDqProto::EComputeState state);
void AddComputeActorProfileStatsByTask(
const NYql::NDqProto::TDqTaskStats& task,
const NYql::NDqProto::TDqComputeActorStats& stats,
diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
index 4cc631ed8a..06cedd02a7 100644
--- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
@@ -251,7 +251,7 @@ public:
fakeComputeActorStats.SetDurationUs(elapsedMicros);
- Stats->AddComputeActorStats(OwnerActor.NodeId(), std::move(fakeComputeActorStats));
+ Stats->AddComputeActorStats(OwnerActor.NodeId(), std::move(fakeComputeActorStats), NYql::NDqProto::COMPUTE_STATE_FINISHED);
Stats->ExecuterCpuTime = executerCpuTime;
Stats->FinishTs = Stats->StartTs + TDuration::MicroSeconds(elapsedMicros);
diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp
index f915f59c71..2010a95ac3 100644
--- a/ydb/core/kqp/opt/kqp_query_plan.cpp
+++ b/ydb/core/kqp/opt/kqp_query_plan.cpp
@@ -2845,6 +2845,7 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
stats["PhysicalStageId"] = (*stat)->GetStageId();
stats["Tasks"] = (*stat)->GetTotalTasksCount();
+ stats["FinishedTasks"] = (*stat)->GetFinishedTasksCount();
stats["StageDurationUs"] = (*stat)->GetStageDurationUs();
diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto
index 4b8b97d028..fb48712613 100644
--- a/ydb/library/yql/dq/actors/protos/dq_stats.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto
@@ -401,6 +401,7 @@ message TDqStageStats {
uint32 TotalTasksCount = 5;
uint32 FailedTasksCount = 6;
+ uint32 FinishedTasksCount = 50;
TDqStatsAggr CpuTimeUs = 8;
TDqStatsAggr SourceCpuTimeUs = 25;
diff --git a/ydb/public/lib/ydb_cli/common/plan2svg.cpp b/ydb/public/lib/ydb_cli/common/plan2svg.cpp
index 04b77e9f64..91994a72d9 100644
--- a/ydb/public/lib/ydb_cli/common/plan2svg.cpp
+++ b/ydb/public/lib/ydb_cli/common/plan2svg.cpp
@@ -850,6 +850,9 @@ void TPlan::LoadStage(std::shared_ptr<TStage> stage, const NJson::TJsonValue& no
stage->Tasks = tasksNode->GetIntegerSafe();
Tasks += stage->Tasks;
}
+ if (auto* finishedTasksNode = stage->StatsNode->GetValueByPath("FinishedTasks")) {
+ stage->FinishedTasks = finishedTasksNode->GetIntegerSafe();
+ }
if (auto* physicalStageIdNode = stage->StatsNode->GetValueByPath("PhysicalStageId")) {
stage->PhysicalStageId = physicalStageIdNode->GetIntegerSafe();
@@ -1554,11 +1557,20 @@ void TPlan::PrintSvg(ui64 maxTime, ui32& offsetY, TStringBuilder& background, TS
<< "' y='" << s->OffsetY + s->Height / 2 + offsetY + INTERNAL_TEXT_HEIGHT / 2 << "'>" << s->Tasks << "</text>" << Endl
<< "</g>" << Endl;
} else {
- canvas
- << "<g><title>Stage " << s->PhysicalStageId << ", tasks: " << taskCount << "</title>" << Endl
+ canvas
+ << "<g><title>Stage " << s->PhysicalStageId << ", tasks: " << s->Tasks << ", finished: " << s->FinishedTasks << "</title>" << Endl;
+ if (s->FinishedTasks && s->FinishedTasks <= s->Tasks) {
+ auto finishedHeight = s->Height * s->FinishedTasks / s->Tasks;
+ auto xx = Config.TaskLeft + Config.TaskWidth - Config.TaskWidth / 8;
+ canvas
+ << "<line x1='" << xx << "' y1='" << s->OffsetY + offsetY + s->Height - finishedHeight
+ << "' x2='" << xx << "' y2='" << s->OffsetY + offsetY + s->Height
+ << "' stroke-width='" << Config.TaskWidth / 4 << "' stroke='" << Config.Palette.StageClone << "' stroke-dasharray='1,1' />" << Endl;
+ }
+ canvas
<< " <text text-anchor='end' font-family='Verdana' font-size='" << INTERNAL_TEXT_HEIGHT << "px' fill='" << Config.Palette.StageText
<< "' x='" << Config.TaskLeft + Config.TaskWidth - 2
- << "' y='" << s->OffsetY + s->Height / 2 + offsetY + INTERNAL_TEXT_HEIGHT / 2 << "'>" << taskCount << "</text>" << Endl
+ << "' y='" << s->OffsetY + s->Height / 2 + offsetY + INTERNAL_TEXT_HEIGHT / 2 << "'>" << s->Tasks << "</text>" << Endl
<< "</g>" << Endl;
}
diff --git a/ydb/public/lib/ydb_cli/common/plan2svg.h b/ydb/public/lib/ydb_cli/common/plan2svg.h
index 1ca30760c7..d452c1e492 100644
--- a/ydb/public/lib/ydb_cli/common/plan2svg.h
+++ b/ydb/public/lib/ydb_cli/common/plan2svg.h
@@ -155,6 +155,7 @@ public:
ui32 PlanNodeId = 0;
ui32 PhysicalStageId = 0;
ui32 Tasks = 0;
+ ui32 FinishedTasks = 0;
const NJson::TJsonValue* StatsNode = nullptr;
ui64 MinTime = 0;
ui64 MaxTime = 0;