aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-12-07 12:17:15 +0300
committerhcpp <hcpp@ydb.tech>2023-12-07 13:06:06 +0300
commit9a6139f092a89bfc1685550d6560c4f79997eae2 (patch)
tree2e8fd3348605859eb5a38c37d40be7023b9ebbd4
parent9a56672d465a1b3ba59eb93cc37eadab9440975b (diff)
downloadydb-9a6139f092a89bfc1685550d6560c4f79997eae2.tar.gz
public_metrics
public metrics connector ua has been fixed
-rw-r--r--ydb/core/fq/libs/compute/common/utils.cpp80
-rw-r--r--ydb/core/fq/libs/compute/common/utils.h12
-rw-r--r--ydb/core/fq/libs/compute/ydb/base_compute_actor.h11
-rw-r--r--ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp41
-rw-r--r--ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp2
-rw-r--r--ydb/tests/fq/s3/test_public_metrics.py82
-rw-r--r--ydb/tests/fq/s3/ya.make1
7 files changed, 227 insertions, 2 deletions
diff --git a/ydb/core/fq/libs/compute/common/utils.cpp b/ydb/core/fq/libs/compute/common/utils.cpp
index ac14d4a078..2376d19b22 100644
--- a/ydb/core/fq/libs/compute/common/utils.cpp
+++ b/ydb/core/fq/libs/compute/common/utils.cpp
@@ -643,4 +643,84 @@ TString GetV1StatFromV2PlanV2(const TString& plan) {
return NJson2Yson::ConvertYson2Json(out.Str());
}
+// Looks up `name` under `node` and returns the integer stored there.
+// Yields an empty optional when the path is missing or GetInteger() is zero.
+// NOTE(review): if GetInteger() returns a type wider than int, the value is
+// narrowed when converted to std::optional<int> - confirm.
+std::optional<int> GetValue(const NJson::TJsonValue& node, const TString& name) {
+    auto* found = node.GetValueByPath(name);
+    if (!found) {
+        return {};
+    }
+    auto value = found->GetInteger();
+    if (!value) {
+        return {};
+    }
+    return value;
+}
+
+// Adds the "sum" entry found under `node[name]` to `sum`.
+// Nothing is added unless `node` is a JSON map, `name` exists in it, and the
+// GetInteger() value of its "count" entry is non-zero.
+void AggregateNode(const NJson::TJsonValue& node, const TString& name, ui64& sum) {
+    if (node.GetType() != NJson::JSON_MAP) {
+        return;
+    }
+    auto* metric = node.GetValueByPath(name);
+    if (!metric) {
+        return;
+    }
+    auto* countNode = metric->GetValueByPath("count");
+    if (!countNode || !countNode->GetInteger()) {
+        return;
+    }
+    if (auto* sumNode = metric->GetValueByPath("sum")) {
+        sum += sumNode->GetInteger();
+    }
+}
+
+// Extracts the metric `name` from `node`.
+//   aggregate == false: returns GetValue(node[name], "sum"), or empty when
+//                       `name` is not present.
+//   aggregate == true:  treats `node` as a map, runs AggregateNode() over every
+//                       entry and returns the accumulated total, or empty when
+//                       the total is zero.
+// NOTE(review): the ui64 total is converted to std::optional<int> on return -
+// confirm the values always fit.
+std::optional<int> GetNodeValue(const NJson::TJsonValue& node, const TString& name, bool aggregate = false) {
+    if (!aggregate) {
+        auto* metric = node.GetValueByPath(name);
+        if (!metric) {
+            return {};
+        }
+        return GetValue(*metric, "sum");
+    }
+
+    ui64 total = 0;
+    if (node.GetType() == NJson::JSON_MAP) {
+        for (const auto& entry : node.GetMap()) {
+            AggregateNode(entry.second, name, total);
+        }
+    }
+    if (!total) {
+        return {};
+    }
+    return total;
+}
+
+// Adds two optional counters.
+// When both are present the result is their sum; when only one is present that
+// one is returned unchanged; when neither is present the result is empty.
+std::optional<int> Sum(const std::optional<int>& a, const std::optional<int>& b) {
+    if (a && b) {
+        return *a + *b;
+    }
+    // At most one side holds a value here: prefer `a`, otherwise fall back to `b`.
+    return a ? a : b;
+}
+
+// Parses `statistics` as JSON and folds the per-entry metrics of its top-level
+// map into a TPublicStat. If parsing fails, or the document has a top-level
+// "Columns" key, the default-constructed TPublicStat is returned unchanged.
+TPublicStat GetPublicStat(const TString& statistics) {
+    TPublicStat result;
+    NJson::TJsonReaderConfig readerConfig;
+    NJson::TJsonValue root;
+    if (!NJson::ReadJsonTree(statistics, &readerConfig, &root)) {
+        return result;
+    }
+
+    // Originally tagged "EXP": documents carrying "Columns" are skipped
+    // (presumably EXPLAIN output - TODO confirm).
+    if (root.GetValueByPath("Columns")) {
+        return result;
+    }
+
+    for (const auto& entry : root.GetMap()) {
+        const auto& section = entry.second;
+        result.MemoryUsageBytes = Sum(result.MemoryUsageBytes, GetNodeValue(section, "MaxMemoryUsage"));
+        result.CpuUsageUs = Sum(result.CpuUsageUs, GetNodeValue(section, "CpuTimeUs"));
+        result.InputBytes = Sum(result.InputBytes, GetNodeValue(section, "InputBytes"));
+        result.OutputBytes = Sum(result.OutputBytes, GetNodeValue(section, "OutputBytes"));
+        result.SourceInputRecords = Sum(result.SourceInputRecords, GetNodeValue(section, "InputRows"));
+        result.SinkOutputRecords = Sum(result.SinkOutputRecords, GetNodeValue(section, "OutputRows"));
+        // Task counts are the only metric gathered in aggregate mode.
+        result.RunningTasks = Sum(result.RunningTasks, GetNodeValue(section, "TotalTasks", true));
+    }
+    return result;
+}
+
} // namespace NFq
diff --git a/ydb/core/fq/libs/compute/common/utils.h b/ydb/core/fq/libs/compute/common/utils.h
index dae8624eaf..d2fad6d01f 100644
--- a/ydb/core/fq/libs/compute/common/utils.h
+++ b/ydb/core/fq/libs/compute/common/utils.h
@@ -31,4 +31,16 @@ TString FormatDurationMs(ui64 durationMs);
TString FormatDurationUs(ui64 durationUs);
TString FormatInstant(TInstant instant);
+// Per-query counters returned by GetPublicStat().
+// Every field starts out engaged with the value 0.
+// NOTE(review): int may be too narrow for byte and microsecond totals - confirm.
+struct TPublicStat {
+    std::optional<int> MemoryUsageBytes{0};
+    std::optional<int> CpuUsageUs{0};
+    std::optional<int> InputBytes{0};
+    std::optional<int> OutputBytes{0};
+    std::optional<int> SourceInputRecords{0};
+    std::optional<int> SinkOutputRecords{0};
+    std::optional<int> RunningTasks{0};
+};
+
+TPublicStat GetPublicStat(const TString& statistics);
+
} // namespace NFq
diff --git a/ydb/core/fq/libs/compute/ydb/base_compute_actor.h b/ydb/core/fq/libs/compute/ydb/base_compute_actor.h
index 6bbebcaef0..420eedbeb3 100644
--- a/ydb/core/fq/libs/compute/ydb/base_compute_actor.h
+++ b/ydb/core/fq/libs/compute/ydb/base_compute_actor.h
@@ -20,13 +20,15 @@ public:
using TBase::PassAway;
TBaseComputeActor(const ::NYql::NCommon::TServiceCounters& queryCounters, const TString& stepName)
- : BaseCounters(queryCounters.Counters)
+ : PublicCounters(queryCounters.PublicCounters)
+ , BaseCounters(queryCounters.Counters)
, Counters(MakeIntrusive<TComputeRequestCounters>("Total", queryCounters.Counters->GetSubgroup("step", stepName)))
, TotalStartTime(TInstant::Now())
{}
TBaseComputeActor(const ::NMonitoring::TDynamicCounterPtr& baseCounters, const TString& stepName)
- : BaseCounters(baseCounters)
+ : PublicCounters(MakeIntrusive<::NMonitoring::TDynamicCounters>())
+ , BaseCounters(baseCounters)
, Counters(MakeIntrusive<TComputeRequestCounters>("Total", baseCounters->GetSubgroup("step", stepName)))
, TotalStartTime(TInstant::Now())
{}
@@ -64,7 +66,12 @@ public:
return BaseCounters;
}
+ ::NMonitoring::TDynamicCounterPtr GetPublicCounters() const { // Returns the PublicCounters pointer held by this actor.
+ return PublicCounters;
+ }
+
private:
+ ::NMonitoring::TDynamicCounterPtr PublicCounters;
::NMonitoring::TDynamicCounterPtr BaseCounters;
TComputeRequestCountersPtr Counters;
TInstant TotalStartTime;
diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
index ef628906ce..f21c7e8982 100644
--- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
@@ -124,6 +124,7 @@ public:
return;
}
+ ReportPublicCounters(response.QueryStats);
StartTime = TInstant::Now();
LOG_D("Execution status: " << static_cast<int>(response.ExecStatus));
switch (response.ExecStatus) {
@@ -153,6 +154,46 @@ public:
}
}
+ // Derives public statistics from the query plan in `stats` and copies every
+ // present value into the matching "name"-labelled sensor of the public
+ // counter group.
+ void ReportPublicCounters(const Ydb::TableStats::QueryStats& stats) {
+     auto stat = GetPublicStat(GetV1StatFromV2Plan(stats.query_plan()));
+     auto publicCounters = GetPublicCounters();
+
+     // Stores `value` in sensor `sensor` when it is present. `withFlag` passes
+     // the extra `true` argument to GetNamedCounter exactly where the original
+     // call sites did.
+     auto publish = [&publicCounters](const char* sensor, const auto& value, bool withFlag) {
+         if (!value) {
+             return;
+         }
+         auto counter = withFlag
+             ? publicCounters->GetNamedCounter("name", sensor, true)
+             : publicCounters->GetNamedCounter("name", sensor);
+         *counter = *value;
+     };
+
+     publish("query.memory_usage_bytes", stat.MemoryUsageBytes, false);
+     publish("query.cpu_usage_us", stat.CpuUsageUs, true);
+     publish("query.input_bytes", stat.InputBytes, true);
+     publish("query.output_bytes", stat.OutputBytes, true);
+     publish("query.source_input_records", stat.SourceInputRecords, true);
+     publish("query.sink_output_records", stat.SinkOutputRecords, true);
+     publish("query.running_tasks", stat.RunningTasks, false);
+ }
+
void SendGetOperation(const TDuration& delay = TDuration::Zero()) {
Register(new TRetryActor<TEvYdbCompute::TEvGetOperationRequest, TEvYdbCompute::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), delay, SelfId(), Connector, OperationId));
}
diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
index 6c7d1a6cb6..2988fbc942 100644
--- a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
@@ -197,6 +197,7 @@ public:
}
void ResignAndPassAway(const NYql::TIssues& issues) {
+ Send(FetcherId, new NActors::TEvents::TEvPoisonTaken());
Fq::Private::PingTaskRequest pingTaskRequest;
NYql::IssuesToMessage(issues, pingTaskRequest.mutable_transient_issues());
pingTaskRequest.set_resign_query(true);
@@ -208,6 +209,7 @@ public:
}
void FinishAndPassAway() { // Sends shutdown events to FetcherId and Connector, then calls PassAway().
+ Send(FetcherId, new NActors::TEvents::TEvPoisonTaken()); // NOTE(review): FetcherId receives TEvPoisonTaken while Connector receives TEvPoisonPill - confirm this asymmetry is intended.
Send(Connector, new NActors::TEvents::TEvPoisonPill());
PassAway();
}
diff --git a/ydb/tests/fq/s3/test_public_metrics.py b/ydb/tests/fq/s3/test_public_metrics.py
new file mode 100644
index 0000000000..cefe06147e
--- /dev/null
+++ b/ydb/tests/fq/s3/test_public_metrics.py
@@ -0,0 +1,82 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import boto3
+import pytest
+
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_all
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+class TestPublicMetrics:  # Checks the per-query "yq_public" sensors after an S3 read query completes.
+ @yq_all
+ @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
+ def test_public_metrics(self, kikimr, s3, client, yq_version):  # Runs one SELECT over an S3 CSV object and asserts on the published sensors.
+ resource = boto3.resource(
+ "s3",
+ endpoint_url=s3.s3_url,
+ aws_access_key_id="key",
+ aws_secret_access_key="secret_key"
+ )
+
+ bucket = resource.Bucket("fbucket")
+ bucket.create(ACL='public-read')
+ bucket.objects.all().delete()  # Start from an empty bucket.
+
+ s3_client = boto3.client(
+ "s3",
+ endpoint_url=s3.s3_url,
+ aws_access_key_id="key",
+ aws_secret_access_key="secret_key"
+ )
+
+ fruits = R'''Fruit,Price,Weight
+Banana,3,100
+Apple,2,22
+Pear,15,33'''
+ s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.csv', ContentType='text/plain')  # Upload the three-row CSV read by the query below.
+ kikimr.control_plane.wait_bootstrap(1)
+ client.create_storage_connection("fruitbucket", "fbucket")
+
+ sql = R'''
+ SELECT *
+ FROM fruitbucket.`fruits.csv`
+ WITH (format=csv_with_names, SCHEMA (
+ Fruit String NOT NULL,
+ Price Int NOT NULL,
+ Weight Int NOT NULL
+ ));
+ '''
+
+ cloud_id = "mock_cloud"
+ folder_id = "my_folder"  # Same folder_id as the one passed to the parametrized client fixture.
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+
+ client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)  # Sensors are inspected only after the query reaches COMPLETED.
+
+ metrics = kikimr.compute_plane.get_sensors(1, "yq_public") if yq_version == "v1" else kikimr.control_plane.get_sensors(1, "yq_public")  # v1 reads sensors from the compute plane, other versions from the control plane.
+ assert metrics.find_sensor(
+ {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.running_tasks"}) >= 0
+ assert metrics.find_sensor(
+ {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.cpu_usage_us"}) >= 0
+ assert metrics.find_sensor({"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id,
+ "name": "query.memory_usage_bytes"}) > 0
+ assert metrics.find_sensor(
+ {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.input_bytes"}) > 0
+ assert metrics.find_sensor(
+ {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.uptime_seconds"}) >= 0
+ if yq_version == "v1":  # v1 is expected NOT to publish the three sensors below.
+ assert metrics.find_sensor(
+ {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.output_bytes"}) is None
+ assert metrics.find_sensor(
+ {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.source_input_records"}) is None
+ assert metrics.find_sensor(
+ {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.sink_output_records"}) is None
+ else:  # Other versions must publish them with positive values.
+ assert metrics.find_sensor(
+ {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.output_bytes"}) > 0
+ assert metrics.find_sensor(
+ {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.source_input_records"}) > 0
+ assert metrics.find_sensor(
+ {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.sink_output_records"}) > 0
diff --git a/ydb/tests/fq/s3/ya.make b/ydb/tests/fq/s3/ya.make
index 4b1eca093e..54bd7ac3dc 100644
--- a/ydb/tests/fq/s3/ya.make
+++ b/ydb/tests/fq/s3/ya.make
@@ -28,6 +28,7 @@ TEST_SRCS(
test_formats.py
test_inflight.py
test_insert.py
+ test_public_metrics.py
test_push_down.py
test_s3.py
test_size_limit.py