diff options
author | Hor911 <hor911@ydb.tech> | 2024-07-18 15:37:39 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-18 15:37:39 +0300 |
commit | ab98fb1b80564ef7bf1ca42c0bae0b14df408a32 (patch) | |
tree | 6a979fc0b3b2f8b6dad31a84a344dc28f70b5540 | |
parent | 2d8ae71907f4da2c07c941ea8efa277d4680d216 (diff) | |
download | ydb-ab98fb1b80564ef7bf1ca42c0bae0b14df408a32.tar.gz |
Time series metrics + relative time in stat (#6805)
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_stats.cpp | 438 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_stats.h | 43 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_query_plan.cpp | 11 |
3 files changed, 420 insertions, 72 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index 33fbb0a001..ff5f0be7f9 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -6,8 +6,99 @@ namespace NKikimr::NKqp { using namespace NYql; using namespace NYql::NDq; +void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr& stats); + +ui64 NonZeroMin(ui64 a, ui64 b) { + return (b == 0) ? a : ((a == 0 || a > b) ? b : a); +} + +void TTimeSeriesStats::ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats) { + NKikimr::NKqp::ExportAggStats(Values, stats); + ExportHistory(baseTimeMs, stats); +} + +void TTimeSeriesStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats) { + Pack(); + if (!History.empty()) { + for (auto& h : History) { + auto& item = *stats.AddHistory(); + item.SetTimeMs((h.first <= baseTimeMs) ? 0 : (h.first - baseTimeMs)); + item.SetValue(h.second); + } + } +} + +void TTimeSeriesStats::Resize(ui32 taskCount) { + Values.resize(taskCount); +} + +void TTimeSeriesStats::SetNonZero(ui32 taskIndex, ui64 value) { + if (value) { + Sum += value; + Sum -= Values[taskIndex]; + Values[taskIndex] = value; + } + if (HistorySampleCount) { + auto nowMs = Now().MilliSeconds(); + + if (!History.empty() && History.back().first == nowMs) { + History.back().second = Sum; + return; + } + + if (History.size() > 1 && History.back().second == Sum && History[History.size() - 2].second == Sum) { + History.back().first = nowMs; + return; + } + + History.emplace_back(nowMs, Sum); + if (History.size() >= HistorySampleCount * 2) { + Pack(); + } + } +} + +void TTimeSeriesStats::Pack() { + if (HistorySampleCount == 0) { + History.clear(); + return; + } + if (History.size() > HistorySampleCount) { + + if (HistorySampleCount == 1) { + History.front() = History.back(); + return; + } + if (HistorySampleCount == 2) { + History[1] = History.back(); + History.resize(2); + return; + } + + std::vector<std::pair<ui64, ui64>> history; + ui32 count = History.size(); + ui32 delta = count - HistorySampleCount; + ui64 minTime = History.front().first; + ui64 maxTime = History.back().first; + ui64 deltaTime = (maxTime - minTime) / (HistorySampleCount - 1); + bool first = true; + ui64 nextTime = minTime; + for (auto& h : History) { + if (!first && delta && ((h.first < nextTime) || (delta + 1 == count))) { + delta--; + } else { + history.push_back(h); + nextTime += deltaTime; + first = false; + } + count--; + } + History.swap(history); + } +} + void TAsyncStats::Resize(ui32 taskCount) { - Bytes.resize(taskCount); + Bytes.Resize(taskCount); DecompressedBytes.resize(taskCount); Rows.resize(taskCount); Chunks.resize(taskCount); @@ -16,11 +107,25 @@ void TAsyncStats::Resize(ui32 taskCount) { PauseMessageMs.resize(taskCount); ResumeMessageMs.resize(taskCount); LastMessageMs.resize(taskCount); - WaitTimeUs.resize(taskCount); + WaitTimeUs.Resize(taskCount); WaitPeriods.resize(taskCount); ActiveTimeUs.resize(taskCount); } +void TAsyncStats::SetHistorySampleCount(ui32 historySampleCount) { + Bytes.HistorySampleCount = historySampleCount; + WaitTimeUs.HistorySampleCount = historySampleCount; +} + +void TAsyncStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncStatsAggr& stats) { + if (stats.HasBytes()) { + Bytes.ExportHistory(baseTimeMs, *stats.MutableBytes()); + } + if (stats.HasWaitTimeUs()) { + WaitTimeUs.ExportHistory(baseTimeMs, *stats.MutableWaitTimeUs()); + } +} + void TAsyncBufferStats::Resize(ui32 taskCount) { Ingress.Resize(taskCount); Push.Resize(taskCount); @@ -28,6 +133,28 @@ void TAsyncBufferStats::Resize(ui32 taskCount) { Egress.Resize(taskCount); } +void TAsyncBufferStats::SetHistorySampleCount(ui32 historySampleCount) { + Ingress.SetHistorySampleCount(historySampleCount); + Push.SetHistorySampleCount(historySampleCount); + Pop.SetHistorySampleCount(historySampleCount); + Egress.SetHistorySampleCount(historySampleCount); +} + +void TAsyncBufferStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) { + if (stats.HasIngress()) { + Ingress.ExportHistory(baseTimeMs, *stats.MutableIngress()); + } + if (stats.HasPush()) { + Push.ExportHistory(baseTimeMs, *stats.MutablePush()); + } + if (stats.HasPop()) { + Pop.ExportHistory(baseTimeMs, *stats.MutablePop()); + } + if (stats.HasEgress()) { + Egress.ExportHistory(baseTimeMs, *stats.MutableEgress()); + } +} + void TTableStats::Resize(ui32 taskCount) { ReadRows.resize(taskCount); ReadBytes.resize(taskCount); @@ -39,8 +166,8 @@ void TTableStats::Resize(ui32 taskCount) { } void TStageExecutionStats::Resize(ui32 taskCount) { - CpuTimeUs.resize(taskCount); - SourceCpuTimeUs.resize(taskCount); + CpuTimeUs.Resize(taskCount); + SourceCpuTimeUs.Resize(taskCount); InputRows.resize(taskCount); InputBytes.resize(taskCount); @@ -61,11 +188,54 @@ void TStageExecutionStats::Resize(ui32 taskCount) { WaitOutputTimeUs.resize(taskCount); for (auto& p : Ingress) p.second.Resize(taskCount); - for (auto& p : Egress) p.second.Resize(taskCount); - for (auto& p : Input) p.second.Resize(taskCount); - for (auto& p : Output) p.second.Resize(taskCount); + for (auto& p : Input) p.second.Resize(taskCount); + for (auto& p : Output) p.second.Resize(taskCount); + for (auto& p : Egress) p.second.Resize(taskCount); + + MaxMemoryUsage.Resize(taskCount); +} - MaxMemoryUsage.resize(taskCount); +void TStageExecutionStats::SetHistorySampleCount(ui32 historySampleCount) { + HistorySampleCount = historySampleCount; + CpuTimeUs.HistorySampleCount = historySampleCount; + SourceCpuTimeUs.HistorySampleCount = historySampleCount; + MaxMemoryUsage.HistorySampleCount = historySampleCount; +} + +void TStageExecutionStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStageStats& stageStats) { + if (stageStats.HasCpuTimeUs()) { + CpuTimeUs.ExportHistory(baseTimeMs, *stageStats.MutableCpuTimeUs()); + } + if (stageStats.HasSourceCpuTimeUs()) { + SourceCpuTimeUs.ExportHistory(baseTimeMs, *stageStats.MutableSourceCpuTimeUs()); + } + for (auto& p : *stageStats.MutableIngress()) { + auto it = Ingress.find(p.first); + if (it != Ingress.end()) { + it->second.ExportHistory(baseTimeMs, p.second); + } + } + for (auto& p : *stageStats.MutableInput()) { + auto it = Input.find(p.first); + if (it != Input.end()) { + it->second.ExportHistory(baseTimeMs, p.second); + } + } + for (auto& p : *stageStats.MutableOutput()) { + auto it = Output.find(p.first); + if (it != Output.end()) { + it->second.ExportHistory(baseTimeMs, p.second); + } + } + for (auto& p : *stageStats.MutableEgress()) { + auto it = Egress.find(p.first); + if (it != Egress.end()) { + it->second.ExportHistory(baseTimeMs, p.second); + } + } + if (stageStats.HasMaxMemoryUsage()) { + MaxMemoryUsage.ExportHistory(baseTimeMs, *stageStats.MutableMaxMemoryUsage()); + } } void SetNonZero(ui64& target, ui64 source) { @@ -74,8 +244,10 @@ void SetNonZero(ui64& target, ui64 source) { } } -void TStageExecutionStats::UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats) { - SetNonZero(aggrAsyncStats.Bytes[index], asyncStats.GetBytes()); +ui64 TStageExecutionStats::UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats) { + ui64 baseTimeMs = 0; + + aggrAsyncStats.Bytes.SetNonZero(index, asyncStats.GetBytes()); SetNonZero(aggrAsyncStats.DecompressedBytes[index], asyncStats.GetDecompressedBytes()); SetNonZero(aggrAsyncStats.Rows[index], asyncStats.GetRows()); SetNonZero(aggrAsyncStats.Chunks[index], asyncStats.GetChunks()); @@ -83,20 +255,33 @@ void TStageExecutionStats::UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncSta auto firstMessageMs = asyncStats.GetFirstMessageMs(); SetNonZero(aggrAsyncStats.FirstMessageMs[index], firstMessageMs); - SetNonZero(aggrAsyncStats.PauseMessageMs[index], asyncStats.GetPauseMessageMs()); - SetNonZero(aggrAsyncStats.ResumeMessageMs[index], asyncStats.GetResumeMessageMs()); + baseTimeMs = NonZeroMin(baseTimeMs, firstMessageMs); + + auto pauseMessageMs = asyncStats.GetPauseMessageMs(); + SetNonZero(aggrAsyncStats.PauseMessageMs[index], pauseMessageMs); + baseTimeMs = NonZeroMin(baseTimeMs, pauseMessageMs); + + auto resumeMessageMs = asyncStats.GetResumeMessageMs(); + SetNonZero(aggrAsyncStats.ResumeMessageMs[index], resumeMessageMs); + baseTimeMs = NonZeroMin(baseTimeMs, resumeMessageMs); + auto lastMessageMs = asyncStats.GetLastMessageMs(); SetNonZero(aggrAsyncStats.LastMessageMs[index], lastMessageMs); - SetNonZero(aggrAsyncStats.WaitTimeUs[index], asyncStats.GetWaitTimeUs()); + baseTimeMs = NonZeroMin(baseTimeMs, lastMessageMs); + + aggrAsyncStats.WaitTimeUs.SetNonZero(index, asyncStats.GetWaitTimeUs()); SetNonZero(aggrAsyncStats.WaitPeriods[index], asyncStats.GetWaitPeriods()); if (firstMessageMs && lastMessageMs > firstMessageMs) { aggrAsyncStats.ActiveTimeUs[index] = lastMessageMs - firstMessageMs; } + + return baseTimeMs; } -void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs) { +ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs) { auto taskId = taskStats.GetTaskId(); auto it = Task2Index.find(taskId); + ui64 baseTimeMs = 0; ui32 taskCount = Task2Index.size(); @@ -109,8 +294,8 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS index = it->second; } - SetNonZero(CpuTimeUs[index], taskStats.GetCpuTimeUs()); - SetNonZero(SourceCpuTimeUs[index], taskStats.GetSourceCpuTimeUs()); + CpuTimeUs.SetNonZero(index, taskStats.GetCpuTimeUs()); + SourceCpuTimeUs.SetNonZero(index, taskStats.GetSourceCpuTimeUs()); SetNonZero(InputRows[index], taskStats.GetInputRows()); SetNonZero(InputBytes[index], taskStats.GetInputBytes()); @@ -124,8 +309,14 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS SetNonZero(EgressRows[index], taskStats.GetEgressRows()); SetNonZero(EgressBytes[index], taskStats.GetEgressBytes()); - SetNonZero(StartTimeMs[index], taskStats.GetStartTimeMs()); - SetNonZero(FinishTimeMs[index], taskStats.GetFinishTimeMs()); + auto startTimeMs = taskStats.GetStartTimeMs(); + SetNonZero(StartTimeMs[index], startTimeMs); + baseTimeMs = NonZeroMin(baseTimeMs, startTimeMs); + + auto finishTimeMs = taskStats.GetFinishTimeMs(); + SetNonZero(FinishTimeMs[index], finishTimeMs); + baseTimeMs = NonZeroMin(baseTimeMs, finishTimeMs); + SetNonZero(DurationUs[index], durationUs); SetNonZero(WaitInputTimeUs[index], taskStats.GetWaitInputTimeUs()); SetNonZero(WaitOutputTimeUs[index], taskStats.GetWaitOutputTimeUs()); @@ -148,9 +339,12 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS if (ingressName) { auto [it, inserted] = Ingress.try_emplace(ingressName, taskCount); auto& asyncBufferStats = it->second; - UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress()); - UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush()); - UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop()); + if (inserted) { + asyncBufferStats.SetHistorySampleCount(HistorySampleCount); + } + baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress())); + baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush())); + baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop())); } } @@ -158,16 +352,22 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS auto stageId = inputChannelStat.GetSrcStageId(); auto [it, inserted] = Input.try_emplace(stageId, taskCount); auto& asyncBufferStats = it->second; - UpdateAsyncStats(index, asyncBufferStats.Push, inputChannelStat.GetPush()); - UpdateAsyncStats(index, asyncBufferStats.Pop, inputChannelStat.GetPop()); + if (inserted) { + asyncBufferStats.SetHistorySampleCount(HistorySampleCount); + } + baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Push, inputChannelStat.GetPush())); + baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Pop, inputChannelStat.GetPop())); } for (auto& outputChannelStat : taskStats.GetOutputChannels()) { auto stageId = outputChannelStat.GetDstStageId(); auto [it, inserted] = Output.try_emplace(stageId, taskCount); auto& asyncBufferStats = it->second; - UpdateAsyncStats(index, asyncBufferStats.Push, outputChannelStat.GetPush()); - UpdateAsyncStats(index, asyncBufferStats.Pop, outputChannelStat.GetPop()); + if (inserted) { + asyncBufferStats.SetHistorySampleCount(HistorySampleCount); + } + baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Push, outputChannelStat.GetPush())); + baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Pop, outputChannelStat.GetPop())); } for (auto& sinkStat : taskStats.GetSinks()) { @@ -175,13 +375,18 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS if (egressName) { auto [it, inserted] = Egress.try_emplace(egressName, taskCount); auto& asyncBufferStats = it->second; - UpdateAsyncStats(index, asyncBufferStats.Push, sinkStat.GetPush()); - UpdateAsyncStats(index, asyncBufferStats.Pop, sinkStat.GetPop()); - UpdateAsyncStats(index, asyncBufferStats.Ingress, sinkStat.GetEgress()); + if (inserted) { + asyncBufferStats.SetHistorySampleCount(HistorySampleCount); + } + baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Push, sinkStat.GetPush())); + baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Pop, sinkStat.GetPop())); + baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Ingress, sinkStat.GetEgress())); } } - SetNonZero(MaxMemoryUsage[index], maxMemoryUsage); + MaxMemoryUsage.SetNonZero(index, maxMemoryUsage); + + return baseTimeMs; } namespace { @@ -225,7 +430,9 @@ struct TAsyncGroupStat { ui64 Count = 0; }; -void UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDqAsyncBufferStats& asyncStat) noexcept { +ui64 UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDqAsyncBufferStats& asyncStat) noexcept { + ui64 baseTimeMs = 0; + UpdateAggr(asyncAggr.MutableBytes(), asyncStat.GetBytes()); UpdateAggr(asyncAggr.MutableDecompressedBytes(), asyncStat.GetDecompressedBytes()); UpdateAggr(asyncAggr.MutableRows(), asyncStat.GetRows()); @@ -235,16 +442,25 @@ void UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDq auto firstMessageMs = asyncStat.GetFirstMessageMs(); if (firstMessageMs) { UpdateAggr(asyncAggr.MutableFirstMessageMs(), firstMessageMs); + baseTimeMs = NonZeroMin(baseTimeMs, firstMessageMs); } - if (asyncStat.GetPauseMessageMs()) { - UpdateAggr(asyncAggr.MutablePauseMessageMs(), asyncStat.GetPauseMessageMs()); + + auto pauseMessageMs = asyncStat.GetPauseMessageMs(); + if (pauseMessageMs) { + UpdateAggr(asyncAggr.MutablePauseMessageMs(), pauseMessageMs); + baseTimeMs = NonZeroMin(baseTimeMs, pauseMessageMs); } - if (asyncStat.GetResumeMessageMs()) { - UpdateAggr(asyncAggr.MutableResumeMessageMs(), asyncStat.GetResumeMessageMs()); + + auto resumeMessageMs = asyncStat.GetResumeMessageMs(); + if (resumeMessageMs) { + UpdateAggr(asyncAggr.MutableResumeMessageMs(), resumeMessageMs); + baseTimeMs = NonZeroMin(baseTimeMs, resumeMessageMs); } + auto lastMessageMs = asyncStat.GetLastMessageMs(); if (lastMessageMs) { UpdateAggr(asyncAggr.MutableLastMessageMs(), lastMessageMs); + baseTimeMs = NonZeroMin(baseTimeMs, lastMessageMs); } UpdateAggr(asyncAggr.MutableWaitTimeUs(), asyncStat.GetWaitTimeUs()); @@ -253,6 +469,8 @@ void UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDq if (firstMessageMs && lastMessageMs >= firstMessageMs) { UpdateAggr(asyncAggr.MutableActiveTimeUs(), (lastMessageMs - firstMessageMs) * 1000); } + + return baseTimeMs; } NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDq::TStageId& stageId, @@ -365,30 +583,36 @@ void TQueryExecutionStats::AddComputeActorFullStatsByTask( UpdateAggr(stageStats->MutableEgressRows(), task.GetEgressRows()); UpdateAggr(stageStats->MutableEgressBytes(), task.GetEgressBytes()); - UpdateAggr(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); - UpdateAggr(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); + auto startTimeMs = task.GetStartTimeMs(); + UpdateAggr(stageStats->MutableStartTimeMs(), startTimeMs); + BaseTimeMs = NonZeroMin(BaseTimeMs, startTimeMs); + + auto finishTimeMs = task.GetFinishTimeMs(); + UpdateAggr(stageStats->MutableFinishTimeMs(), finishTimeMs); + BaseTimeMs = NonZeroMin(BaseTimeMs, finishTimeMs); + UpdateAggr(stageStats->MutableDurationUs(), stats.GetDurationUs()); UpdateAggr(stageStats->MutableWaitInputTimeUs(), task.GetWaitInputTimeUs()); UpdateAggr(stageStats->MutableWaitOutputTimeUs(), task.GetWaitOutputTimeUs()); FillStageDurationUs(*stageStats); for (auto& sourcesStat : task.GetSources()) { - UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutableIngress(), sourcesStat.GetIngress()); - UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePush(), sourcesStat.GetPush()); - UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePop(), sourcesStat.GetPop()); + BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutableIngress(), sourcesStat.GetIngress())); + BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePush(), sourcesStat.GetPush())); + BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePop(), sourcesStat.GetPop())); } for (auto& inputChannelStat : task.GetInputChannels()) { - UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePush(), inputChannelStat.GetPush()); - UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePop(), inputChannelStat.GetPop()); + BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePush(), inputChannelStat.GetPush())); + BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePop(), inputChannelStat.GetPop())); } for (auto& outputChannelStat : task.GetOutputChannels()) { - UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePush(), outputChannelStat.GetPush()); - UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePop(), outputChannelStat.GetPop()); + BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePush(), outputChannelStat.GetPush())); + BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePop(), outputChannelStat.GetPop())); } for (auto& sinksStat : task.GetSinks()) { - UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePush(), sinksStat.GetPush()); - UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePop(), sinksStat.GetPop()); - UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutableEgress(), sinksStat.GetEgress()); + BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePush(), sinksStat.GetPush())); + BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePop(), sinksStat.GetPop())); + BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutableEgress(), sinksStat.GetEgress())); } } @@ -490,6 +714,8 @@ void TQueryExecutionStats::AddDatashardFullStatsByTask( const NYql::NDqProto::TDqTaskStats& task, ui64 datashardCpuTimeUs) { auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result); + // TODO: dedup with AddComputeActorFullStatsByTask + stageStats->SetTotalTasksCount(stageStats->GetTotalTasksCount() + 1); UpdateAggr(stageStats->MutableCpuTimeUs(), task.GetCpuTimeUs()); UpdateAggr(stageStats->MutableInputRows(), task.GetInputRows()); @@ -497,9 +723,14 @@ void TQueryExecutionStats::AddDatashardFullStatsByTask( UpdateAggr(stageStats->MutableOutputRows(), task.GetOutputRows()); UpdateAggr(stageStats->MutableOutputBytes(), task.GetOutputBytes()); - UpdateAggr(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); - UpdateAggr(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); - // UpdateAggr(stageStats->MutableDurationUs(), ??? ); + auto startTimeMs = task.GetStartTimeMs(); + UpdateAggr(stageStats->MutableStartTimeMs(), startTimeMs); + BaseTimeMs = NonZeroMin(BaseTimeMs, startTimeMs); + + auto finishTimeMs = task.GetFinishTimeMs(); + UpdateAggr(stageStats->MutableFinishTimeMs(), finishTimeMs); + BaseTimeMs = NonZeroMin(BaseTimeMs, finishTimeMs); + UpdateAggr(stageStats->MutableWaitInputTimeUs(), task.GetWaitInputTimeUs()); UpdateAggr(stageStats->MutableWaitOutputTimeUs(), task.GetWaitOutputTimeUs()); FillStageDurationUs(*stageStats); @@ -635,8 +866,9 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD auto [it, inserted] = StageStats.try_emplace(stageId); if (inserted) { it->second.StageId = TasksGraph->GetTask(taskStats.GetTaskId()).StageId; + it->second.SetHistorySampleCount(HistorySampleCount); } - it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage(), stats.GetDurationUs()); + BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage(), stats.GetDurationUs())); } void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) { @@ -660,12 +892,13 @@ void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& sta } } -void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr& stats) { +void ExportOffsetAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr& stats, ui64 offset) { ui64 count = 0; ui64 min = 0; ui64 max = 0; ui64 sum = 0; for (auto d : data) { + d = (d <= offset) ? 0 : (d - offset); if (d) { if (count) { if (min > d) min = d; @@ -685,6 +918,10 @@ void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr& stats } } +void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr& stats) { + ExportOffsetAggStats(data, stats, 0); +} + ui64 ExportAggStats(std::vector<ui64>& data) { ui64 sum = 0; for (auto d : data) { @@ -693,21 +930,21 @@ ui64 ExportAggStats(std::vector<ui64>& data) { return sum; } -void ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto::TDqAsyncStatsAggr& stats) { - ExportAggStats(data.Bytes, *stats.MutableBytes()); +void TQueryExecutionStats::ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto::TDqAsyncStatsAggr& stats) { + data.Bytes.ExportAggStats(BaseTimeMs, *stats.MutableBytes()); ExportAggStats(data.Rows, *stats.MutableRows()); ExportAggStats(data.Chunks, *stats.MutableChunks()); ExportAggStats(data.Splits, *stats.MutableSplits()); - ExportAggStats(data.FirstMessageMs, *stats.MutableFirstMessageMs()); - ExportAggStats(data.PauseMessageMs, *stats.MutablePauseMessageMs()); - ExportAggStats(data.ResumeMessageMs, *stats.MutableResumeMessageMs()); - ExportAggStats(data.LastMessageMs, *stats.MutableLastMessageMs()); - ExportAggStats(data.WaitTimeUs, *stats.MutableWaitTimeUs()); + ExportOffsetAggStats(data.FirstMessageMs, *stats.MutableFirstMessageMs(), BaseTimeMs); + ExportOffsetAggStats(data.PauseMessageMs, *stats.MutablePauseMessageMs(), BaseTimeMs); + ExportOffsetAggStats(data.ResumeMessageMs, *stats.MutableResumeMessageMs(), BaseTimeMs); + ExportOffsetAggStats(data.LastMessageMs, *stats.MutableLastMessageMs(), BaseTimeMs); + data.WaitTimeUs.ExportAggStats(BaseTimeMs, *stats.MutableWaitTimeUs()); ExportAggStats(data.WaitPeriods, *stats.MutableWaitPeriods()); ExportAggStats(data.ActiveTimeUs, *stats.MutableActiveTimeUs()); } -void ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) { +void TQueryExecutionStats::ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) { ExportAggAsyncStats(data.Ingress, *stats.MutableIngress()); ExportAggAsyncStats(data.Push, *stats.MutablePush()); ExportAggAsyncStats(data.Pop, *stats.MutablePop()); @@ -725,8 +962,10 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st auto& stageStats = *protoStages[p.second.StageId.StageId]; stageStats.SetTotalTasksCount(p.second.Task2Index.size()); - ExportAggStats(p.second.CpuTimeUs, *stageStats.MutableCpuTimeUs()); - ExportAggStats(p.second.SourceCpuTimeUs, *stageStats.MutableSourceCpuTimeUs()); + stageStats.SetBaseTimeMs(BaseTimeMs); + p.second.CpuTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableCpuTimeUs()); + p.second.SourceCpuTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableSourceCpuTimeUs()); + p.second.MaxMemoryUsage.ExportAggStats(BaseTimeMs, *stageStats.MutableMaxMemoryUsage()); ExportAggStats(p.second.InputRows, *stageStats.MutableInputRows()); ExportAggStats(p.second.InputBytes, *stageStats.MutableInputBytes()); @@ -740,8 +979,8 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st ExportAggStats(p.second.EgressRows, *stageStats.MutableEgressRows()); ExportAggStats(p.second.EgressBytes, *stageStats.MutableEgressBytes()); - ExportAggStats(p.second.StartTimeMs, *stageStats.MutableStartTimeMs()); - ExportAggStats(p.second.FinishTimeMs, *stageStats.MutableFinishTimeMs()); + ExportOffsetAggStats(p.second.StartTimeMs, *stageStats.MutableStartTimeMs(), BaseTimeMs); + ExportOffsetAggStats(p.second.FinishTimeMs, *stageStats.MutableFinishTimeMs(), BaseTimeMs); ExportAggStats(p.second.DurationUs, *stageStats.MutableDurationUs()); ExportAggStats(p.second.WaitInputTimeUs, *stageStats.MutableWaitInputTimeUs()); ExportAggStats(p.second.WaitOutputTimeUs, *stageStats.MutableWaitOutputTimeUs()); @@ -773,12 +1012,81 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st } } +void TQueryExecutionStats::AdjustAsyncAggr(NYql::NDqProto::TDqAsyncStatsAggr& stats) { + if (stats.HasFirstMessageMs()) { + AdjustDqStatsAggr(*stats.MutableFirstMessageMs()); + } + if (stats.HasPauseMessageMs()) { + AdjustDqStatsAggr(*stats.MutablePauseMessageMs()); + } + if (stats.HasResumeMessageMs()) { + AdjustDqStatsAggr(*stats.MutableResumeMessageMs()); + } + if (stats.HasLastMessageMs()) { + AdjustDqStatsAggr(*stats.MutableLastMessageMs()); + } +} + +void TQueryExecutionStats::AdjustAsyncBufferAggr(NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) { + if (stats.HasIngress()) { + AdjustAsyncAggr(*stats.MutableIngress()); + } + if (stats.HasPush()) { + AdjustAsyncAggr(*stats.MutablePush()); + } + if (stats.HasPop()) { + AdjustAsyncAggr(*stats.MutablePop()); + } + if (stats.HasEgress()) { + AdjustAsyncAggr(*stats.MutableEgress()); + } +} + +void TQueryExecutionStats::AdjustDqStatsAggr(NYql::NDqProto::TDqStatsAggr& stats) { + if (auto min = stats.GetMin()) { + stats.SetMin(min - BaseTimeMs); + } + if (auto max = stats.GetMax()) { + stats.SetMax(max - BaseTimeMs); + } + if (auto cnt = stats.GetCnt()) { + stats.SetSum(stats.GetSum() - BaseTimeMs * cnt); + } +} + +void TQueryExecutionStats::AdjustBaseTime(NDqProto::TDqStageStats* stageStats) { + if (stageStats->HasStartTimeMs()) { + AdjustDqStatsAggr(*stageStats->MutableStartTimeMs()); + } + if (stageStats->HasFinishTimeMs()) { + AdjustDqStatsAggr(*stageStats->MutableFinishTimeMs()); + } + for (auto& p : *stageStats->MutableIngress()) { + AdjustAsyncBufferAggr(p.second); + } + for (auto& p : *stageStats->MutableInput()) { + AdjustAsyncBufferAggr(p.second); + } + for (auto& p : *stageStats->MutableOutput()) { + AdjustAsyncBufferAggr(p.second); + } + for (auto& p : *stageStats->MutableEgress()) { + AdjustAsyncBufferAggr(p.second); + } +} + void TQueryExecutionStats::Finish() { // Cerr << (TStringBuilder() << "-- finish: executerTime: " << ExecuterCpuTime.MicroSeconds() << Endl); - THashMap<ui32, NDqProto::TDqStageStats*> protoStages; + for (auto& [stageId, stagetype] : TasksGraph->GetStagesInfo()) { - GetOrCreateStageStats(stageId, *TasksGraph, *Result); + auto stageStats = GetOrCreateStageStats(stageId, *TasksGraph, *Result); + stageStats->SetBaseTimeMs(BaseTimeMs); + AdjustBaseTime(stageStats); + auto it = StageStats.find(stageId.StageId); + if (it != StageStats.end()) { + it->second.ExportHistory(BaseTimeMs, *stageStats); + } } Result->SetCpuTimeUs(Result->GetCpuTimeUs() + ExecuterCpuTime.MicroSeconds()); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h index e3fd084557..ad297bf6a8 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h @@ -14,9 +14,22 @@ NYql::NDqProto::EDqStatsMode GetDqStatsModeShard(Ydb::Table::QueryStatsCollectio bool CollectFullStats(Ydb::Table::QueryStatsCollection::Mode statsMode); bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode); +struct TTimeSeriesStats { + std::vector<ui64> Values; + ui32 HistorySampleCount = 0; + ui64 Sum = 0; + std::vector<std::pair<ui64, ui64>> History; + + void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats); + void ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats); + void Resize(ui32 taskCount); + void SetNonZero(ui32 taskIndex, ui64 value); + void Pack(); +}; + struct TAsyncStats { // Data - std::vector<ui64> Bytes; + TTimeSeriesStats Bytes; std::vector<ui64> DecompressedBytes; std::vector<ui64> Rows; std::vector<ui64> Chunks; @@ -26,11 +39,13 @@ struct TAsyncStats { std::vector<ui64> PauseMessageMs; std::vector<ui64> ResumeMessageMs; std::vector<ui64> LastMessageMs; - std::vector<ui64> WaitTimeUs; + TTimeSeriesStats WaitTimeUs; std::vector<ui64> WaitPeriods; std::vector<ui64> ActiveTimeUs; void Resize(ui32 taskCount); + void SetHistorySampleCount(ui32 historySampleCount); + void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncStatsAggr& stats); }; struct TAsyncBufferStats { @@ -46,6 +61,8 @@ struct TAsyncBufferStats { TAsyncStats Egress; void Resize(ui32 taskCount); + void SetHistorySampleCount(ui32 historySampleCount); + void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats); }; struct TTableStats { @@ -73,8 +90,8 @@ struct TStageExecutionStats { std::map<ui32, ui32> Task2Index; - std::vector<ui64> CpuTimeUs; - std::vector<ui64> SourceCpuTimeUs; + TTimeSeriesStats CpuTimeUs; + TTimeSeriesStats SourceCpuTimeUs; std::vector<ui64> InputRows; std::vector<ui64> InputBytes; @@ -100,11 +117,14 @@ struct TStageExecutionStats { std::map<ui32, TAsyncBufferStats> Input; std::map<ui32, TAsyncBufferStats> Output; - std::vector<ui64> MaxMemoryUsage; + TTimeSeriesStats MaxMemoryUsage; + ui32 HistorySampleCount; void Resize(ui32 taskCount); - void UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats); - void UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs); + void SetHistorySampleCount(ui32 historySampleCount); + void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStageStats& stageStats); + ui64 UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats); + ui64 UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs); }; struct TQueryExecutionStats { @@ -112,6 +132,13 @@ private: std::map<ui32, std::map<ui32, ui32>> ShardsCountByNode; std::map<ui32, bool> UseLlvmByStageId; std::map<ui32, TStageExecutionStats> StageStats; + ui64 BaseTimeMs = 0; + void ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto::TDqAsyncStatsAggr& stats); + void ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats); + void AdjustAsyncAggr(NYql::NDqProto::TDqAsyncStatsAggr& stats); + void AdjustAsyncBufferAggr(NYql::NDqProto::TDqAsyncBufferStatsAggr& stats); + void AdjustDqStatsAggr(NYql::NDqProto::TDqStatsAggr& stats); + void AdjustBaseTime(NYql::NDqProto::TDqStageStats* stageStats); public: const Ydb::Table::QueryStatsCollection::Mode StatsMode; const TKqpTasksGraph* const TasksGraph = nullptr; @@ -119,6 +146,7 @@ public: // basic stats std::unordered_set<ui64> AffectedShards; + ui32 HistorySampleCount = 0; ui32 TotalTasks = 0; ui64 ResultBytes = 0; ui64 ResultRows = 0; @@ -145,6 +173,7 @@ public: , TasksGraph(tasksGraph) , Result(result) { + HistorySampleCount = 32; } void AddComputeActorStats( diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index dde5ad890e..4d0aef4837 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -2373,6 +2373,13 @@ void FillAggrStat(NJson::TJsonValue& node, const NYql::NDqProto::TDqStatsAggr& a aggrStat["Max"] = max; aggrStat["Sum"] = sum; aggrStat["Count"] = aggr.GetCnt(); + if (aggr.GetHistory().size()) { + auto& aggrHistory = aggrStat.InsertValue("History", NJson::JSON_ARRAY); + for (auto& h : aggr.GetHistory()) { + aggrHistory.AppendValue(h.GetTimeMs()); + aggrHistory.AppendValue(h.GetValue()); + } + } } } @@ -2550,6 +2557,10 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD stats["StageDurationUs"] = (*stat)->GetStageDurationUs(); + if ((*stat)->GetBaseTimeMs()) { + stats["BaseTimeMs"] = (*stat)->GetBaseTimeMs(); + } + if ((*stat)->HasDurationUs()) { FillAggrStat(stats, (*stat)->GetDurationUs(), "DurationUs"); } |