aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-09-13 16:58:33 +0300
committerhor911 <hor911@ydb.tech>2023-09-13 19:34:10 +0300
commit984bb4059ca09282012c669b6be5f97cf87af0cc (patch)
tree5669be28f47ea4fcc73aac5a024fb80796d0dc1d
parent9f8f746bffdb6ce9c295d3ae39c7d960d620e6a2 (diff)
downloadydb-984bb4059ca09282012c669b6be5f97cf87af0cc.tar.gz
Publish Ingress/Egress stats through KQP to YQv2 UI
-rw-r--r--ydb/core/fq/libs/compute/common/utils.cpp126
-rw-r--r--ydb/core/fq/libs/control_plane_storage/internal/utils.cpp67
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_stats.cpp7
-rw-r--r--ydb/core/kqp/opt/kqp_query_plan.cpp24
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_stats.proto4
5 files changed, 204 insertions, 24 deletions
diff --git a/ydb/core/fq/libs/compute/common/utils.cpp b/ydb/core/fq/libs/compute/common/utils.cpp
index bcebc00cff..aa32e5296f 100644
--- a/ydb/core/fq/libs/compute/common/utils.cpp
+++ b/ydb/core/fq/libs/compute/common/utils.cpp
@@ -5,36 +5,114 @@
namespace NFq {
-void EnumeratePlans(NYson::TYsonWriter& writer, NJson::TJsonValue& value) {
+void WriteNamedNode(NYson::TYsonWriter& writer, NJson::TJsonValue& node, const TString& name) {
+ switch (node.GetType()) {
+ case NJson::JSON_INTEGER:
+ case NJson::JSON_DOUBLE:
+ case NJson::JSON_UINTEGER:
+ if (name) {
+ auto v = node.GetIntegerRobust();
+ writer.OnKeyedItem(name);
+ writer.OnBeginMap();
+ writer.OnKeyedItem("sum");
+ writer.OnInt64Scalar(v);
+ writer.OnKeyedItem("count");
+ writer.OnInt64Scalar(1);
+ writer.OnKeyedItem("avg");
+ writer.OnInt64Scalar(v);
+ writer.OnKeyedItem("max");
+ writer.OnInt64Scalar(v);
+ writer.OnKeyedItem("min");
+ writer.OnInt64Scalar(v);
+ writer.OnEndMap();
+ }
+ break;
+ case NJson::JSON_ARRAY:
+ if (name) {
+ writer.OnKeyedItem(name);
+ writer.OnBeginMap();
+ }
+ for (auto item : node.GetArray()) {
+ if (auto* subNode = item.GetValueByPath("Name")) {
+ WriteNamedNode(writer, item, subNode->GetStringRobust());
+ }
+ }
+ if (name) {
+ writer.OnEndMap();
+ }
+ break;
+ case NJson::JSON_MAP:
+ if (auto* subNode = node.GetValueByPath("Sum")) {
+ auto sum = subNode->GetIntegerRobust();
+ auto count = 1;
+ if (auto* subNode = node.GetValueByPath("Count")) {
+ count = subNode->GetIntegerRobust();
+ if (count <= 1) {
+ count = 1;
+ }
+ }
+ auto min = sum;
+ if (auto* subNode = node.GetValueByPath("Min")) {
+ min = subNode->GetIntegerRobust();
+ }
+ auto max = sum;
+ if (auto* subNode = node.GetValueByPath("Max")) {
+ max = subNode->GetIntegerRobust();
+ }
+ writer.OnKeyedItem(name);
+ writer.OnBeginMap();
+ writer.OnKeyedItem("sum");
+ writer.OnInt64Scalar(sum);
+ writer.OnKeyedItem("count");
+ writer.OnInt64Scalar(count);
+ writer.OnKeyedItem("avg");
+ writer.OnInt64Scalar(sum / count);
+ writer.OnKeyedItem("max");
+ writer.OnInt64Scalar(max);
+ writer.OnKeyedItem("min");
+ writer.OnInt64Scalar(min);
+ writer.OnEndMap();
+ } else {
+ if (name) {
+ writer.OnKeyedItem(name);
+ writer.OnBeginMap();
+ }
+ for (auto& [key, value] : node.GetMapSafe()) {
+ WriteNamedNode(writer, value, key);
+ }
+ if (name) {
+ writer.OnEndMap();
+ }
+ }
+ break;
+ default:
+ break;
+ }
+}
+
+void EnumeratePlans(NYson::TYsonWriter& writer, NJson::TJsonValue& value, const TString& prefix) {
if (auto* subNode = value.GetValueByPath("Plans")) {
ui32 index = 0;
TString nodeType = "Unknown";
- if (auto* subNode = value.GetValueByPath("PlanNodeType")) {
+ if (auto* subNode = value.GetValueByPath("Node Type")) {
nodeType = subNode->GetStringRobust();
}
+ if (prefix) {
+ nodeType = prefix + "." + nodeType;
+ }
for (auto plan : subNode->GetArray()) {
- writer.OnKeyedItem(nodeType + "." + ToString(index++));
- writer.OnBeginMap();
- EnumeratePlans(writer, plan);
- if (auto* subNode = plan.GetValueByPath("Stats")) {
- for (auto& [key, value] : subNode->GetMapSafe()) {
- auto v = value.GetIntegerRobust();
- writer.OnKeyedItem(key);
- writer.OnBeginMap();
- writer.OnKeyedItem("sum");
- writer.OnInt64Scalar(v);
- writer.OnKeyedItem("count");
- writer.OnInt64Scalar(v);
- writer.OnKeyedItem("avg");
- writer.OnInt64Scalar(v);
- writer.OnKeyedItem("max");
- writer.OnInt64Scalar(v);
- writer.OnKeyedItem("min");
- writer.OnInt64Scalar(v);
- writer.OnEndMap();
- }
+ auto itemPrefix = nodeType + "[" + ToString(index++) + "]";
+ auto* statNode = plan.GetValueByPath("Stats");
+ if (statNode) {
+ writer.OnKeyedItem(itemPrefix);
+ writer.OnBeginMap();
+ itemPrefix = "";
+ }
+ EnumeratePlans(writer, plan, itemPrefix);
+ if (statNode) {
+ WriteNamedNode(writer, *statNode, "");
+ writer.OnEndMap();
}
- writer.OnEndMap();
}
}
}
@@ -48,7 +126,7 @@ TString GetV1StatFromV2Plan(const TString& plan) {
NJson::TJsonValue stat;
if (NJson::ReadJsonTree(plan, &jsonConfig, &stat)) {
if (auto* subNode = stat.GetValueByPath("Plan")) {
- EnumeratePlans(writer, *subNode);
+ EnumeratePlans(writer, *subNode, "");
}
}
writer.OnEndMap();
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
index 00f2f8d559..2efdc33ce6 100644
--- a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
@@ -179,6 +179,62 @@ void RemapNode(NYson::TYsonWriter& writer, const NJson::TJsonValue& node, const
}
}
+void AggregateNode(const NJson::TJsonValue& node, const TString& path, ui64& min, ui64& max, ui64& sum, ui64& count) {
+ if (node.GetType() == NJson::JSON_MAP) {
+ if (auto* subNode = node.GetValueByPath(path)) {
+ if (auto* keyNode = subNode->GetValueByPath("count")) {
+ auto nodeCount = keyNode->GetInteger();
+ if (nodeCount) {
+ if (auto* keyNode = subNode->GetValueByPath("min")) {
+ auto nodeMin = keyNode->GetInteger();
+ min = count ? std::min<ui64>(min, nodeMin) : nodeMin;
+ }
+ if (auto* keyNode = subNode->GetValueByPath("max")) {
+ auto nodeMax = keyNode->GetInteger();
+ max = count ? std::max<ui64>(max, nodeMax) : nodeMax;
+ }
+ if (auto* keyNode = subNode->GetValueByPath("sum")) {
+ sum += keyNode->GetInteger();
+ }
+ // ignore "avg"
+ count += nodeCount;
+ }
+ }
+ }
+ for (const auto& p : node.GetMap()) {
+ if (p.first == "min" || p.first == "max" || p.first == "sum" || p.first == "count" || p.first == "avg") {
+ return;
+ }
+ AggregateNode(p.second, path, min, max, sum, count);
+ }
+ }
+}
+
+void AggregateNode(NYson::TYsonWriter& writer, const NJson::TJsonValue& node, const TString& path, const TString& key) {
+ ui64 min = 0;
+ ui64 max = 0;
+ ui64 sum = 0;
+ ui64 count = 0;
+
+ AggregateNode(node, path, min, max, sum, count);
+
+ if (count) {
+ writer.OnKeyedItem(key);
+ writer.OnBeginMap();
+ writer.OnKeyedItem("sum");
+ writer.OnInt64Scalar(sum);
+ writer.OnKeyedItem("count");
+ writer.OnInt64Scalar(count);
+ writer.OnKeyedItem("avg");
+ writer.OnInt64Scalar(sum / count);
+ writer.OnKeyedItem("min");
+ writer.OnInt64Scalar(min);
+ writer.OnKeyedItem("max");
+ writer.OnInt64Scalar(max);
+ writer.OnEndMap();
+ }
+}
+
TString GetPrettyStatistics(const TString& statistics) {
TStringStream out;
NYson::TYsonWriter writer(&out);
@@ -187,6 +243,7 @@ TString GetPrettyStatistics(const TString& statistics) {
NJson::TJsonValue stat;
if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) {
for (const auto& p : stat.GetMap()) {
+ // YQv1
if (p.first.StartsWith("Graph=") || p.first.StartsWith("Precompute=")) {
writer.OnKeyedItem(p.first);
writer.OnBeginMap();
@@ -203,6 +260,16 @@ TString GetPrettyStatistics(const TString& statistics) {
RemapNode(writer, p.second, "TaskRunner.Source=0.Stage=Total.RowsIn", "IngressRows");
writer.OnEndMap();
}
+ // YQv2
+ if (p.first.StartsWith("Query[")) {
+ writer.OnKeyedItem(p.first);
+ writer.OnBeginMap();
+ RemapNode(writer, p.second, "TotalTasks", "TasksCount");
+ RemapNode(writer, p.second, "TotalCpuTimeUs", "CpuTimeUs");
+ AggregateNode(writer, p.second, "IngressBytes.S3Source", "IngressObjectStorageBytes");
+ AggregateNode(writer, p.second, "EgressBytes.S3Sink", "EgressObjectStorageBytes");
+ writer.OnEndMap();
+ }
}
}
writer.OnEndMap();
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
index 8bb42587a0..6617f44e29 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
@@ -171,6 +171,13 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt
UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs());
stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetFirstRowTimeMs().GetMin()) * 1'000);
+
+ for (auto ingressStat : task.GetIngress()) {
+ UpdateAggr(&(*stageStats->MutableIngressBytes())[ingressStat.GetName()], ingressStat.GetBytes());
+ }
+ for (auto egressStat : task.GetEgress()) {
+ UpdateAggr(&(*stageStats->MutableEgressBytes())[egressStat.GetName()], egressStat.GetBytes());
+ }
}
}
diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp
index aaf7c57c78..3836800ca4 100644
--- a/ydb/core/kqp/opt/kqp_query_plan.cpp
+++ b/ydb/core/kqp/opt/kqp_query_plan.cpp
@@ -1817,6 +1817,30 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
stats["TotalOutputRows"] = (*stat)->GetOutputRows().GetSum();
stats["TotalOutputBytes"] = (*stat)->GetOutputBytes().GetSum();
+ if (!(*stat)->GetIngressBytes().empty()) {
+ auto& ingressStats = stats.InsertValue("IngressBytes", NJson::JSON_ARRAY);
+ for (auto ingressBytes : (*stat)->GetIngressBytes()) {
+ auto& ingressInfo = ingressStats.AppendValue(NJson::JSON_MAP);
+ ingressInfo["Name"] = ingressBytes.first;
+ ingressInfo["Min"] = ingressBytes.second.GetMin();
+ ingressInfo["Max"] = ingressBytes.second.GetMax();
+ ingressInfo["Sum"] = ingressBytes.second.GetSum();
+ ingressInfo["Count"] = ingressBytes.second.GetCnt();
+ }
+ }
+
+ if (!(*stat)->GetEgressBytes().empty()) {
+ auto& egressStats = stats.InsertValue("EgressBytes", NJson::JSON_ARRAY);
+ for (auto egressBytes : (*stat)->GetEgressBytes()) {
+ auto& egressInfo = egressStats.AppendValue(NJson::JSON_MAP);
+ egressInfo["Name"] = egressBytes.first;
+ egressInfo["Min"] = egressBytes.second.GetMin();
+ egressInfo["Max"] = egressBytes.second.GetMax();
+ egressInfo["Sum"] = egressBytes.second.GetSum();
+ egressInfo["Count"] = egressBytes.second.GetCnt();
+ }
+ }
+
NKqpProto::TKqpStageExtraStats kqpStageStats;
if ((*stat)->GetExtra().UnpackTo(&kqpStageStats)) {
auto& nodesStats = stats.InsertValue("NodesScanShards", NJson::JSON_ARRAY);
diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto
index a8ac09a210..d898d3dfaa 100644
--- a/ydb/library/yql/dq/actors/protos/dq_stats.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto
@@ -245,6 +245,10 @@ message TDqStageStats {
bool UseLlvm = 18;
}
+ // currently only 1 source/sink per stage is supported
+ map<string, TDqStatsAggr> IngressBytes = 19; // ingress from external source, per provider
+ map<string, TDqStatsAggr> EgressBytes = 20; // egress to external consumer, per provider
+
google.protobuf.Any Extra = 100;
}