aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshumkovnd <shumkovnd@yandex-team.com>2023-11-01 17:40:53 +0300
committershumkovnd <shumkovnd@yandex-team.com>2023-11-01 18:24:54 +0300
commit0a532f08a13bcf0b6c8406d508dd219167c5ab96 (patch)
treee9b44f23b2cc5a9df0a5d74142952393db4e75a9
parentaa31162bb2e86ab105c2326b5bbad68174aa78c4 (diff)
downloadydb-0a532f08a13bcf0b6c8406d508dd219167c5ab96.tar.gz
KIKIMR-19287: add stats for long tasks
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp19
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h6
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_stats.cpp245
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_stats.h27
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp9
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp6
-rw-r--r--ydb/core/protos/config.proto2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp28
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_stats.proto6
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp25
10 files changed, 235 insertions, 138 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 8446d7e6e3..a14cb64d96 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -207,6 +207,10 @@ public:
);
}
+ bool LogStatsByLongTasks() const { // gates both building the plan-with-stats and logging it in Finalize()
+ return Stats->CollectStatsByLongTasks && HasOlapTable; // requires the long-task flag on Stats AND HasOlapTable
+ }
+
void Finalize() {
if (LocksBroken) {
TString message = "Transaction locks invalidated.";
@@ -251,7 +255,7 @@ public:
Stats->FinishTs = TInstant::Now();
Stats->Finish();
- if (CollectFullStats(Request.StatsMode)) {
+ if (LogStatsByLongTasks() || CollectFullStats(Request.StatsMode)) {
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
const auto& tx = Request.Transactions[txId].Body;
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
@@ -259,6 +263,13 @@ public:
}
}
+ if (LogStatsByLongTasks()) {
+ const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
+ if (!txPlansWithStats.empty()) {
+ LOG_N("Full stats: " << txPlansWithStats);
+ }
+ }
+
Stats.reset();
}
@@ -1047,8 +1058,10 @@ private:
<< ", error: " << res->GetError());
if (Stats) {
- Stats->AddDatashardStats(std::move(*res->Record.MutableComputeActorStats()),
- std::move(*res->Record.MutableTxStats()));
+ Stats->AddDatashardStats(
+ std::move(*res->Record.MutableComputeActorStats()),
+ std::move(*res->Record.MutableTxStats()),
+ TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs()));
}
switch (res->GetStatus()) {
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 1a88b88038..b478dd040b 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -283,7 +283,11 @@ protected:
case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
if (Stats) {
- Stats->AddComputeActorStats(computeActor.NodeId(), std::move(*state.MutableStats()));
+ Stats->AddComputeActorStats(
+ computeActor.NodeId(),
+ std::move(*state.MutableStats()),
+ TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
+ );
}
ExtraData[computeActor].Swap(state.MutableExtraData());
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
index 1cc1aef930..584ea11e4f 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
@@ -155,7 +155,55 @@ bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode) {
return statsMode >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE;
}
-void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProto::TDqComputeActorStats&& stats) {
+void TQueryExecutionStats::AddComputeActorFullStatsByTask( // folds one task's counters into the aggregated stats of its stage
+ const NYql::NDqProto::TDqTaskStats& task, // the task whose counters are aggregated
+ const NYql::NDqProto::TDqComputeActorStats& stats // owning compute-actor stats; only GetMaxMemoryUsage() is read here
+ ) {
+ auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result); // stage entry inside *Result (helper is defined outside this diff)
+
+ stageStats->SetTotalTasksCount(stageStats->GetTotalTasksCount() + 1); // count this task for the stage
+ UpdateAggr(stageStats->MutableMaxMemoryUsage(), stats.GetMaxMemoryUsage()); // only 1 task per CA now
+ UpdateAggr(stageStats->MutableCpuTimeUs(), task.GetCpuTimeUs());
+ UpdateAggr(stageStats->MutableSourceCpuTimeUs(), task.GetSourceCpuTimeUs());
+ UpdateAggr(stageStats->MutableInputRows(), task.GetInputRows());
+ UpdateAggr(stageStats->MutableInputBytes(), task.GetInputBytes());
+ UpdateAggr(stageStats->MutableOutputRows(), task.GetOutputRows());
+ UpdateAggr(stageStats->MutableOutputBytes(), task.GetOutputBytes());
+
+ UpdateMinMax(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); // to be reviewed
+ UpdateMinMax(stageStats->MutableFirstRowTimeMs(), task.GetFirstRowTimeMs()); // to be reviewed
+ UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); // to be reviewed
+
+ stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetStartTimeMs().GetMin()) * 1'000); // max finish minus min start so far, ms -> us
+
+ for (auto& sourcesStat : task.GetSources()) { // sources: aggregated per ingress name
+ UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutableIngress(), sourcesStat.GetIngress());
+ UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePush(), sourcesStat.GetPush());
+ UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePop(), sourcesStat.GetPop());
+ }
+ for (auto& inputChannelStat : task.GetInputChannels()) { // input channels: aggregated per source stage id
+ UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePush(), inputChannelStat.GetPush());
+ UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePop(), inputChannelStat.GetPop());
+ }
+ for (auto& outputChannelStat : task.GetOutputChannels()) { // output channels: aggregated per destination stage id
+ UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePush(), outputChannelStat.GetPush());
+ UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePop(), outputChannelStat.GetPop());
+ }
+ for (auto& sinksStat : task.GetSinks()) { // sinks: aggregated per egress name
+ UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePush(), sinksStat.GetPush());
+ UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePop(), sinksStat.GetPop());
+ UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutableEgress(), sinksStat.GetEgress());
+ }
+}
+
+void TQueryExecutionStats::AddComputeActorProfileStatsByTask( // attaches the compute-actor stats to the stage that `task` belongs to
+ const NYql::NDqProto::TDqTaskStats& task, const NYql::NDqProto::TDqComputeActorStats& stats) { // `task` is used only to locate the stage
+ auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result); // stage entry inside *Result (helper is defined outside this diff)
+ stageStats->AddComputeActors()->CopyFrom(stats); // appends a copy of the whole `stats` message, not just this task's part
+}
+
+void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProto::TDqComputeActorStats&& stats,
+ TDuration collectLongTaskStatsTimeout) {
// Cerr << (TStringBuilder() << "::AddComputeActorStats " << stats.DebugString() << Endl);
Result->SetCpuTimeUs(Result->GetCpuTimeUs() + stats.GetCpuTimeUs());
@@ -163,7 +211,9 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt
UpdateAggr(ExtraStats.MutableComputeCpuTimeUs(), stats.GetCpuTimeUs());
- for (auto& task : stats.GetTasks()) {
+ auto longTasks = TVector<NYql::NDqProto::TDqTaskStats*>(Reserve(stats.GetTasks().size()));
+
+ for (auto& task : *stats.MutableTasks()) {
for (auto& table : task.GetTables()) {
NYql::NDqProto::TDqTableStats* tableAggr = nullptr;
if (auto it = TableStats.find(table.GetTablePath()); it != TableStats.end()) {
@@ -198,52 +248,29 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt
}
}
}
+
+ // a task is "long" when its finish-minus-start time exceeds collectLongTaskStatsTimeout
+ auto taskDuration = TDuration::MilliSeconds(task.GetFinishTimeMs() - task.GetStartTimeMs()); // NOTE(review): if these getters are unsigned and FinishTimeMs < StartTimeMs (e.g. finish not set), the subtraction wraps and the task is treated as long — confirm
+ bool longTask = taskDuration > collectLongTaskStatsTimeout; // strictly greater than the timeout
+ if (longTask) {
+ CollectStatsByLongTasks = true; // member flag; once set it is not cleared here
+ longTasks.push_back(&task); // pointer into stats' task list, consumed later in this same call
+ }
}
if (CollectFullStats(StatsMode)) {
- for (auto& task : stats.GetTasks()) {
- auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result);
-
- stageStats->SetTotalTasksCount(stageStats->GetTotalTasksCount() + 1);
- UpdateAggr(stageStats->MutableMaxMemoryUsage(), stats.GetMaxMemoryUsage()); // only 1 task per CA now
- UpdateAggr(stageStats->MutableCpuTimeUs(), task.GetCpuTimeUs());
- UpdateAggr(stageStats->MutableSourceCpuTimeUs(), task.GetSourceCpuTimeUs());
- UpdateAggr(stageStats->MutableInputRows(), task.GetInputRows());
- UpdateAggr(stageStats->MutableInputBytes(), task.GetInputBytes());
- UpdateAggr(stageStats->MutableOutputRows(), task.GetOutputRows());
- UpdateAggr(stageStats->MutableOutputBytes(), task.GetOutputBytes());
-
- UpdateMinMax(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); // to be reviewed
- UpdateMinMax(stageStats->MutableFirstRowTimeMs(), task.GetFirstRowTimeMs()); // to be reviewed
- UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); // to be reviewed
-
- stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetStartTimeMs().GetMin()) * 1'000);
-
- for (auto& sourcesStat : task.GetSources()) {
- UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutableIngress(), sourcesStat.GetIngress());
- UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePush(), sourcesStat.GetPush());
- UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePop(), sourcesStat.GetPop());
- }
- for (auto& inputChannelStat : task.GetInputChannels()) {
- UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePush(), inputChannelStat.GetPush());
- UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePop(), inputChannelStat.GetPop());
- }
- for (auto& outputChannelStat : task.GetOutputChannels()) {
- UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePush(), outputChannelStat.GetPush());
- UpdateAsyncAggr(*(*stageStats->MutableOutput())[outputChannelStat.GetDstStageId()].MutablePop(), outputChannelStat.GetPop());
- }
- for (auto& sinksStat : task.GetSinks()) {
- UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePush(), sinksStat.GetPush());
- UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutablePop(), sinksStat.GetPop());
- UpdateAsyncAggr(*(*stageStats->MutableEgress())[sinksStat.GetEgressName()].MutableEgress(), sinksStat.GetEgress());
- }
+ for (const auto& task : stats.GetTasks()) {
+ AddComputeActorFullStatsByTask(task, stats);
}
}
if (CollectProfileStats(StatsMode)) {
- for (auto& task : stats.GetTasks()) {
- auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result);
- stageStats->AddComputeActors()->CopyFrom(stats);
+ for (const auto& task : stats.GetTasks()) {
+ AddComputeActorProfileStatsByTask(task, stats);
+ }
+ } else {
+ for (const auto* task : longTasks) {
+ AddComputeActorProfileStatsByTask(*task, stats);
}
}
}
@@ -260,8 +287,59 @@ void TQueryExecutionStats::AddDatashardPrepareStats(NKikimrQueryStats::TTxStats&
Result->SetCpuTimeUs(Result->GetCpuTimeUs() + cpuUs);
}
+void TQueryExecutionStats::AddDatashardFullStatsByTask( // folds one datashard task into its stage's aggregated stats, per-table aggregates and stage extra stats
+ const NYql::NDqProto::TDqTaskStats& task, ui64 datashardCpuTimeUs) { // datashardCpuTimeUs is added to every table's shard CPU aggregate below
+ auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result); // stage entry inside *Result (helper is defined outside this diff)
+
+ stageStats->SetTotalTasksCount(stageStats->GetTotalTasksCount() + 1); // count this task for the stage
+ UpdateAggr(stageStats->MutableCpuTimeUs(), task.GetCpuTimeUs());
+ UpdateAggr(stageStats->MutableInputRows(), task.GetInputRows());
+ UpdateAggr(stageStats->MutableInputBytes(), task.GetInputBytes());
+ UpdateAggr(stageStats->MutableOutputRows(), task.GetOutputRows());
+ UpdateAggr(stageStats->MutableOutputBytes(), task.GetOutputBytes());
+
+ UpdateMinMax(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); // to be reviewed
+ UpdateMinMax(stageStats->MutableFirstRowTimeMs(), task.GetFirstRowTimeMs()); // to be reviewed
+ UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); // to be reviewed
+
+ stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetStartTimeMs().GetMin()) * 1'000); // max finish minus min start so far, ms -> us
+
+ for (auto& tableStats: task.GetTables()) { // per-table counters reported by this task
+ auto* tableAggrStats = GetOrCreateTableAggrStats(stageStats, tableStats.GetTablePath()); // aggregate for this table path under the stage
+
+ UpdateAggr(tableAggrStats->MutableReadRows(), tableStats.GetReadRows());
+ UpdateAggr(tableAggrStats->MutableReadBytes(), tableStats.GetReadBytes());
+ UpdateAggr(tableAggrStats->MutableWriteRows(), tableStats.GetWriteRows());
+ UpdateAggr(tableAggrStats->MutableWriteBytes(), tableStats.GetWriteBytes());
+ UpdateAggr(tableAggrStats->MutableEraseRows(), tableStats.GetEraseRows());
+
+ NKqpProto::TKqpShardTableExtraStats tableExtraStats; // only the success of unpacking is used; the unpacked value is not read below
+
+ if (tableStats.GetExtra().UnpackTo(&tableExtraStats)) { // extra aggregation only when Extra unpacks as TKqpShardTableExtraStats
+ NKqpProto::TKqpShardTableAggrExtraStats tableAggrExtraStats;
+ if (tableAggrStats->HasExtra()) { // continue from the previously packed aggregate, if any
+ bool ok = tableAggrStats->MutableExtra()->UnpackTo(&tableAggrExtraStats);
+ YQL_ENSURE(ok);
+ }
+
+ tableAggrExtraStats.SetAffectedShards(TableShards[tableStats.GetTablePath()].size()); // overwritten each time with the current size of the TableShards entry
+ UpdateAggr(tableAggrExtraStats.MutableShardCpuTimeUs(), datashardCpuTimeUs);
+
+ tableAggrStats->MutableExtra()->PackFrom(tableAggrExtraStats); // store the updated aggregate back
+ }
+ }
+
+ NKqpProto::TKqpStageExtraStats stageExtraStats;
+ if (stageStats->HasExtra()) { // start from the stage's existing extra stats, if any
+ bool ok = stageStats->GetExtra().UnpackTo(&stageExtraStats);
+ YQL_ENSURE(ok);
+ }
+ stageExtraStats.AddDatashardTasks()->CopyFrom(task); // append a copy of this task
+ stageStats->MutableExtra()->PackFrom(stageExtraStats); // repack, so each call unpacks and repacks the accumulated list
+}
+
void TQueryExecutionStats::AddDatashardStats(NYql::NDqProto::TDqComputeActorStats&& stats,
- NKikimrQueryStats::TTxStats&& txStats)
+ NKikimrQueryStats::TTxStats&& txStats, TDuration collectLongTaskStatsTimeout)
{
// Cerr << (TStringBuilder() << "::AddDatashardStats " << stats.DebugString() << ", " << txStats.DebugString() << Endl);
@@ -276,7 +354,9 @@ void TQueryExecutionStats::AddDatashardStats(NYql::NDqProto::TDqComputeActorStat
Result->SetCpuTimeUs(Result->GetCpuTimeUs() + datashardCpuTimeUs);
TotalTasks += stats.GetTasks().size();
- for (auto& task : stats.GetTasks()) {
+ auto longTasks = TVector<NYql::NDqProto::TDqTaskStats*>(Reserve(stats.GetTasks().size()));
+
+ for (auto& task : *stats.MutableTasks()) {
for (auto& table : task.GetTables()) {
NYql::NDqProto::TDqTableStats* tableAggr = nullptr;
if (auto it = TableStats.find(table.GetTablePath()); it != TableStats.end()) {
@@ -299,66 +379,35 @@ void TQueryExecutionStats::AddDatashardStats(NYql::NDqProto::TDqComputeActorStat
}
tableAggr->SetAffectedPartitions(shards.size());
}
+
+ // a task is "long" when its finish-minus-start time exceeds collectLongTaskStatsTimeout
+ auto taskDuration = TDuration::MilliSeconds(task.GetFinishTimeMs() - task.GetStartTimeMs()); // NOTE(review): if these getters are unsigned and FinishTimeMs < StartTimeMs (e.g. finish not set), the subtraction wraps and the task is treated as long — confirm
+ bool longTask = taskDuration > collectLongTaskStatsTimeout; // strictly greater than the timeout
+ if (longTask) {
+ CollectStatsByLongTasks = true; // member flag; once set it is not cleared here
+ longTasks.push_back(&task); // pointer into stats' task list, consumed later in this same call
+ }
}
if (CollectFullStats(StatsMode)) {
for (auto& task : stats.GetTasks()) {
- auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result);
-
- stageStats->SetTotalTasksCount(stageStats->GetTotalTasksCount() + 1);
- UpdateAggr(stageStats->MutableCpuTimeUs(), task.GetCpuTimeUs());
- UpdateAggr(stageStats->MutableInputRows(), task.GetInputRows());
- UpdateAggr(stageStats->MutableInputBytes(), task.GetInputBytes());
- UpdateAggr(stageStats->MutableOutputRows(), task.GetOutputRows());
- UpdateAggr(stageStats->MutableOutputBytes(), task.GetOutputBytes());
-
- UpdateMinMax(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); // to be reviewed
- UpdateMinMax(stageStats->MutableFirstRowTimeMs(), task.GetFirstRowTimeMs()); // to be reviewed
- UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); // to be reviewed
-
- stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetStartTimeMs().GetMin()) * 1'000);
-
- for (auto& tableStats: task.GetTables()) {
- auto* tableAggrStats = GetOrCreateTableAggrStats(stageStats, tableStats.GetTablePath());
-
- UpdateAggr(tableAggrStats->MutableReadRows(), tableStats.GetReadRows());
- UpdateAggr(tableAggrStats->MutableReadBytes(), tableStats.GetReadBytes());
- UpdateAggr(tableAggrStats->MutableWriteRows(), tableStats.GetWriteRows());
- UpdateAggr(tableAggrStats->MutableWriteBytes(), tableStats.GetWriteBytes());
- UpdateAggr(tableAggrStats->MutableEraseRows(), tableStats.GetEraseRows());
-
- NKqpProto::TKqpShardTableExtraStats tableExtraStats;
- if (tableStats.GetExtra().UnpackTo(&tableExtraStats)) {
- NKqpProto::TKqpShardTableAggrExtraStats tableAggrExtraStats;
- if (tableAggrStats->HasExtra()) {
- bool ok = tableAggrStats->MutableExtra()->UnpackTo(&tableAggrExtraStats);
- YQL_ENSURE(ok);
- }
-
- tableAggrExtraStats.SetAffectedShards(TableShards[tableStats.GetTablePath()].size());
- UpdateAggr(tableAggrExtraStats.MutableShardCpuTimeUs(), datashardCpuTimeUs);
-
- tableAggrStats->MutableExtra()->PackFrom(tableAggrExtraStats);
- }
- }
-
- NKqpProto::TKqpStageExtraStats stageExtraStats;
- if (stageStats->HasExtra()) {
- bool ok = stageStats->GetExtra().UnpackTo(&stageExtraStats);
- YQL_ENSURE(ok);
- }
- stageExtraStats.AddDatashardTasks()->CopyFrom(task);
- stageStats->MutableExtra()->PackFrom(stageExtraStats);
+ AddDatashardFullStatsByTask(task, datashardCpuTimeUs);
}
-
- if (CollectProfileStats(StatsMode)) {
- for (auto& task : stats.GetTasks()) {
- auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result);
- stageStats->AddComputeActors()->CopyFrom(stats);
- }
+ DatashardStats.emplace_back(std::move(txStats));
+ } else {
+ if (!longTasks.empty()) {
+ DatashardStats.emplace_back(std::move(txStats));
}
+ }
- DatashardStats.emplace_back(std::move(txStats));
+ if (CollectProfileStats(StatsMode)) {
+ for (const auto& task : stats.GetTasks()) {
+ AddComputeActorProfileStatsByTask(task, stats);
+ }
+ } else {
+ for (const auto* task : longTasks) {
+ AddComputeActorProfileStatsByTask(*task, stats);
+ }
}
}
@@ -372,7 +421,7 @@ void TQueryExecutionStats::Finish() {
Result->SetResultRows(ResultRows);
ExtraStats.SetAffectedShards(AffectedShards.size());
- if (CollectProfileStats(StatsMode)) {
+ if (CollectStatsByLongTasks || CollectProfileStats(StatsMode)) {
for (auto&& s : UseLlvmByStageId) {
for (auto&& pbs : *Result->MutableStages()) {
if (pbs.GetStageId() == s.first) {
@@ -402,7 +451,7 @@ void TQueryExecutionStats::Finish() {
}
Result->MutableExtra()->PackFrom(ExtraStats);
- if (CollectFullStats(StatsMode)) {
+ if (CollectStatsByLongTasks || CollectFullStats(StatsMode)) {
Result->SetExecuterCpuTimeUs(ExecuterCpuTime.MicroSeconds());
Result->SetStartTimeMs(StartTs.MilliSeconds());
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h
index 85d8390670..63348022b1 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h
@@ -25,7 +25,7 @@ public:
std::unordered_set<ui64> AffectedShards;
ui32 TotalTasks = 0;
std::atomic<ui64> ResultBytes = 0;
- std::atomic<ui64> ResultRows = 0;
+ std::atomic<ui64> ResultRows = 0;
TDuration ExecuterCpuTime;
TInstant StartTs;
@@ -41,6 +41,8 @@ public:
TDuration ResolveWallTime;
TVector<NKikimrQueryStats::TTxStats> DatashardStats;
+ bool CollectStatsByLongTasks = false;
+
TQueryExecutionStats(Ydb::Table::QueryStatsCollection::Mode statsMode, const TKqpTasksGraph* const tasksGraph,
NYql::NDqProto::TDqExecutionStats* const result)
: StatsMode(statsMode)
@@ -49,7 +51,11 @@ public:
{
}
- void AddComputeActorStats(ui32 nodeId, NYql::NDqProto::TDqComputeActorStats&& stats);
+ void AddComputeActorStats(
+ ui32 nodeId,
+ NYql::NDqProto::TDqComputeActorStats&& stats,
+ TDuration collectLongTaskStatsTimeout = TDuration::Max()
+ );
void AddNodeShardsCount(const ui32 stageId, const ui32 nodeId, const ui32 shardsCount) {
Y_ABORT_UNLESS(ShardsCountByNode[stageId].emplace(nodeId, shardsCount).second);
}
@@ -58,9 +64,24 @@ public:
}
void AddDatashardPrepareStats(NKikimrQueryStats::TTxStats&& txStats);
- void AddDatashardStats(NYql::NDqProto::TDqComputeActorStats&& stats, NKikimrQueryStats::TTxStats&& txStats);
+ void AddDatashardStats(
+ NYql::NDqProto::TDqComputeActorStats&& stats,
+ NKikimrQueryStats::TTxStats&& txStats,
+ TDuration collectLongTaskStatsTimeout = TDuration::Max()
+ );
void Finish();
+
+private:
+ void AddComputeActorFullStatsByTask(
+ const NYql::NDqProto::TDqTaskStats& task,
+ const NYql::NDqProto::TDqComputeActorStats& stats);
+ void AddComputeActorProfileStatsByTask(
+ const NYql::NDqProto::TDqTaskStats& task,
+ const NYql::NDqProto::TDqComputeActorStats& stats);
+ void AddDatashardFullStatsByTask(
+ const NYql::NDqProto::TDqTaskStats& task,
+ ui64 datashardCpuTimeUs);
};
struct TTableStat {
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index f917b39fee..cb18ace2cf 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -303,11 +303,18 @@ public:
Stats->FinishTs = TInstant::Now();
Stats->Finish();
- if (CollectFullStats(Request.StatsMode)) {
+ if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) {
const auto& tx = Request.Transactions[0].Body;
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
}
+
+ if (Stats->CollectStatsByLongTasks) {
+ const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
+ if (!txPlansWithStats.empty()) {
+ LOG_N("Full stats: " << txPlansWithStats);
+ }
+ }
}
LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize());
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 53cf5e1915..07047d3931 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -213,7 +213,7 @@ public:
}
auto selfId = SelfId();
auto as = TActivationContext::ActorSystem();
- ev->Get()->SetClientLostAction(selfId, as);
+ ev->Get()->SetClientLostAction(selfId, as);
QueryState = std::make_shared<TKqpQueryState>(
ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession,
Settings.TableService, Settings.QueryService, std::move(id), SessionId);
@@ -1044,7 +1044,7 @@ public:
request.MaxShardCount = RequestControls.MaxShardCount;
request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId();
LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize());
-
+
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(),
@@ -1585,7 +1585,7 @@ public:
}
LWTRACK(KqpSessionReplyError, QueryState->Orbit, TStringBuilder() << status);
}
-
+
Counters->ReportResponseStatus(Settings.DbCounters, record.ByteSize(), record.GetYdbStatus());
for (auto& issue : record.GetResponse().GetQueryIssues()) {
Counters->ReportIssues(Settings.DbCounters, CachedIssueCounters, issue);
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 9142022f2a..ea706ad72f 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1295,6 +1295,8 @@ message TTableServiceConfig {
optional uint32 AggregationComputeThreads = 37;
optional uint32 CSScanThreadsPerNode = 38;
+
+ optional uint64 CollectLongTasksStatsTimeoutMs = 22 [default = 10000]; // 10 sec
}
message TExecuterRetriesConfig {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp
index e86e3b11f6..bf0bc688d3 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp
@@ -37,23 +37,23 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase&
protoTask->SetFinishTimeMs(taskStats.FinishTs.MilliSeconds()); // to be reviewed
protoTask->SetStartTimeMs(taskStats.StartTs.MilliSeconds()); // to be reviewed
- if (StatsLevelCollectProfile(level)) {
- if (NActors::TlsActivationContext && NActors::TlsActivationContext->ActorSystem()) {
- protoTask->SetNodeId(NActors::TlsActivationContext->ActorSystem()->NodeId);
- }
- protoTask->SetHostName(HostName());
- protoTask->SetComputeCpuTimeUs(taskStats.ComputeCpuTime.MicroSeconds());
- protoTask->SetBuildCpuTimeUs(taskStats.BuildCpuTime.MicroSeconds());
+ if (NActors::TlsActivationContext && NActors::TlsActivationContext->ActorSystem()) {
+ protoTask->SetNodeId(NActors::TlsActivationContext->ActorSystem()->NodeId);
+ }
+ protoTask->SetHostName(HostName());
+ protoTask->SetComputeCpuTimeUs(taskStats.ComputeCpuTime.MicroSeconds());
+ protoTask->SetBuildCpuTimeUs(taskStats.BuildCpuTime.MicroSeconds());
- protoTask->SetWaitTimeUs(taskStats.WaitTime.MicroSeconds()); // to be reviewed
- protoTask->SetWaitOutputTimeUs(taskStats.WaitOutputTime.MicroSeconds()); // to be reviewed
+ protoTask->SetWaitTimeUs(taskStats.WaitTime.MicroSeconds()); // to be reviewed
+ protoTask->SetWaitOutputTimeUs(taskStats.WaitOutputTime.MicroSeconds()); // to be reviewed
- // All run statuses metrics
- protoTask->SetPendingInputTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::PendingInput].MicroSeconds()); // to be reviewed
- protoTask->SetPendingOutputTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::PendingOutput].MicroSeconds()); // to be reviewed
- protoTask->SetFinishTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::Finished].MicroSeconds()); // to be reviewed
- static_assert(TRunStatusTimeMetrics::StatusesCount == 3); // Add all statuses here
+ // All run statuses metrics
+ protoTask->SetPendingInputTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::PendingInput].MicroSeconds()); // to be reviewed
+ protoTask->SetPendingOutputTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::PendingOutput].MicroSeconds()); // to be reviewed
+ protoTask->SetFinishTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::Finished].MicroSeconds()); // to be reviewed
+ static_assert(TRunStatusTimeMetrics::StatusesCount == 3); // Add all statuses here
+ if (StatsLevelCollectProfile(level)) {
if (taskStats.ComputeCpuTimeByRun) {
auto snapshot = taskStats.ComputeCpuTimeByRun->Snapshot();
for (ui32 i = 0; i < snapshot->Count(); i++) {
diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto
index 48f16bbf0f..a67d87d70d 100644
--- a/ydb/library/yql/dq/actors/protos/dq_stats.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto
@@ -9,7 +9,7 @@ import "google/protobuf/any.proto";
enum EDqStatsMode {
DQ_STATS_MODE_UNSPECIFIED = 0;
DQ_STATS_MODE_NONE = 10; // no statistics is expected, it will be ignored anyway
- DQ_STATS_MODE_BASIC = 20; // per graph statistics
+ DQ_STATS_MODE_BASIC = 20; // per graph statistics
DQ_STATS_MODE_FULL = 25; // per stage/connection statistics
DQ_STATS_MODE_PROFILE = 30; // per task statistics
}
@@ -19,7 +19,7 @@ enum EDqStatsMode {
message TDqAsyncBufferStats {
// Data
uint64 Bytes = 1; // physical bytes
- uint64 Rows = 2; // logical rows (if applicable)
+ uint64 Rows = 2; // logical rows (if applicable)
uint64 Chunks = 3; // chunk is group of sent/received bytes in single batch
uint64 Splits = 4; // logical partitioning (if applicable)
// Time
@@ -35,7 +35,7 @@ message TDqAsyncInputBufferStats {
uint64 InputIndex = 1;
string IngressName = 9;
- TDqAsyncBufferStats Ingress = 10;
+ TDqAsyncBufferStats Ingress = 10;
TDqAsyncBufferStats Push = 11;
TDqAsyncBufferStats Pop = 12;
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 4e4fec2bef..9c0355424f 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -229,7 +229,7 @@ public:
, LogFunc(logFunc)
, AllocatedHolder(std::make_optional<TAllocatedHolder>())
{
- if (Settings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) {
+ if (CollectBasic()) {
Stats = std::make_unique<TDqTaskRunnerStats>();
Stats->StartTs = TInstant::Now();
if (Y_UNLIKELY(CollectFull())) {
@@ -267,6 +267,10 @@ public:
return Settings.StatsMode >= NDqProto::DQ_STATS_MODE_FULL;
}
+ bool CollectBasic() const { // true when the configured stats mode is BASIC or any higher-valued mode
+ return Settings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC; // same threshold the constructor previously spelled out inline
+ }
+
const TDqMeteringStats* GetMeteringStats() const override {
return &BillingStats;
}
@@ -722,17 +726,14 @@ public:
}
if (runStatus == ERunStatus::Finished) {
- if (Stats) {
- Stats->FinishTs = TInstant::Now();
- }
- if (Y_UNLIKELY(CollectFull())) {
+ if (CollectBasic()) { // NOTE(review): this change drops the `Stats->FinishTs = TInstant::Now()` assignment above, yet FinishTs is still passed to StopWaiting on the next line — confirm it is assigned elsewhere
StopWaiting(Stats->FinishTs);
}
return ERunStatus::Finished;
}
- if (Y_UNLIKELY(CollectFull())) {
+ if (CollectBasic()) {
auto now = TInstant::Now();
StartWaiting(now);
if (runStatus == ERunStatus::PendingOutput) {
@@ -877,7 +878,7 @@ private:
if (Stats) {
auto duration = TInstant::Now() - startComputeTime;
Stats->ComputeCpuTime += duration;
- if (Y_UNLIKELY(CollectFull())) {
+ if (CollectBasic()) {
RunComputeTime = duration;
}
}
@@ -899,7 +900,7 @@ private:
wideBuffer.resize(AllocatedHolder->OutputWideType->GetElementsCount());
}
while (!AllocatedHolder->Output->IsFull()) {
- if (Y_UNLIKELY(CollectFull())) {
+ if (CollectBasic()) {
auto now = TInstant::Now();
StopWaitingOutput(now);
StopWaiting(now);
@@ -1002,26 +1003,26 @@ private:
std::optional<TInstant> StartWaitTime;
void StartWaitingOutput(TInstant now) {
- if (Y_UNLIKELY(CollectFull()) && !StartWaitOutputTime) {
+ if (CollectBasic() && !StartWaitOutputTime) { // record the start only if no output-wait interval is already open
StartWaitOutputTime = now;
}
}
void StopWaitingOutput(TInstant now) {
- if (Y_UNLIKELY(CollectFull()) && StartWaitOutputTime) {
+ if (CollectBasic() && StartWaitOutputTime) { // close an open interval: add its length to Stats->WaitOutputTime, then clear the marker
Stats->WaitOutputTime += (now - *StartWaitOutputTime);
StartWaitOutputTime.reset();
}
}
void StartWaiting(TInstant now) {
- if (Y_UNLIKELY(CollectFull()) && !StartWaitTime) {
+ if (CollectBasic() && !StartWaitTime) { // record the start only if no wait interval is already open
StartWaitTime = now;
}
}
void StopWaiting(TInstant now) {
- if (Y_UNLIKELY(CollectFull()) && StartWaitTime) {
+ if (CollectBasic() && StartWaitTime) {
Stats->WaitTime += (now - *StartWaitTime);
StartWaitTime.reset();
}