about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Hor911 <hor911@ydb.tech> 2024-07-18 15:37:39 +0300
committer GitHub <noreply@github.com> 2024-07-18 15:37:39 +0300
commit ab98fb1b80564ef7bf1ca42c0bae0b14df408a32 (patch)
tree 6a979fc0b3b2f8b6dad31a84a344dc28f70b5540
parent 2d8ae71907f4da2c07c941ea8efa277d4680d216 (diff)
download ydb-ab98fb1b80564ef7bf1ca42c0bae0b14df408a32.tar.gz
Time series metrics + relative time in stat (#6805)
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_stats.cpp 438
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_stats.h 43
-rw-r--r-- ydb/core/kqp/opt/kqp_query_plan.cpp 11
3 files changed, 420 insertions, 72 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
index 33fbb0a001..ff5f0be7f9 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
@@ -6,8 +6,99 @@ namespace NKikimr::NKqp {
using namespace NYql;
using namespace NYql::NDq;
+void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr& stats);
+
+// Minimum of two values where 0 means "no value": a zero argument is ignored,
+// so the result is 0 only when both arguments are 0.
+ui64 NonZeroMin(ui64 a, ui64 b) {
+    if (b == 0) {
+        return a;
+    }
+    if (a == 0) {
+        return b;
+    }
+    return (a > b) ? b : a;
+}
+
+// Fills `stats` from this series: first the aggregate over the per-task Values
+// (namespace-level ExportAggStats), then the sampled history with timestamps
+// expressed relative to baseTimeMs.
+void TTimeSeriesStats::ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats) {
+    // Qualified on purpose: an unqualified call would resolve to this member function.
+    NKikimr::NKqp::ExportAggStats(Values, stats);
+    this->ExportHistory(baseTimeMs, stats);
+}
+
+// Compacts the history with Pack() and appends each remaining sample to the
+// History field of `stats`. Sample times are written relative to baseTimeMs;
+// a sample taken at or before baseTimeMs is written with time 0.
+void TTimeSeriesStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats) {
+    Pack();
+    for (const auto& sample : History) {
+        const ui64 relativeMs = (sample.first > baseTimeMs) ? (sample.first - baseTimeMs) : 0;
+        auto* item = stats.AddHistory();
+        item->SetTimeMs(relativeMs);
+        item->SetValue(sample.second);
+    }
+}
+
+// Resizes the per-task Values array to taskCount entries.
+void TTimeSeriesStats::Resize(ui32 taskCount) {
+ Values.resize(taskCount);
+}
+
+// Stores a nonzero `value` for the task at taskIndex, keeping Sum equal to the
+// total of Values, and (when history is enabled) records the current Sum as a
+// time sample. A zero `value` leaves Values/Sum untouched but still samples.
+void TTimeSeriesStats::SetNonZero(ui32 taskIndex, ui64 value) {
+    if (value != 0) {
+        auto& slot = Values[taskIndex];
+        // Replace the task's previous contribution to the running total.
+        Sum += value;
+        Sum -= slot;
+        slot = value;
+    }
+
+    if (HistorySampleCount == 0) {
+        return; // history collection is disabled
+    }
+
+    const auto nowMs = Now().MilliSeconds();
+
+    if (!History.empty()) {
+        auto& last = History.back();
+
+        // Same millisecond as the previous sample: just refresh its value.
+        if (last.first == nowMs) {
+            last.second = Sum;
+            return;
+        }
+
+        // The two latest samples already hold this Sum: slide the last one
+        // forward in time instead of adding a third identical point.
+        const auto size = History.size();
+        if (size > 1 && last.second == Sum && History[size - 2].second == Sum) {
+            last.first = nowMs;
+            return;
+        }
+    }
+
+    History.emplace_back(nowMs, Sum);
+    // Compact once the buffer reaches twice the sample limit.
+    if (History.size() >= HistorySampleCount * 2) {
+        Pack();
+    }
+}
+
+// Shrinks History to at most HistorySampleCount samples.
+//  - HistorySampleCount == 0: history is disabled, every sample is discarded.
+//  - HistorySampleCount == 1: only the most recent sample is kept.
+//  - HistorySampleCount == 2: the first and the most recent samples are kept.
+//  - otherwise: the first sample is kept and later samples are retained about
+//    every (maxTime - minTime) / (HistorySampleCount - 1) milliseconds; samples
+//    falling before the next expected time are dropped.
+// Does nothing when History already fits the limit.
+void TTimeSeriesStats::Pack() {
+    if (HistorySampleCount == 0) {
+        History.clear();
+        return;
+    }
+    if (History.size() > HistorySampleCount) {
+
+        if (HistorySampleCount == 1) {
+            History.front() = History.back();
+            // Actually shrink the vector; without this the stale samples stay
+            // in place and History never gets down to the one-sample limit.
+            History.resize(1);
+            return;
+        }
+        if (HistorySampleCount == 2) {
+            History[1] = History.back();
+            History.resize(2);
+            return;
+        }
+
+        std::vector<std::pair<ui64, ui64>> history;
+        ui32 count = History.size();              // samples not yet visited (including the current one)
+        ui32 delta = count - HistorySampleCount;  // samples that still have to be dropped
+        ui64 minTime = History.front().first;
+        ui64 maxTime = History.back().first;
+        ui64 deltaTime = (maxTime - minTime) / (HistorySampleCount - 1);
+        bool first = true;
+        ui64 nextTime = minTime;
+        for (auto& h : History) {
+            // Drop a sample when it comes earlier than the next expected time, or
+            // when every remaining sample except the last one must go
+            // (delta + 1 == count). The very first sample is never dropped.
+            if (!first && delta && ((h.first < nextTime) || (delta + 1 == count))) {
+                delta--;
+            } else {
+                history.push_back(h);
+                nextTime += deltaTime;
+                first = false;
+            }
+            count--;
+        }
+        History.swap(history);
+    }
+}
+
void TAsyncStats::Resize(ui32 taskCount) {
- Bytes.resize(taskCount);
+ Bytes.Resize(taskCount);
DecompressedBytes.resize(taskCount);
Rows.resize(taskCount);
Chunks.resize(taskCount);
@@ -16,11 +107,25 @@ void TAsyncStats::Resize(ui32 taskCount) {
PauseMessageMs.resize(taskCount);
ResumeMessageMs.resize(taskCount);
LastMessageMs.resize(taskCount);
- WaitTimeUs.resize(taskCount);
+ WaitTimeUs.Resize(taskCount);
WaitPeriods.resize(taskCount);
ActiveTimeUs.resize(taskCount);
}
+// Applies the history sample limit to both time-series members of this
+// structure (Bytes and WaitTimeUs).
+void TAsyncStats::SetHistorySampleCount(ui32 historySampleCount) {
+    TTimeSeriesStats* const series[] = {&Bytes, &WaitTimeUs};
+    for (auto* s : series) {
+        s->HistorySampleCount = historySampleCount;
+    }
+}
+
+// Exports the Bytes and WaitTimeUs histories into the matching proto
+// aggregates. A field that is not already set in `stats` is left untouched.
+void TAsyncStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncStatsAggr& stats) {
+    if (stats.HasBytes()) {
+        auto* bytesAggr = stats.MutableBytes();
+        Bytes.ExportHistory(baseTimeMs, *bytesAggr);
+    }
+    if (stats.HasWaitTimeUs()) {
+        auto* waitAggr = stats.MutableWaitTimeUs();
+        WaitTimeUs.ExportHistory(baseTimeMs, *waitAggr);
+    }
+}
+
void TAsyncBufferStats::Resize(ui32 taskCount) {
Ingress.Resize(taskCount);
Push.Resize(taskCount);
@@ -28,6 +133,28 @@ void TAsyncBufferStats::Resize(ui32 taskCount) {
Egress.Resize(taskCount);
}
+// Forwards the history sample limit to all four sub-stats in the order
+// Ingress, Push, Pop, Egress.
+void TAsyncBufferStats::SetHistorySampleCount(ui32 historySampleCount) {
+    TAsyncStats* const parts[] = {&Ingress, &Push, &Pop, &Egress};
+    for (auto* part : parts) {
+        part->SetHistorySampleCount(historySampleCount);
+    }
+}
+
+// Exports the history of each sub-stat (Ingress, Push, Pop, Egress) into the
+// corresponding proto message, but only for messages already present in `stats`.
+void TAsyncBufferStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
+    if (stats.HasIngress()) {
+        auto* ingressAggr = stats.MutableIngress();
+        Ingress.ExportHistory(baseTimeMs, *ingressAggr);
+    }
+    if (stats.HasPush()) {
+        auto* pushAggr = stats.MutablePush();
+        Push.ExportHistory(baseTimeMs, *pushAggr);
+    }
+    if (stats.HasPop()) {
+        auto* popAggr = stats.MutablePop();
+        Pop.ExportHistory(baseTimeMs, *popAggr);
+    }
+    if (stats.HasEgress()) {
+        auto* egressAggr = stats.MutableEgress();
+        Egress.ExportHistory(baseTimeMs, *egressAggr);
+    }
+}
+
void TTableStats::Resize(ui32 taskCount) {
ReadRows.resize(taskCount);
ReadBytes.resize(taskCount);
@@ -39,8 +166,8 @@ void TTableStats::Resize(ui32 taskCount) {
}
void TStageExecutionStats::Resize(ui32 taskCount) {
- CpuTimeUs.resize(taskCount);
- SourceCpuTimeUs.resize(taskCount);
+ CpuTimeUs.Resize(taskCount);
+ SourceCpuTimeUs.Resize(taskCount);
InputRows.resize(taskCount);
InputBytes.resize(taskCount);
@@ -61,11 +188,54 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
WaitOutputTimeUs.resize(taskCount);
for (auto& p : Ingress) p.second.Resize(taskCount);
- for (auto& p : Egress) p.second.Resize(taskCount);
- for (auto& p : Input) p.second.Resize(taskCount);
- for (auto& p : Output) p.second.Resize(taskCount);
+ for (auto& p : Input) p.second.Resize(taskCount);
+ for (auto& p : Output) p.second.Resize(taskCount);
+ for (auto& p : Egress) p.second.Resize(taskCount);
+
+ MaxMemoryUsage.Resize(taskCount);
+}
- MaxMemoryUsage.resize(taskCount);
+// Remembers the history sample limit for this stage and applies it to the
+// stage-level time series (CpuTimeUs, SourceCpuTimeUs, MaxMemoryUsage).
+void TStageExecutionStats::SetHistorySampleCount(ui32 historySampleCount) {
+    HistorySampleCount = historySampleCount;
+    TTimeSeriesStats* const series[] = {&CpuTimeUs, &SourceCpuTimeUs, &MaxMemoryUsage};
+    for (auto* s : series) {
+        s->HistorySampleCount = historySampleCount;
+    }
+}
+
+// Adds the collected histories to an already populated stage proto. Only
+// fields and map entries that exist in `stageStats` are touched; for map
+// entries the locally collected stats are looked up by the same key.
+void TStageExecutionStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStageStats& stageStats) {
+    // Exports buffer histories for every proto map entry that has a local counterpart.
+    auto exportBuffers = [baseTimeMs](auto& protoMap, auto& collected) {
+        for (auto& entry : protoMap) {
+            auto found = collected.find(entry.first);
+            if (found != collected.end()) {
+                found->second.ExportHistory(baseTimeMs, entry.second);
+            }
+        }
+    };
+
+    if (stageStats.HasCpuTimeUs()) {
+        CpuTimeUs.ExportHistory(baseTimeMs, *stageStats.MutableCpuTimeUs());
+    }
+    if (stageStats.HasSourceCpuTimeUs()) {
+        SourceCpuTimeUs.ExportHistory(baseTimeMs, *stageStats.MutableSourceCpuTimeUs());
+    }
+
+    exportBuffers(*stageStats.MutableIngress(), Ingress);
+    exportBuffers(*stageStats.MutableInput(), Input);
+    exportBuffers(*stageStats.MutableOutput(), Output);
+    exportBuffers(*stageStats.MutableEgress(), Egress);
+
+    if (stageStats.HasMaxMemoryUsage()) {
+        MaxMemoryUsage.ExportHistory(baseTimeMs, *stageStats.MutableMaxMemoryUsage());
+    }
}
void SetNonZero(ui64& target, ui64 source) {
@@ -74,8 +244,10 @@ void SetNonZero(ui64& target, ui64 source) {
}
}
-void TStageExecutionStats::UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats) {
- SetNonZero(aggrAsyncStats.Bytes[index], asyncStats.GetBytes());
+ui64 TStageExecutionStats::UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats) {
+ ui64 baseTimeMs = 0;
+
+ aggrAsyncStats.Bytes.SetNonZero(index, asyncStats.GetBytes());
SetNonZero(aggrAsyncStats.DecompressedBytes[index], asyncStats.GetDecompressedBytes());
SetNonZero(aggrAsyncStats.Rows[index], asyncStats.GetRows());
SetNonZero(aggrAsyncStats.Chunks[index], asyncStats.GetChunks());
@@ -83,20 +255,33 @@ void TStageExecutionStats::UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncSta
auto firstMessageMs = asyncStats.GetFirstMessageMs();
SetNonZero(aggrAsyncStats.FirstMessageMs[index], firstMessageMs);
- SetNonZero(aggrAsyncStats.PauseMessageMs[index], asyncStats.GetPauseMessageMs());
- SetNonZero(aggrAsyncStats.ResumeMessageMs[index], asyncStats.GetResumeMessageMs());
+ baseTimeMs = NonZeroMin(baseTimeMs, firstMessageMs);
+
+ auto pauseMessageMs = asyncStats.GetPauseMessageMs();
+ SetNonZero(aggrAsyncStats.PauseMessageMs[index], pauseMessageMs);
+ baseTimeMs = NonZeroMin(baseTimeMs, pauseMessageMs);
+
+ auto resumeMessageMs = asyncStats.GetResumeMessageMs();
+ SetNonZero(aggrAsyncStats.ResumeMessageMs[index], resumeMessageMs);
+ baseTimeMs = NonZeroMin(baseTimeMs, resumeMessageMs);
+
auto lastMessageMs = asyncStats.GetLastMessageMs();
SetNonZero(aggrAsyncStats.LastMessageMs[index], lastMessageMs);
- SetNonZero(aggrAsyncStats.WaitTimeUs[index], asyncStats.GetWaitTimeUs());
+ baseTimeMs = NonZeroMin(baseTimeMs, lastMessageMs);
+
+ aggrAsyncStats.WaitTimeUs.SetNonZero(index, asyncStats.GetWaitTimeUs());
SetNonZero(aggrAsyncStats.WaitPeriods[index], asyncStats.GetWaitPeriods());
if (firstMessageMs && lastMessageMs > firstMessageMs) {
aggrAsyncStats.ActiveTimeUs[index] = lastMessageMs - firstMessageMs;
}
+
+ return baseTimeMs;
}
-void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs) {
+ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs) {
auto taskId = taskStats.GetTaskId();
auto it = Task2Index.find(taskId);
+ ui64 baseTimeMs = 0;
ui32 taskCount = Task2Index.size();
@@ -109,8 +294,8 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
index = it->second;
}
- SetNonZero(CpuTimeUs[index], taskStats.GetCpuTimeUs());
- SetNonZero(SourceCpuTimeUs[index], taskStats.GetSourceCpuTimeUs());
+ CpuTimeUs.SetNonZero(index, taskStats.GetCpuTimeUs());
+ SourceCpuTimeUs.SetNonZero(index, taskStats.GetSourceCpuTimeUs());
SetNonZero(InputRows[index], taskStats.GetInputRows());
SetNonZero(InputBytes[index], taskStats.GetInputBytes());
@@ -124,8 +309,14 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
SetNonZero(EgressRows[index], taskStats.GetEgressRows());
SetNonZero(EgressBytes[index], taskStats.GetEgressBytes());
- SetNonZero(StartTimeMs[index], taskStats.GetStartTimeMs());
- SetNonZero(FinishTimeMs[index], taskStats.GetFinishTimeMs());
+ auto startTimeMs = taskStats.GetStartTimeMs();
+ SetNonZero(StartTimeMs[index], startTimeMs);
+ baseTimeMs = NonZeroMin(baseTimeMs, startTimeMs);
+
+ auto finishTimeMs = taskStats.GetFinishTimeMs();
+ SetNonZero(FinishTimeMs[index], finishTimeMs);
+ baseTimeMs = NonZeroMin(baseTimeMs, finishTimeMs);
+
SetNonZero(DurationUs[index], durationUs);
SetNonZero(WaitInputTimeUs[index], taskStats.GetWaitInputTimeUs());
SetNonZero(WaitOutputTimeUs[index], taskStats.GetWaitOutputTimeUs());
@@ -148,9 +339,12 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
if (ingressName) {
auto [it, inserted] = Ingress.try_emplace(ingressName, taskCount);
auto& asyncBufferStats = it->second;
- UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress());
- UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush());
- UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop());
+ if (inserted) {
+ asyncBufferStats.SetHistorySampleCount(HistorySampleCount);
+ }
+ baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress()));
+ baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush()));
+ baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop()));
}
}
@@ -158,16 +352,22 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
auto stageId = inputChannelStat.GetSrcStageId();
auto [it, inserted] = Input.try_emplace(stageId, taskCount);
auto& asyncBufferStats = it->second;
- UpdateAsyncStats(index, asyncBufferStats.Push, inputChannelStat.GetPush());
- UpdateAsyncStats(index, asyncBufferStats.Pop, inputChannelStat.GetPop());
+ if (inserted) {
+ asyncBufferStats.SetHistorySampleCount(HistorySampleCount);
+ }
+ baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Push, inputChannelStat.GetPush()));
+ baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Pop, inputChannelStat.GetPop()));
}
for (auto& outputChannelStat : taskStats.GetOutputChannels()) {
auto stageId = outputChannelStat.GetDstStageId();
auto [it, inserted] = Output.try_emplace(stageId, taskCount);
auto& asyncBufferStats = it->second;
- UpdateAsyncStats(index, asyncBufferStats.Push, outputChannelStat.GetPush());
- UpdateAsyncStats(index, asyncBufferStats.Pop, outputChannelStat.GetPop());
+ if (inserted) {
+ asyncBufferStats.SetHistorySampleCount(HistorySampleCount);
+ }
+ baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Push, outputChannelStat.GetPush()));
+ baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Pop, outputChannelStat.GetPop()));
}
for (auto& sinkStat : taskStats.GetSinks()) {
@@ -175,13 +375,18 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
if (egressName) {
auto [it, inserted] = Egress.try_emplace(egressName, taskCount);
auto& asyncBufferStats = it->second;
- UpdateAsyncStats(index, asyncBufferStats.Push, sinkStat.GetPush());
- UpdateAsyncStats(index, asyncBufferStats.Pop, sinkStat.GetPop());
- UpdateAsyncStats(index, asyncBufferStats.Ingress, sinkStat.GetEgress());
+ if (inserted) {
+ asyncBufferStats.SetHistorySampleCount(HistorySampleCount);
+ }
+ baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Push, sinkStat.GetPush()));
+ baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Pop, sinkStat.GetPop()));
+ baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Ingress, sinkStat.GetEgress()));
}
}
- SetNonZero(MaxMemoryUsage[index], maxMemoryUsage);
+ MaxMemoryUsage.SetNonZero(index, maxMemoryUsage);
+
+ return baseTimeMs;
}
namespace {
@@ -225,7 +430,9 @@ struct TAsyncGroupStat {
ui64 Count = 0;
};
-void UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDqAsyncBufferStats& asyncStat) noexcept {
+ui64 UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDqAsyncBufferStats& asyncStat) noexcept {
+ ui64 baseTimeMs = 0;
+
UpdateAggr(asyncAggr.MutableBytes(), asyncStat.GetBytes());
UpdateAggr(asyncAggr.MutableDecompressedBytes(), asyncStat.GetDecompressedBytes());
UpdateAggr(asyncAggr.MutableRows(), asyncStat.GetRows());
@@ -235,16 +442,25 @@ void UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDq
auto firstMessageMs = asyncStat.GetFirstMessageMs();
if (firstMessageMs) {
UpdateAggr(asyncAggr.MutableFirstMessageMs(), firstMessageMs);
+ baseTimeMs = NonZeroMin(baseTimeMs, firstMessageMs);
}
- if (asyncStat.GetPauseMessageMs()) {
- UpdateAggr(asyncAggr.MutablePauseMessageMs(), asyncStat.GetPauseMessageMs());
+
+ auto pauseMessageMs = asyncStat.GetPauseMessageMs();
+ if (pauseMessageMs) {
+ UpdateAggr(asyncAggr.MutablePauseMessageMs(), pauseMessageMs);
+ baseTimeMs = NonZeroMin(baseTimeMs, pauseMessageMs);
}
- if (asyncStat.GetResumeMessageMs()) {
- UpdateAggr(asyncAggr.MutableResumeMessageMs(), asyncStat.GetResumeMessageMs());
+
+ auto resumeMessageMs = asyncStat.GetResumeMessageMs();
+ if (resumeMessageMs) {
+ UpdateAggr(asyncAggr.MutableResumeMessageMs(), resumeMessageMs);
+ baseTimeMs = NonZeroMin(baseTimeMs, resumeMessageMs);
}
+
auto lastMessageMs = asyncStat.GetLastMessageMs();
if (lastMessageMs) {
UpdateAggr(asyncAggr.MutableLastMessageMs(), lastMessageMs);
+ baseTimeMs = NonZeroMin(baseTimeMs, lastMessageMs);
}
UpdateAggr(asyncAggr.MutableWaitTimeUs(), asyncStat.GetWaitTimeUs());
@@ -253,6 +469,8 @@ void UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDq
if (firstMessageMs && lastMessageMs >= firstMessageMs) {
UpdateAggr(asyncAggr.MutableActiveTimeUs(), (lastMessageMs - firstMessageMs) * 1000);
}
+
+ return baseTimeMs;
}
NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDq::TStageId& stageId,
@@ -365,30 +583,36 @@ void TQueryExecutionStats::AddComputeActorFullStatsByTask(
UpdateAggr(stageStats->MutableEgressRows(), task.GetEgressRows());
UpdateAggr(stageStats->MutableEgressBytes(), task.GetEgressBytes());
- UpdateAggr(stageStats->MutableStartTimeMs(), task.GetStartTimeMs());
- UpdateAggr(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs());
+ auto startTimeMs = task.GetStartTimeMs();
+ UpdateAggr(stageStats->MutableStartTimeMs(), startTimeMs);
+ BaseTimeMs = NonZeroMin(BaseTimeMs, startTimeMs);
+
+ auto finishTimeMs = task.GetFinishTimeMs();
+ UpdateAggr(stageStats->MutableFinishTimeMs(), finishTimeMs);
+ BaseTimeMs = NonZeroMin(BaseTimeMs, finishTimeMs);
+
UpdateAggr(stageStats->MutableDurationUs(), stats.GetDurationUs());
UpdateAggr(stageStats->MutableWaitInputTimeUs(), task.GetWaitInputTimeUs());
UpdateAggr(stageStats->MutableWaitOutputTimeUs(), task.GetWaitOutputTimeUs());
FillStageDurationUs(*stageStats);
for (auto& sourcesStat : task.GetSources()) {
- UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutableIngress(), sourcesStat.GetIngress());
- UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePush(), sourcesStat.GetPush());
- UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePop(), sourcesStat.GetPop());
+ BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutableIngress(), sourcesStat.GetIngress()));
+ BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePush(), sourcesStat.GetPush()));
+ BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePop(), sourcesStat.GetPop()));
}
for (auto& inputChannelStat : task.GetInputChannels()) {
- UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePush(), inputChannelStat.GetPush());
- UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePop(), inputChannelStat.GetPop());
+ BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePush(), inputChannelStat.GetPush()));
+ BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePop(), inputChannelStat.GetPop()));
}
for (auto& outputChannelStat : task.GetOutputChannels()) {
- UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePush(), outputChannelStat.GetPush());
- UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePop(), outputChannelStat.GetPop());
+ BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePush(), outputChannelStat.GetPush()));
+ BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePop(), outputChannelStat.GetPop()));
}
for (auto& sinksStat : task.GetSinks()) {
- UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePush(), sinksStat.GetPush());
- UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePop(), sinksStat.GetPop());
- UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutableEgress(), sinksStat.GetEgress());
+ BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePush(), sinksStat.GetPush()));
+ BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePop(), sinksStat.GetPop()));
+ BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutableEgress(), sinksStat.GetEgress()));
}
}
@@ -490,6 +714,8 @@ void TQueryExecutionStats::AddDatashardFullStatsByTask(
const NYql::NDqProto::TDqTaskStats& task, ui64 datashardCpuTimeUs) {
auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result);
+ // TODO: dedup with AddComputeActorFullStatsByTask
+
stageStats->SetTotalTasksCount(stageStats->GetTotalTasksCount() + 1);
UpdateAggr(stageStats->MutableCpuTimeUs(), task.GetCpuTimeUs());
UpdateAggr(stageStats->MutableInputRows(), task.GetInputRows());
@@ -497,9 +723,14 @@ void TQueryExecutionStats::AddDatashardFullStatsByTask(
UpdateAggr(stageStats->MutableOutputRows(), task.GetOutputRows());
UpdateAggr(stageStats->MutableOutputBytes(), task.GetOutputBytes());
- UpdateAggr(stageStats->MutableStartTimeMs(), task.GetStartTimeMs());
- UpdateAggr(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs());
- // UpdateAggr(stageStats->MutableDurationUs(), ??? );
+ auto startTimeMs = task.GetStartTimeMs();
+ UpdateAggr(stageStats->MutableStartTimeMs(), startTimeMs);
+ BaseTimeMs = NonZeroMin(BaseTimeMs, startTimeMs);
+
+ auto finishTimeMs = task.GetFinishTimeMs();
+ UpdateAggr(stageStats->MutableFinishTimeMs(), finishTimeMs);
+ BaseTimeMs = NonZeroMin(BaseTimeMs, finishTimeMs);
+
UpdateAggr(stageStats->MutableWaitInputTimeUs(), task.GetWaitInputTimeUs());
UpdateAggr(stageStats->MutableWaitOutputTimeUs(), task.GetWaitOutputTimeUs());
FillStageDurationUs(*stageStats);
@@ -635,8 +866,9 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
auto [it, inserted] = StageStats.try_emplace(stageId);
if (inserted) {
it->second.StageId = TasksGraph->GetTask(taskStats.GetTaskId()).StageId;
+ it->second.SetHistorySampleCount(HistorySampleCount);
}
- it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage(), stats.GetDurationUs());
+ BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage(), stats.GetDurationUs()));
}
void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {
@@ -660,12 +892,13 @@ void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& sta
}
}
-void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr& stats) {
+void ExportOffsetAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr& stats, ui64 offset) {
ui64 count = 0;
ui64 min = 0;
ui64 max = 0;
ui64 sum = 0;
for (auto d : data) {
+ d = (d <= offset) ? 0 : (d - offset);
if (d) {
if (count) {
if (min > d) min = d;
@@ -685,6 +918,10 @@ void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr& stats
}
}
+// Aggregates `data` into `stats` with no offset applied to the values
+// (delegates to ExportOffsetAggStats with offset 0).
+void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr& stats) {
+ ExportOffsetAggStats(data, stats, 0);
+}
+
ui64 ExportAggStats(std::vector<ui64>& data) {
ui64 sum = 0;
for (auto d : data) {
@@ -693,21 +930,21 @@ ui64 ExportAggStats(std::vector<ui64>& data) {
return sum;
}
-void ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto::TDqAsyncStatsAggr& stats) {
- ExportAggStats(data.Bytes, *stats.MutableBytes());
+void TQueryExecutionStats::ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto::TDqAsyncStatsAggr& stats) {
+ data.Bytes.ExportAggStats(BaseTimeMs, *stats.MutableBytes());
ExportAggStats(data.Rows, *stats.MutableRows());
ExportAggStats(data.Chunks, *stats.MutableChunks());
ExportAggStats(data.Splits, *stats.MutableSplits());
- ExportAggStats(data.FirstMessageMs, *stats.MutableFirstMessageMs());
- ExportAggStats(data.PauseMessageMs, *stats.MutablePauseMessageMs());
- ExportAggStats(data.ResumeMessageMs, *stats.MutableResumeMessageMs());
- ExportAggStats(data.LastMessageMs, *stats.MutableLastMessageMs());
- ExportAggStats(data.WaitTimeUs, *stats.MutableWaitTimeUs());
+ ExportOffsetAggStats(data.FirstMessageMs, *stats.MutableFirstMessageMs(), BaseTimeMs);
+ ExportOffsetAggStats(data.PauseMessageMs, *stats.MutablePauseMessageMs(), BaseTimeMs);
+ ExportOffsetAggStats(data.ResumeMessageMs, *stats.MutableResumeMessageMs(), BaseTimeMs);
+ ExportOffsetAggStats(data.LastMessageMs, *stats.MutableLastMessageMs(), BaseTimeMs);
+ data.WaitTimeUs.ExportAggStats(BaseTimeMs, *stats.MutableWaitTimeUs());
ExportAggStats(data.WaitPeriods, *stats.MutableWaitPeriods());
ExportAggStats(data.ActiveTimeUs, *stats.MutableActiveTimeUs());
}
-void ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
+void TQueryExecutionStats::ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
ExportAggAsyncStats(data.Ingress, *stats.MutableIngress());
ExportAggAsyncStats(data.Push, *stats.MutablePush());
ExportAggAsyncStats(data.Pop, *stats.MutablePop());
@@ -725,8 +962,10 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
auto& stageStats = *protoStages[p.second.StageId.StageId];
stageStats.SetTotalTasksCount(p.second.Task2Index.size());
- ExportAggStats(p.second.CpuTimeUs, *stageStats.MutableCpuTimeUs());
- ExportAggStats(p.second.SourceCpuTimeUs, *stageStats.MutableSourceCpuTimeUs());
+ stageStats.SetBaseTimeMs(BaseTimeMs);
+ p.second.CpuTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableCpuTimeUs());
+ p.second.SourceCpuTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableSourceCpuTimeUs());
+ p.second.MaxMemoryUsage.ExportAggStats(BaseTimeMs, *stageStats.MutableMaxMemoryUsage());
ExportAggStats(p.second.InputRows, *stageStats.MutableInputRows());
ExportAggStats(p.second.InputBytes, *stageStats.MutableInputBytes());
@@ -740,8 +979,8 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
ExportAggStats(p.second.EgressRows, *stageStats.MutableEgressRows());
ExportAggStats(p.second.EgressBytes, *stageStats.MutableEgressBytes());
- ExportAggStats(p.second.StartTimeMs, *stageStats.MutableStartTimeMs());
- ExportAggStats(p.second.FinishTimeMs, *stageStats.MutableFinishTimeMs());
+ ExportOffsetAggStats(p.second.StartTimeMs, *stageStats.MutableStartTimeMs(), BaseTimeMs);
+ ExportOffsetAggStats(p.second.FinishTimeMs, *stageStats.MutableFinishTimeMs(), BaseTimeMs);
ExportAggStats(p.second.DurationUs, *stageStats.MutableDurationUs());
ExportAggStats(p.second.WaitInputTimeUs, *stageStats.MutableWaitInputTimeUs());
ExportAggStats(p.second.WaitOutputTimeUs, *stageStats.MutableWaitOutputTimeUs());
@@ -773,12 +1012,81 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
}
}
+// Rebases the four message-timestamp aggregates (first/pause/resume/last) of an
+// async stats proto via AdjustDqStatsAggr. Fields not present are left unset.
+void TQueryExecutionStats::AdjustAsyncAggr(NYql::NDqProto::TDqAsyncStatsAggr& stats) {
+    if (stats.HasFirstMessageMs()) {
+        auto* firstAggr = stats.MutableFirstMessageMs();
+        AdjustDqStatsAggr(*firstAggr);
+    }
+    if (stats.HasPauseMessageMs()) {
+        auto* pauseAggr = stats.MutablePauseMessageMs();
+        AdjustDqStatsAggr(*pauseAggr);
+    }
+    if (stats.HasResumeMessageMs()) {
+        auto* resumeAggr = stats.MutableResumeMessageMs();
+        AdjustDqStatsAggr(*resumeAggr);
+    }
+    if (stats.HasLastMessageMs()) {
+        auto* lastAggr = stats.MutableLastMessageMs();
+        AdjustDqStatsAggr(*lastAggr);
+    }
+}
+
+// Applies AdjustAsyncAggr to each sub-message (Ingress, Push, Pop, Egress) that
+// is present in the buffer stats proto.
+void TQueryExecutionStats::AdjustAsyncBufferAggr(NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
+    if (stats.HasIngress()) {
+        auto* ingressAggr = stats.MutableIngress();
+        AdjustAsyncAggr(*ingressAggr);
+    }
+    if (stats.HasPush()) {
+        auto* pushAggr = stats.MutablePush();
+        AdjustAsyncAggr(*pushAggr);
+    }
+    if (stats.HasPop()) {
+        auto* popAggr = stats.MutablePop();
+        AdjustAsyncAggr(*popAggr);
+    }
+    if (stats.HasEgress()) {
+        auto* egressAggr = stats.MutableEgress();
+        AdjustAsyncAggr(*egressAggr);
+    }
+}
+
+// Rebases a timestamp aggregate onto BaseTimeMs: Min and Max are reduced by
+// BaseTimeMs and Sum by BaseTimeMs * Cnt. Zero Min/Max/Cnt are treated as
+// "not set" and left alone.
+// All subtractions are clamped at 0 so that a value below the base cannot wrap
+// around to a huge unsigned number; this matches the clamping done by
+// ExportOffsetAggStats and TTimeSeriesStats::ExportHistory.
+void TQueryExecutionStats::AdjustDqStatsAggr(NYql::NDqProto::TDqStatsAggr& stats) {
+    if (auto min = stats.GetMin()) {
+        stats.SetMin((min <= BaseTimeMs) ? 0 : (min - BaseTimeMs));
+    }
+    if (auto max = stats.GetMax()) {
+        stats.SetMax((max <= BaseTimeMs) ? 0 : (max - BaseTimeMs));
+    }
+    if (auto cnt = stats.GetCnt()) {
+        const auto sum = stats.GetSum();
+        const auto offset = BaseTimeMs * cnt;
+        stats.SetSum((sum <= offset) ? 0 : (sum - offset));
+    }
+}
+
+// Rebases every timestamp aggregate of a stage proto onto BaseTimeMs: the
+// stage start/finish times (when present) and all async buffer stats in the
+// Ingress, Input, Output and Egress maps.
+void TQueryExecutionStats::AdjustBaseTime(NDqProto::TDqStageStats* stageStats) {
+    if (stageStats->HasStartTimeMs()) {
+        auto* startAggr = stageStats->MutableStartTimeMs();
+        AdjustDqStatsAggr(*startAggr);
+    }
+    if (stageStats->HasFinishTimeMs()) {
+        auto* finishAggr = stageStats->MutableFinishTimeMs();
+        AdjustDqStatsAggr(*finishAggr);
+    }
+
+    // The four maps have different key types, hence the generic lambda.
+    auto adjustBuffers = [this](auto& buffers) {
+        for (auto& entry : buffers) {
+            AdjustAsyncBufferAggr(entry.second);
+        }
+    };
+    adjustBuffers(*stageStats->MutableIngress());
+    adjustBuffers(*stageStats->MutableInput());
+    adjustBuffers(*stageStats->MutableOutput());
+    adjustBuffers(*stageStats->MutableEgress());
+}
+
void TQueryExecutionStats::Finish() {
// Cerr << (TStringBuilder() << "-- finish: executerTime: " << ExecuterCpuTime.MicroSeconds() << Endl);
-
THashMap<ui32, NDqProto::TDqStageStats*> protoStages;
+
for (auto& [stageId, stagetype] : TasksGraph->GetStagesInfo()) {
- GetOrCreateStageStats(stageId, *TasksGraph, *Result);
+ auto stageStats = GetOrCreateStageStats(stageId, *TasksGraph, *Result);
+ stageStats->SetBaseTimeMs(BaseTimeMs);
+ AdjustBaseTime(stageStats);
+ auto it = StageStats.find(stageId.StageId);
+ if (it != StageStats.end()) {
+ it->second.ExportHistory(BaseTimeMs, *stageStats);
+ }
}
Result->SetCpuTimeUs(Result->GetCpuTimeUs() + ExecuterCpuTime.MicroSeconds());
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h
index e3fd084557..ad297bf6a8 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h
@@ -14,9 +14,22 @@ NYql::NDqProto::EDqStatsMode GetDqStatsModeShard(Ydb::Table::QueryStatsCollectio
bool CollectFullStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
+// Per-task metric that additionally keeps a bounded history of its total
+// (the sum over all tasks) sampled over time.
+struct TTimeSeriesStats {
+ // Latest nonzero value reported for each task, indexed by task index.
+ std::vector<ui64> Values;
+ // Maximum number of history samples to keep; 0 disables history collection.
+ ui32 HistorySampleCount = 0;
+ // Running total of Values, maintained by SetNonZero().
+ ui64 Sum = 0;
+ // Samples as (timestamp in ms, Sum at that time) pairs, in insertion order.
+ std::vector<std::pair<ui64, ui64>> History;
+
+ // Packs the history and appends it to `stats` with times relative to baseTimeMs.
+ void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats);
+ // Exports the aggregate over Values and then the history into `stats`.
+ void ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats);
+ // Resizes Values to taskCount entries.
+ void Resize(ui32 taskCount);
+ // Updates the value of one task (zero values are ignored) and samples Sum.
+ void SetNonZero(ui32 taskIndex, ui64 value);
+ // Reduces History to at most HistorySampleCount samples.
+ void Pack();
+};
+
struct TAsyncStats {
// Data
- std::vector<ui64> Bytes;
+ TTimeSeriesStats Bytes;
std::vector<ui64> DecompressedBytes;
std::vector<ui64> Rows;
std::vector<ui64> Chunks;
@@ -26,11 +39,13 @@ struct TAsyncStats {
std::vector<ui64> PauseMessageMs;
std::vector<ui64> ResumeMessageMs;
std::vector<ui64> LastMessageMs;
- std::vector<ui64> WaitTimeUs;
+ TTimeSeriesStats WaitTimeUs;
std::vector<ui64> WaitPeriods;
std::vector<ui64> ActiveTimeUs;
void Resize(ui32 taskCount);
+ void SetHistorySampleCount(ui32 historySampleCount);
+ void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncStatsAggr& stats);
};
struct TAsyncBufferStats {
@@ -46,6 +61,8 @@ struct TAsyncBufferStats {
TAsyncStats Egress;
void Resize(ui32 taskCount);
+ void SetHistorySampleCount(ui32 historySampleCount);
+ void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats);
};
struct TTableStats {
@@ -73,8 +90,8 @@ struct TStageExecutionStats {
std::map<ui32, ui32> Task2Index;
- std::vector<ui64> CpuTimeUs;
- std::vector<ui64> SourceCpuTimeUs;
+ TTimeSeriesStats CpuTimeUs;
+ TTimeSeriesStats SourceCpuTimeUs;
std::vector<ui64> InputRows;
std::vector<ui64> InputBytes;
@@ -100,11 +117,14 @@ struct TStageExecutionStats {
std::map<ui32, TAsyncBufferStats> Input;
std::map<ui32, TAsyncBufferStats> Output;
- std::vector<ui64> MaxMemoryUsage;
+ TTimeSeriesStats MaxMemoryUsage;
+ ui32 HistorySampleCount;
void Resize(ui32 taskCount);
- void UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats);
- void UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs);
+ void SetHistorySampleCount(ui32 historySampleCount);
+ void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStageStats& stageStats);
+ ui64 UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats);
+ ui64 UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs);
};
struct TQueryExecutionStats {
@@ -112,6 +132,13 @@ private:
std::map<ui32, std::map<ui32, ui32>> ShardsCountByNode;
std::map<ui32, bool> UseLlvmByStageId;
std::map<ui32, TStageExecutionStats> StageStats;
+ ui64 BaseTimeMs = 0;
+ void ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto::TDqAsyncStatsAggr& stats);
+ void ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats);
+ void AdjustAsyncAggr(NYql::NDqProto::TDqAsyncStatsAggr& stats);
+ void AdjustAsyncBufferAggr(NYql::NDqProto::TDqAsyncBufferStatsAggr& stats);
+ void AdjustDqStatsAggr(NYql::NDqProto::TDqStatsAggr& stats);
+ void AdjustBaseTime(NYql::NDqProto::TDqStageStats* stageStats);
public:
const Ydb::Table::QueryStatsCollection::Mode StatsMode;
const TKqpTasksGraph* const TasksGraph = nullptr;
@@ -119,6 +146,7 @@ public:
// basic stats
std::unordered_set<ui64> AffectedShards;
+ ui32 HistorySampleCount = 0;
ui32 TotalTasks = 0;
ui64 ResultBytes = 0;
ui64 ResultRows = 0;
@@ -145,6 +173,7 @@ public:
, TasksGraph(tasksGraph)
, Result(result)
{
+ HistorySampleCount = 32;
}
void AddComputeActorStats(
diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp
index dde5ad890e..4d0aef4837 100644
--- a/ydb/core/kqp/opt/kqp_query_plan.cpp
+++ b/ydb/core/kqp/opt/kqp_query_plan.cpp
@@ -2373,6 +2373,13 @@ void FillAggrStat(NJson::TJsonValue& node, const NYql::NDqProto::TDqStatsAggr& a
aggrStat["Max"] = max;
aggrStat["Sum"] = sum;
aggrStat["Count"] = aggr.GetCnt();
+ if (aggr.GetHistory().size()) {
+ auto& aggrHistory = aggrStat.InsertValue("History", NJson::JSON_ARRAY);
+ for (auto& h : aggr.GetHistory()) {
+ aggrHistory.AppendValue(h.GetTimeMs());
+ aggrHistory.AppendValue(h.GetValue());
+ }
+ }
}
}
@@ -2550,6 +2557,10 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
stats["StageDurationUs"] = (*stat)->GetStageDurationUs();
+ if ((*stat)->GetBaseTimeMs()) {
+ stats["BaseTimeMs"] = (*stat)->GetBaseTimeMs();
+ }
+
if ((*stat)->HasDurationUs()) {
FillAggrStat(stats, (*stat)->GetDurationUs(), "DurationUs");
}