diff options
author | hor911 <hor911@ydb.tech> | 2023-09-13 16:58:33 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-09-13 19:34:10 +0300 |
commit | 984bb4059ca09282012c669b6be5f97cf87af0cc (patch) | |
tree | 5669be28f47ea4fcc73aac5a024fb80796d0dc1d | |
parent | 9f8f746bffdb6ce9c295d3ae39c7d960d620e6a2 (diff) | |
download | ydb-984bb4059ca09282012c669b6be5f97cf87af0cc.tar.gz |
Publish Ingress/Egress stats through KQP to YQv2 UI
-rw-r--r-- | ydb/core/fq/libs/compute/common/utils.cpp | 126 | ||||
-rw-r--r-- | ydb/core/fq/libs/control_plane_storage/internal/utils.cpp | 67 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_stats.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_query_plan.cpp | 24 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/protos/dq_stats.proto | 4 |
5 files changed, 204 insertions, 24 deletions
diff --git a/ydb/core/fq/libs/compute/common/utils.cpp b/ydb/core/fq/libs/compute/common/utils.cpp index bcebc00cff..aa32e5296f 100644 --- a/ydb/core/fq/libs/compute/common/utils.cpp +++ b/ydb/core/fq/libs/compute/common/utils.cpp @@ -5,36 +5,114 @@ namespace NFq { -void EnumeratePlans(NYson::TYsonWriter& writer, NJson::TJsonValue& value) { +void WriteNamedNode(NYson::TYsonWriter& writer, NJson::TJsonValue& node, const TString& name) { + switch (node.GetType()) { + case NJson::JSON_INTEGER: + case NJson::JSON_DOUBLE: + case NJson::JSON_UINTEGER: + if (name) { + auto v = node.GetIntegerRobust(); + writer.OnKeyedItem(name); + writer.OnBeginMap(); + writer.OnKeyedItem("sum"); + writer.OnInt64Scalar(v); + writer.OnKeyedItem("count"); + writer.OnInt64Scalar(1); + writer.OnKeyedItem("avg"); + writer.OnInt64Scalar(v); + writer.OnKeyedItem("max"); + writer.OnInt64Scalar(v); + writer.OnKeyedItem("min"); + writer.OnInt64Scalar(v); + writer.OnEndMap(); + } + break; + case NJson::JSON_ARRAY: + if (name) { + writer.OnKeyedItem(name); + writer.OnBeginMap(); + } + for (auto item : node.GetArray()) { + if (auto* subNode = item.GetValueByPath("Name")) { + WriteNamedNode(writer, item, subNode->GetStringRobust()); + } + } + if (name) { + writer.OnEndMap(); + } + break; + case NJson::JSON_MAP: + if (auto* subNode = node.GetValueByPath("Sum")) { + auto sum = subNode->GetIntegerRobust(); + auto count = 1; + if (auto* subNode = node.GetValueByPath("Count")) { + count = subNode->GetIntegerRobust(); + if (count <= 1) { + count = 1; + } + } + auto min = sum; + if (auto* subNode = node.GetValueByPath("Min")) { + min = subNode->GetIntegerRobust(); + } + auto max = sum; + if (auto* subNode = node.GetValueByPath("Max")) { + max = subNode->GetIntegerRobust(); + } + writer.OnKeyedItem(name); + writer.OnBeginMap(); + writer.OnKeyedItem("sum"); + writer.OnInt64Scalar(sum); + writer.OnKeyedItem("count"); + writer.OnInt64Scalar(count); + writer.OnKeyedItem("avg"); + writer.OnInt64Scalar(sum / count); + writer.OnKeyedItem("max"); + writer.OnInt64Scalar(max); + writer.OnKeyedItem("min"); + writer.OnInt64Scalar(min); + writer.OnEndMap(); + } else { + if (name) { + writer.OnKeyedItem(name); + writer.OnBeginMap(); + } + for (auto& [key, value] : node.GetMapSafe()) { + WriteNamedNode(writer, value, key); + } + if (name) { + writer.OnEndMap(); + } + } + break; + default: + break; + } +} + +void EnumeratePlans(NYson::TYsonWriter& writer, NJson::TJsonValue& value, const TString& prefix) { if (auto* subNode = value.GetValueByPath("Plans")) { ui32 index = 0; TString nodeType = "Unknown"; - if (auto* subNode = value.GetValueByPath("PlanNodeType")) { + if (auto* subNode = value.GetValueByPath("Node Type")) { nodeType = subNode->GetStringRobust(); } + if (prefix) { + nodeType = prefix + "." + nodeType; + } for (auto plan : subNode->GetArray()) { - writer.OnKeyedItem(nodeType + "." + ToString(index++)); - writer.OnBeginMap(); - EnumeratePlans(writer, plan); - if (auto* subNode = plan.GetValueByPath("Stats")) { - for (auto& [key, value] : subNode->GetMapSafe()) { - auto v = value.GetIntegerRobust(); - writer.OnKeyedItem(key); - writer.OnBeginMap(); - writer.OnKeyedItem("sum"); - writer.OnInt64Scalar(v); - writer.OnKeyedItem("count"); - writer.OnInt64Scalar(v); - writer.OnKeyedItem("avg"); - writer.OnInt64Scalar(v); - writer.OnKeyedItem("max"); - writer.OnInt64Scalar(v); - writer.OnKeyedItem("min"); - writer.OnInt64Scalar(v); - writer.OnEndMap(); - } + auto itemPrefix = nodeType + "[" + ToString(index++) + "]"; + auto* statNode = plan.GetValueByPath("Stats"); + if (statNode) { + writer.OnKeyedItem(itemPrefix); + writer.OnBeginMap(); + itemPrefix = ""; + } + EnumeratePlans(writer, plan, itemPrefix); + if (statNode) { + WriteNamedNode(writer, *statNode, ""); + writer.OnEndMap(); } - writer.OnEndMap(); } } } @@ -48,7 +126,7 @@ TString GetV1StatFromV2Plan(const TString& plan) { NJson::TJsonValue stat; if (NJson::ReadJsonTree(plan, &jsonConfig, &stat)) { if (auto* subNode = stat.GetValueByPath("Plan")) { - EnumeratePlans(writer, *subNode); + EnumeratePlans(writer, *subNode, ""); } } writer.OnEndMap(); diff --git a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp index 00f2f8d559..2efdc33ce6 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp @@ -179,6 +179,62 @@ void RemapNode(NYson::TYsonWriter& writer, const NJson::TJsonValue& node, const } } +void AggregateNode(const NJson::TJsonValue& node, const TString& path, ui64& min, ui64& max, ui64& sum, ui64& count) { + if (node.GetType() == NJson::JSON_MAP) { + if (auto* subNode = node.GetValueByPath(path)) { + if (auto* keyNode = subNode->GetValueByPath("count")) { + auto nodeCount = keyNode->GetInteger(); + if (nodeCount) { + if (auto* keyNode = subNode->GetValueByPath("min")) { + auto nodeMin = keyNode->GetInteger(); + min = count ? std::min<ui64>(min, nodeMin) : nodeMin; + } + if (auto* keyNode = subNode->GetValueByPath("max")) { + auto nodeMax = keyNode->GetInteger(); + max = count ? std::max<ui64>(max, nodeMax) : nodeMax; + } + if (auto* keyNode = subNode->GetValueByPath("sum")) { + sum += keyNode->GetInteger(); + } + // ignore "avg" + count += nodeCount; + } + } + } + for (const auto& p : node.GetMap()) { + if (p.first == "min" || p.first == "max" || p.first == "sum" || p.first == "count" || p.first == "avg") { + return; + } + AggregateNode(p.second, path, min, max, sum, count); + } + } +} + +void AggregateNode(NYson::TYsonWriter& writer, const NJson::TJsonValue& node, const TString& path, const TString& key) { + ui64 min = 0; + ui64 max = 0; + ui64 sum = 0; + ui64 count = 0; + + AggregateNode(node, path, min, max, sum, count); + + if (count) { + writer.OnKeyedItem(key); + writer.OnBeginMap(); + writer.OnKeyedItem("sum"); + writer.OnInt64Scalar(sum); + writer.OnKeyedItem("count"); + writer.OnInt64Scalar(count); + writer.OnKeyedItem("avg"); + writer.OnInt64Scalar(sum / count); + writer.OnKeyedItem("min"); + writer.OnInt64Scalar(min); + writer.OnKeyedItem("max"); + writer.OnInt64Scalar(max); + writer.OnEndMap(); + } +} + TString GetPrettyStatistics(const TString& statistics) { TStringStream out; NYson::TYsonWriter writer(&out); @@ -187,6 +243,7 @@ TString GetPrettyStatistics(const TString& statistics) { NJson::TJsonValue stat; if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) { for (const auto& p : stat.GetMap()) { + // YQv1 if (p.first.StartsWith("Graph=") || p.first.StartsWith("Precompute=")) { writer.OnKeyedItem(p.first); writer.OnBeginMap(); @@ -203,6 +260,16 @@ TString GetPrettyStatistics(const TString& statistics) { RemapNode(writer, p.second, "TaskRunner.Source=0.Stage=Total.RowsIn", "IngressRows"); writer.OnEndMap(); } + // YQv2 + if (p.first.StartsWith("Query[")) { + writer.OnKeyedItem(p.first); + writer.OnBeginMap(); + RemapNode(writer, p.second, "TotalTasks", "TasksCount"); + RemapNode(writer, p.second, "TotalCpuTimeUs", "CpuTimeUs"); + AggregateNode(writer, p.second, "IngressBytes.S3Source", "IngressObjectStorageBytes"); + AggregateNode(writer, p.second, "EgressBytes.S3Sink", "EgressObjectStorageBytes"); + writer.OnEndMap(); + } } } writer.OnEndMap(); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index 8bb42587a0..6617f44e29 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -171,6 +171,13 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetFirstRowTimeMs().GetMin()) * 1'000); + + for (auto ingressStat : task.GetIngress()) { + UpdateAggr(&(*stageStats->MutableIngressBytes())[ingressStat.GetName()], ingressStat.GetBytes()); + } + for (auto egressStat : task.GetEgress()) { + UpdateAggr(&(*stageStats->MutableEgressBytes())[egressStat.GetName()], egressStat.GetBytes()); + } } } diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index aaf7c57c78..3836800ca4 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -1817,6 +1817,30 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD stats["TotalOutputRows"] = (*stat)->GetOutputRows().GetSum(); stats["TotalOutputBytes"] = (*stat)->GetOutputBytes().GetSum(); + if (!(*stat)->GetIngressBytes().empty()) { + auto& ingressStats = stats.InsertValue("IngressBytes", NJson::JSON_ARRAY); + for (auto ingressBytes : (*stat)->GetIngressBytes()) { + auto& ingressInfo = ingressStats.AppendValue(NJson::JSON_MAP); + ingressInfo["Name"] = ingressBytes.first; + ingressInfo["Min"] = ingressBytes.second.GetMin(); + ingressInfo["Max"] = ingressBytes.second.GetMax(); + ingressInfo["Sum"] = ingressBytes.second.GetSum(); + ingressInfo["Count"] = ingressBytes.second.GetCnt(); + } + } + + if (!(*stat)->GetEgressBytes().empty()) { + auto& egressStats = stats.InsertValue("EgressBytes", NJson::JSON_ARRAY); + for (auto egressBytes : (*stat)->GetEgressBytes()) { + auto& egressInfo = egressStats.AppendValue(NJson::JSON_MAP); + egressInfo["Name"] = egressBytes.first; + egressInfo["Min"] = egressBytes.second.GetMin(); + egressInfo["Max"] = egressBytes.second.GetMax(); + egressInfo["Sum"] = egressBytes.second.GetSum(); + egressInfo["Count"] = egressBytes.second.GetCnt(); + } + } + NKqpProto::TKqpStageExtraStats kqpStageStats; if ((*stat)->GetExtra().UnpackTo(&kqpStageStats)) { auto& nodesStats = stats.InsertValue("NodesScanShards", NJson::JSON_ARRAY); diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index a8ac09a210..d898d3dfaa 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -245,6 +245,10 @@ message TDqStageStats { bool UseLlvm = 18; } + // currently only 1 source/sink per stage is supported + map<string, TDqStatsAggr> IngressBytes = 19; // ingress from external source, per provider + map<string, TDqStatsAggr> EgressBytes = 20; // egress to external consumer, per provider + google.protobuf.Any Extra = 100; } |