diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-11-01 17:40:53 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-11-01 18:24:54 +0300 |
commit | 0a532f08a13bcf0b6c8406d508dd219167c5ab96 (patch) | |
tree | e9b44f23b2cc5a9df0a5d74142952393db4e75a9 | |
parent | aa31162bb2e86ab105c2326b5bbad68174aa78c4 (diff) | |
download | ydb-0a532f08a13bcf0b6c8406d508dd219167c5ab96.tar.gz |
KIKIMR-19287: add stats for long tasks
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 19 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_stats.cpp | 245 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_stats.h | 27 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 6 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp | 28 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/protos/dq_stats.proto | 6 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_tasks_runner.cpp | 25 |
10 files changed, 235 insertions, 138 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 8446d7e6e3..a14cb64d96 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -207,6 +207,10 @@ public: ); } + bool LogStatsByLongTasks() const { + return Stats->CollectStatsByLongTasks && HasOlapTable; + } + void Finalize() { if (LocksBroken) { TString message = "Transaction locks invalidated."; @@ -251,7 +255,7 @@ public: Stats->FinishTs = TInstant::Now(); Stats->Finish(); - if (CollectFullStats(Request.StatsMode)) { + if (LogStatsByLongTasks() || CollectFullStats(Request.StatsMode)) { for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) { const auto& tx = Request.Transactions[txId].Body; auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats()); @@ -259,6 +263,13 @@ public: } } + if (LogStatsByLongTasks()) { + const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats(); + if (!txPlansWithStats.empty()) { + LOG_N("Full stats: " << txPlansWithStats); + } + } + Stats.reset(); } @@ -1047,8 +1058,10 @@ private: << ", error: " << res->GetError()); if (Stats) { - Stats->AddDatashardStats(std::move(*res->Record.MutableComputeActorStats()), - std::move(*res->Record.MutableTxStats())); + Stats->AddDatashardStats( + std::move(*res->Record.MutableComputeActorStats()), + std::move(*res->Record.MutableTxStats()), + TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())); } switch (res->GetStatus()) { diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 1a88b88038..b478dd040b 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -283,7 +283,11 @@ protected: case NYql::NDqProto::COMPUTE_STATE_FINISHED: { if (Stats) { - Stats->AddComputeActorStats(computeActor.NodeId(), std::move(*state.MutableStats())); + Stats->AddComputeActorStats( + computeActor.NodeId(), + std::move(*state.MutableStats()), + TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs()) + ); } ExtraData[computeActor].Swap(state.MutableExtraData()); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index 1cc1aef930..584ea11e4f 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -155,7 +155,55 @@ bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode) { return statsMode >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE; } -void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProto::TDqComputeActorStats&& stats) { +void TQueryExecutionStats::AddComputeActorFullStatsByTask( + const NYql::NDqProto::TDqTaskStats& task, + const NYql::NDqProto::TDqComputeActorStats& stats + ) { + auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result); + + stageStats->SetTotalTasksCount(stageStats->GetTotalTasksCount() + 1); + UpdateAggr(stageStats->MutableMaxMemoryUsage(), stats.GetMaxMemoryUsage()); // only 1 task per CA now + UpdateAggr(stageStats->MutableCpuTimeUs(), task.GetCpuTimeUs()); + UpdateAggr(stageStats->MutableSourceCpuTimeUs(), task.GetSourceCpuTimeUs()); + UpdateAggr(stageStats->MutableInputRows(), task.GetInputRows()); + UpdateAggr(stageStats->MutableInputBytes(), task.GetInputBytes()); + UpdateAggr(stageStats->MutableOutputRows(), task.GetOutputRows()); + UpdateAggr(stageStats->MutableOutputBytes(), task.GetOutputBytes()); + + UpdateMinMax(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); // to be reviewed + UpdateMinMax(stageStats->MutableFirstRowTimeMs(), task.GetFirstRowTimeMs()); // to be reviewed + UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); // to be reviewed + + stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetStartTimeMs().GetMin()) * 1'000); + + for (auto& sourcesStat : task.GetSources()) { + UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutableIngress(), sourcesStat.GetIngress()); + UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePush(), sourcesStat.GetPush()); + UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePop(), sourcesStat.GetPop()); + } + for (auto& inputChannelStat : task.GetInputChannels()) { + UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePush(), inputChannelStat.GetPush()); + UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePop(), inputChannelStat.GetPop()); + } + for (auto& outputChannelStat : task.GetOutputChannels()) { + UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePush(), outputChannelStat.GetPush()); + UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePop(), outputChannelStat.GetPop()); + } + for (auto& sinksStat : task.GetSinks()) { + UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePush(), sinksStat.GetPush()); + UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePop(), sinksStat.GetPop()); + UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutableEgress(), sinksStat.GetEgress()); + } +} + +void TQueryExecutionStats::AddComputeActorProfileStatsByTask( + const NYql::NDqProto::TDqTaskStats& task, const NYql::NDqProto::TDqComputeActorStats& stats) { + auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result); + stageStats->AddComputeActors()->CopyFrom(stats); +} + +void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProto::TDqComputeActorStats&& stats, + TDuration collectLongTaskStatsTimeout) { // Cerr << (TStringBuilder() << "::AddComputeActorStats " << stats.DebugString() << Endl); Result->SetCpuTimeUs(Result->GetCpuTimeUs() + stats.GetCpuTimeUs()); @@ -163,7 +211,9 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt UpdateAggr(ExtraStats.MutableComputeCpuTimeUs(), stats.GetCpuTimeUs()); - for (auto& task : stats.GetTasks()) { + auto longTasks = TVector<NYql::NDqProto::TDqTaskStats*>(Reserve(stats.GetTasks().size())); + + for (auto& task : *stats.MutableTasks()) { for (auto& table : task.GetTables()) { NYql::NDqProto::TDqTableStats* tableAggr = nullptr; if (auto it = TableStats.find(table.GetTablePath()); it != TableStats.end()) { @@ -198,52 +248,29 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt } } } + + // checking whether the task is long + auto taskDuration = TDuration::MilliSeconds(task.GetFinishTimeMs() - task.GetStartTimeMs()); + bool longTask = taskDuration > collectLongTaskStatsTimeout; + if (longTask) { + CollectStatsByLongTasks = true; + longTasks.push_back(&task); + } } if (CollectFullStats(StatsMode)) { - for (auto& task : stats.GetTasks()) { - auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result); - - stageStats->SetTotalTasksCount(stageStats->GetTotalTasksCount() + 1); - UpdateAggr(stageStats->MutableMaxMemoryUsage(), stats.GetMaxMemoryUsage()); // only 1 task per CA now - UpdateAggr(stageStats->MutableCpuTimeUs(), task.GetCpuTimeUs()); - UpdateAggr(stageStats->MutableSourceCpuTimeUs(), task.GetSourceCpuTimeUs()); - UpdateAggr(stageStats->MutableInputRows(), task.GetInputRows()); - UpdateAggr(stageStats->MutableInputBytes(), task.GetInputBytes()); - UpdateAggr(stageStats->MutableOutputRows(), task.GetOutputRows()); - UpdateAggr(stageStats->MutableOutputBytes(), task.GetOutputBytes()); - - UpdateMinMax(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); // to be reviewed - UpdateMinMax(stageStats->MutableFirstRowTimeMs(), task.GetFirstRowTimeMs()); // to be reviewed - UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); // to be reviewed - - stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetStartTimeMs().GetMin()) * 1'000); - - for (auto& sourcesStat : task.GetSources()) { - UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutableIngress(), sourcesStat.GetIngress()); - UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePush(), sourcesStat.GetPush()); - UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePop(), sourcesStat.GetPop()); - } - for (auto& inputChannelStat : task.GetInputChannels()) { - UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePush(), inputChannelStat.GetPush()); - UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePop(), inputChannelStat.GetPop()); - } - for (auto& outputChannelStat : task.GetOutputChannels()) { - UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePush(), outputChannelStat.GetPush()); - UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePop(), outputChannelStat.GetPop()); - } - for (auto& sinksStat : task.GetSinks()) { - UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePush(), sinksStat.GetPush()); - UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePop(), sinksStat.GetPop()); - UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutableEgress(), sinksStat.GetEgress()); - } + for (const auto& task : stats.GetTasks()) { + AddComputeActorFullStatsByTask(task, stats); } } if (CollectProfileStats(StatsMode)) { - for (auto& task : stats.GetTasks()) { - auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result); - stageStats->AddComputeActors()->CopyFrom(stats); + for (const auto& task : stats.GetTasks()) { + AddComputeActorProfileStatsByTask(task, stats); + } + } else { + for (const auto* task : longTasks) { + AddComputeActorProfileStatsByTask(*task, stats); } } } @@ -260,8 +287,59 @@ void TQueryExecutionStats::AddDatashardPrepareStats(NKikimrQueryStats::TTxStats& Result->SetCpuTimeUs(Result->GetCpuTimeUs() + cpuUs); } +void TQueryExecutionStats::AddDatashardFullStatsByTask( + const NYql::NDqProto::TDqTaskStats& task, ui64 datashardCpuTimeUs) { + auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result); + + stageStats->SetTotalTasksCount(stageStats->GetTotalTasksCount() + 1); + UpdateAggr(stageStats->MutableCpuTimeUs(), task.GetCpuTimeUs()); + UpdateAggr(stageStats->MutableInputRows(), task.GetInputRows()); + UpdateAggr(stageStats->MutableInputBytes(), task.GetInputBytes()); + UpdateAggr(stageStats->MutableOutputRows(), task.GetOutputRows()); + UpdateAggr(stageStats->MutableOutputBytes(), task.GetOutputBytes()); + + UpdateMinMax(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); // to be reviewed + UpdateMinMax(stageStats->MutableFirstRowTimeMs(), task.GetFirstRowTimeMs()); // to be reviewed + UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); // to be reviewed + + stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetStartTimeMs().GetMin()) * 1'000); + + for (auto& tableStats: task.GetTables()) { + auto* tableAggrStats = GetOrCreateTableAggrStats(stageStats, tableStats.GetTablePath()); + + UpdateAggr(tableAggrStats->MutableReadRows(), tableStats.GetReadRows()); + UpdateAggr(tableAggrStats->MutableReadBytes(), tableStats.GetReadBytes()); + UpdateAggr(tableAggrStats->MutableWriteRows(), tableStats.GetWriteRows()); + UpdateAggr(tableAggrStats->MutableWriteBytes(), tableStats.GetWriteBytes()); + UpdateAggr(tableAggrStats->MutableEraseRows(), tableStats.GetEraseRows()); + + NKqpProto::TKqpShardTableExtraStats tableExtraStats; + + if (tableStats.GetExtra().UnpackTo(&tableExtraStats)) { + NKqpProto::TKqpShardTableAggrExtraStats tableAggrExtraStats; + if (tableAggrStats->HasExtra()) { + bool ok = tableAggrStats->MutableExtra()->UnpackTo(&tableAggrExtraStats); + YQL_ENSURE(ok); + } + + tableAggrExtraStats.SetAffectedShards(TableShards[tableStats.GetTablePath()].size()); + UpdateAggr(tableAggrExtraStats.MutableShardCpuTimeUs(), datashardCpuTimeUs); + + tableAggrStats->MutableExtra()->PackFrom(tableAggrExtraStats); + } + } + + NKqpProto::TKqpStageExtraStats stageExtraStats; + if (stageStats->HasExtra()) { + bool ok = stageStats->GetExtra().UnpackTo(&stageExtraStats); + YQL_ENSURE(ok); + } + stageExtraStats.AddDatashardTasks()->CopyFrom(task); + stageStats->MutableExtra()->PackFrom(stageExtraStats); +} + void TQueryExecutionStats::AddDatashardStats(NYql::NDqProto::TDqComputeActorStats&& stats, - NKikimrQueryStats::TTxStats&& txStats) + NKikimrQueryStats::TTxStats&& txStats, TDuration collectLongTaskStatsTimeout) { // Cerr << (TStringBuilder() << "::AddDatashardStats " << stats.DebugString() << ", " << txStats.DebugString() << Endl); @@ -276,7 +354,9 @@ void TQueryExecutionStats::AddDatashardStats(NYql::NDqProto::TDqComputeActorStat Result->SetCpuTimeUs(Result->GetCpuTimeUs() + datashardCpuTimeUs); TotalTasks += stats.GetTasks().size(); - for (auto& task : stats.GetTasks()) { + auto longTasks = TVector<NYql::NDqProto::TDqTaskStats*>(Reserve(stats.GetTasks().size())); + + for (auto& task : *stats.MutableTasks()) { for (auto& table : task.GetTables()) { NYql::NDqProto::TDqTableStats* tableAggr = nullptr; if (auto it = TableStats.find(table.GetTablePath()); it != TableStats.end()) { @@ -299,66 +379,35 @@ void TQueryExecutionStats::AddDatashardStats(NYql::NDqProto::TDqComputeActorStat } tableAggr->SetAffectedPartitions(shards.size()); } + + // checking whether the task is long + auto taskDuration = TDuration::MilliSeconds(task.GetFinishTimeMs() - task.GetStartTimeMs()); + bool longTask = taskDuration > collectLongTaskStatsTimeout; + if (longTask) { + CollectStatsByLongTasks = true; + longTasks.push_back(&task); + } } if (CollectFullStats(StatsMode)) { for (auto& task : stats.GetTasks()) { - auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result); - - stageStats->SetTotalTasksCount(stageStats->GetTotalTasksCount() + 1); - UpdateAggr(stageStats->MutableCpuTimeUs(), task.GetCpuTimeUs()); - UpdateAggr(stageStats->MutableInputRows(), task.GetInputRows()); - UpdateAggr(stageStats->MutableInputBytes(), task.GetInputBytes()); - UpdateAggr(stageStats->MutableOutputRows(), task.GetOutputRows()); - UpdateAggr(stageStats->MutableOutputBytes(), task.GetOutputBytes()); - - UpdateMinMax(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); // to be reviewed - UpdateMinMax(stageStats->MutableFirstRowTimeMs(), task.GetFirstRowTimeMs()); // to be reviewed - UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); // to be reviewed - - stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetStartTimeMs().GetMin()) * 1'000); - - for (auto& tableStats: task.GetTables()) { - auto* tableAggrStats = GetOrCreateTableAggrStats(stageStats, tableStats.GetTablePath()); - - UpdateAggr(tableAggrStats->MutableReadRows(), tableStats.GetReadRows()); - UpdateAggr(tableAggrStats->MutableReadBytes(), tableStats.GetReadBytes()); - UpdateAggr(tableAggrStats->MutableWriteRows(), tableStats.GetWriteRows()); - UpdateAggr(tableAggrStats->MutableWriteBytes(), tableStats.GetWriteBytes()); - UpdateAggr(tableAggrStats->MutableEraseRows(), tableStats.GetEraseRows()); - - NKqpProto::TKqpShardTableExtraStats tableExtraStats; - if (tableStats.GetExtra().UnpackTo(&tableExtraStats)) { - NKqpProto::TKqpShardTableAggrExtraStats tableAggrExtraStats; - if (tableAggrStats->HasExtra()) { - bool ok = tableAggrStats->MutableExtra()->UnpackTo(&tableAggrExtraStats); - YQL_ENSURE(ok); - } - - tableAggrExtraStats.SetAffectedShards(TableShards[tableStats.GetTablePath()].size()); - UpdateAggr(tableAggrExtraStats.MutableShardCpuTimeUs(), datashardCpuTimeUs); - - tableAggrStats->MutableExtra()->PackFrom(tableAggrExtraStats); - } - } - - NKqpProto::TKqpStageExtraStats stageExtraStats; - if (stageStats->HasExtra()) { - bool ok = stageStats->GetExtra().UnpackTo(&stageExtraStats); - YQL_ENSURE(ok); - } - stageExtraStats.AddDatashardTasks()->CopyFrom(task); - stageStats->MutableExtra()->PackFrom(stageExtraStats); + AddDatashardFullStatsByTask(task, datashardCpuTimeUs); } - - if (CollectProfileStats(StatsMode)) { - for (auto& task : stats.GetTasks()) { - auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result); - stageStats->AddComputeActors()->CopyFrom(stats); - } + DatashardStats.emplace_back(std::move(txStats)); + } else { + if (!longTasks.empty()) { + DatashardStats.emplace_back(std::move(txStats)); } + } - DatashardStats.emplace_back(std::move(txStats)); + if (CollectProfileStats(StatsMode)) { + for (const auto& task : stats.GetTasks()) { + AddComputeActorProfileStatsByTask(task, stats); + } + } else { + for (const auto* task : longTasks) { + AddComputeActorProfileStatsByTask(*task, stats); + } } } @@ -372,7 +421,7 @@ void TQueryExecutionStats::Finish() { Result->SetResultRows(ResultRows); ExtraStats.SetAffectedShards(AffectedShards.size()); - if (CollectProfileStats(StatsMode)) { + if (CollectStatsByLongTasks || CollectProfileStats(StatsMode)) { for (auto&& s : UseLlvmByStageId) { for (auto&& pbs : *Result->MutableStages()) { if (pbs.GetStageId() == s.first) { @@ -402,7 +451,7 @@ void TQueryExecutionStats::Finish() { } Result->MutableExtra()->PackFrom(ExtraStats); - if (CollectFullStats(StatsMode)) { + if (CollectStatsByLongTasks || CollectFullStats(StatsMode)) { Result->SetExecuterCpuTimeUs(ExecuterCpuTime.MicroSeconds()); Result->SetStartTimeMs(StartTs.MilliSeconds()); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h index 85d8390670..63348022b1 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h @@ -25,7 +25,7 @@ public: std::unordered_set<ui64> AffectedShards; ui32 TotalTasks = 0; std::atomic<ui64> ResultBytes = 0; - std::atomic<ui64> ResultRows = 0; + std::atomic<ui64> ResultRows = 0; TDuration ExecuterCpuTime; TInstant StartTs; @@ -41,6 +41,8 @@ public: TDuration ResolveWallTime; TVector<NKikimrQueryStats::TTxStats> DatashardStats; + bool CollectStatsByLongTasks = false; + TQueryExecutionStats(Ydb::Table::QueryStatsCollection::Mode statsMode, const TKqpTasksGraph* const tasksGraph, NYql::NDqProto::TDqExecutionStats* const result) : StatsMode(statsMode) @@ -49,7 +51,11 @@ public: { } - void AddComputeActorStats(ui32 nodeId, NYql::NDqProto::TDqComputeActorStats&& stats); + void AddComputeActorStats( + ui32 nodeId, + NYql::NDqProto::TDqComputeActorStats&& stats, + TDuration collectLongTaskStatsTimeout = TDuration::Max() + ); void AddNodeShardsCount(const ui32 stageId, const ui32 nodeId, const ui32 shardsCount) { Y_ABORT_UNLESS(ShardsCountByNode[stageId].emplace(nodeId, shardsCount).second); } @@ -58,9 +64,24 @@ public: } void AddDatashardPrepareStats(NKikimrQueryStats::TTxStats&& txStats); - void AddDatashardStats(NYql::NDqProto::TDqComputeActorStats&& stats, NKikimrQueryStats::TTxStats&& txStats); + void AddDatashardStats( + NYql::NDqProto::TDqComputeActorStats&& stats, + NKikimrQueryStats::TTxStats&& txStats, + TDuration collectLongTaskStatsTimeout = TDuration::Max() + ); void Finish(); + +private: + void AddComputeActorFullStatsByTask( + const NYql::NDqProto::TDqTaskStats& task, + const NYql::NDqProto::TDqComputeActorStats& stats); + void AddComputeActorProfileStatsByTask( + const NYql::NDqProto::TDqTaskStats& task, + const NYql::NDqProto::TDqComputeActorStats& stats); + void AddDatashardFullStatsByTask( + const NYql::NDqProto::TDqTaskStats& task, + ui64 datashardCpuTimeUs); }; struct TTableStat { diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index f917b39fee..cb18ace2cf 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -303,11 +303,18 @@ public: Stats->FinishTs = TInstant::Now(); Stats->Finish(); - if (CollectFullStats(Request.StatsMode)) { + if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) { const auto& tx = Request.Transactions[0].Body; auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats()); response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats); } + + if (Stats->CollectStatsByLongTasks) { + const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats(); + if (!txPlansWithStats.empty()) { + LOG_N("Full stats: " << txPlansWithStats); + } + } } LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize()); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 53cf5e1915..07047d3931 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -213,7 +213,7 @@ public: } auto selfId = SelfId(); auto as = TActivationContext::ActorSystem(); - ev->Get()->SetClientLostAction(selfId, as); + ev->Get()->SetClientLostAction(selfId, as); QueryState = std::make_shared<TKqpQueryState>( ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession, Settings.TableService, Settings.QueryService, std::move(id), SessionId); @@ -1044,7 +1044,7 @@ public: request.MaxShardCount = RequestControls.MaxShardCount; request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId(); LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize()); - + auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(), RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(), @@ -1585,7 +1585,7 @@ public: } LWTRACK(KqpSessionReplyError, QueryState->Orbit, TStringBuilder() << status); } - + Counters->ReportResponseStatus(Settings.DbCounters, record.ByteSize(), record.GetYdbStatus()); for (auto& issue : record.GetResponse().GetQueryIssues()) { Counters->ReportIssues(Settings.DbCounters, CachedIssueCounters, issue); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 9142022f2a..ea706ad72f 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1295,6 +1295,8 @@ message TTableServiceConfig { optional uint32 AggregationComputeThreads = 37; optional uint32 CSScanThreadsPerNode = 38; + + optional uint64 CollectLongTasksStatsTimeoutMs = 22 [default = 10000]; // 10 sec } message TExecuterRetriesConfig { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp index e86e3b11f6..bf0bc688d3 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp @@ -37,23 +37,23 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& protoTask->SetFinishTimeMs(taskStats.FinishTs.MilliSeconds()); // to be reviewed protoTask->SetStartTimeMs(taskStats.StartTs.MilliSeconds()); // to be reviewed - if (StatsLevelCollectProfile(level)) { - if (NActors::TlsActivationContext && NActors::TlsActivationContext->ActorSystem()) { - protoTask->SetNodeId(NActors::TlsActivationContext->ActorSystem()->NodeId); - } - protoTask->SetHostName(HostName()); - protoTask->SetComputeCpuTimeUs(taskStats.ComputeCpuTime.MicroSeconds()); - protoTask->SetBuildCpuTimeUs(taskStats.BuildCpuTime.MicroSeconds()); + if (NActors::TlsActivationContext && NActors::TlsActivationContext->ActorSystem()) { + protoTask->SetNodeId(NActors::TlsActivationContext->ActorSystem()->NodeId); + } + protoTask->SetHostName(HostName()); + protoTask->SetComputeCpuTimeUs(taskStats.ComputeCpuTime.MicroSeconds()); + protoTask->SetBuildCpuTimeUs(taskStats.BuildCpuTime.MicroSeconds()); - protoTask->SetWaitTimeUs(taskStats.WaitTime.MicroSeconds()); // to be reviewed - protoTask->SetWaitOutputTimeUs(taskStats.WaitOutputTime.MicroSeconds()); // to be reviewed + protoTask->SetWaitTimeUs(taskStats.WaitTime.MicroSeconds()); // to be reviewed + protoTask->SetWaitOutputTimeUs(taskStats.WaitOutputTime.MicroSeconds()); // to be reviewed - // All run statuses metrics - protoTask->SetPendingInputTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::PendingInput].MicroSeconds()); // to be reviewed - protoTask->SetPendingOutputTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::PendingOutput].MicroSeconds()); // to be reviewed - protoTask->SetFinishTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::Finished].MicroSeconds()); // to be reviewed - static_assert(TRunStatusTimeMetrics::StatusesCount == 3); // Add all statuses here + // All run statuses metrics + protoTask->SetPendingInputTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::PendingInput].MicroSeconds()); // to be reviewed + protoTask->SetPendingOutputTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::PendingOutput].MicroSeconds()); // to be reviewed + protoTask->SetFinishTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::Finished].MicroSeconds()); // to be reviewed + static_assert(TRunStatusTimeMetrics::StatusesCount == 3); // Add all statuses here + if (StatsLevelCollectProfile(level)) { if (taskStats.ComputeCpuTimeByRun) { auto snapshot = taskStats.ComputeCpuTimeByRun->Snapshot(); for (ui32 i = 0; i < snapshot->Count(); i++) { diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index 48f16bbf0f..a67d87d70d 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -9,7 +9,7 @@ import "google/protobuf/any.proto"; enum EDqStatsMode { DQ_STATS_MODE_UNSPECIFIED = 0; DQ_STATS_MODE_NONE = 10; // no statistics is expected, it will be ignored anyway - DQ_STATS_MODE_BASIC = 20; // per graph statistics + DQ_STATS_MODE_BASIC = 20; // per graph statistics DQ_STATS_MODE_FULL = 25; // per stage/connection statistics DQ_STATS_MODE_PROFILE = 30; // per task statistics } @@ -19,7 +19,7 @@ enum EDqStatsMode { message TDqAsyncBufferStats { // Data uint64 Bytes = 1; // physical bytes - uint64 Rows = 2; // logical rows (if applicable) + uint64 Rows = 2; // logical rows (if applicable) uint64 Chunks = 3; // chunk is group of sent/received bytes in single batch uint64 Splits = 4; // logical partitioning (if applicable) // Time @@ -35,7 +35,7 @@ message TDqAsyncInputBufferStats { uint64 InputIndex = 1; string IngressName = 9; - TDqAsyncBufferStats Ingress = 10; + TDqAsyncBufferStats Ingress = 10; TDqAsyncBufferStats Push = 11; TDqAsyncBufferStats Pop = 12; diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 4e4fec2bef..9c0355424f 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -229,7 +229,7 @@ public: , LogFunc(logFunc) , AllocatedHolder(std::make_optional<TAllocatedHolder>()) { - if (Settings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) { + if (CollectBasic()) { Stats = std::make_unique<TDqTaskRunnerStats>(); Stats->StartTs = TInstant::Now(); if (Y_UNLIKELY(CollectFull())) { @@ -267,6 +267,10 @@ public: return Settings.StatsMode >= NDqProto::DQ_STATS_MODE_FULL; } + bool CollectBasic() const { + return Settings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC; + } + const TDqMeteringStats* GetMeteringStats() const override { return &BillingStats; } @@ -722,17 +726,14 @@ public: } if (runStatus == ERunStatus::Finished) { - if (Stats) { - Stats->FinishTs = TInstant::Now(); - } - if (Y_UNLIKELY(CollectFull())) { + if (CollectBasic()) { StopWaiting(Stats->FinishTs); } return ERunStatus::Finished; } - if (Y_UNLIKELY(CollectFull())) { + if (CollectBasic()) { auto now = TInstant::Now(); StartWaiting(now); if (runStatus == ERunStatus::PendingOutput) { @@ -877,7 +878,7 @@ private: if (Stats) { auto duration = TInstant::Now() - startComputeTime; Stats->ComputeCpuTime += duration; - if (Y_UNLIKELY(CollectFull())) { + if (CollectBasic()) { RunComputeTime = duration; } } @@ -899,7 +900,7 @@ private: wideBuffer.resize(AllocatedHolder->OutputWideType->GetElementsCount()); } while (!AllocatedHolder->Output->IsFull()) { - if (Y_UNLIKELY(CollectFull())) { + if (CollectBasic()) { auto now = TInstant::Now(); StopWaitingOutput(now); StopWaiting(now); @@ -1002,26 +1003,26 @@ private: std::optional<TInstant> StartWaitTime; void StartWaitingOutput(TInstant now) { - if (Y_UNLIKELY(CollectFull()) && !StartWaitOutputTime) { + if (CollectBasic() && !StartWaitOutputTime) { StartWaitOutputTime = now; } } void StopWaitingOutput(TInstant now) { - if (Y_UNLIKELY(CollectFull()) && StartWaitOutputTime) { + if (CollectBasic() && StartWaitOutputTime) { Stats->WaitOutputTime += (now - *StartWaitOutputTime); StartWaitOutputTime.reset(); } } void StartWaiting(TInstant now) { - if (Y_UNLIKELY(CollectFull()) && !StartWaitTime) { + if (CollectBasic() && !StartWaitTime) { StartWaitTime = now; } } void StopWaiting(TInstant now) { - if (Y_UNLIKELY(CollectFull()) && StartWaitTime) { + if (CollectBasic() && StartWaitTime) { Stats->WaitTime += (now - *StartWaitTime); StartWaitTime.reset(); } |