diff options
author | hcpp <hcpp@ydb.tech> | 2023-12-07 12:17:15 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-12-07 13:06:06 +0300 |
commit | 9a6139f092a89bfc1685550d6560c4f79997eae2 (patch) | |
tree | 2e8fd3348605859eb5a38c37d40be7023b9ebbd4 | |
parent | 9a56672d465a1b3ba59eb93cc37eadab9440975b (diff) | |
download | ydb-9a6139f092a89bfc1685550d6560c4f79997eae2.tar.gz |
public_metrics
public metrics
connector ua has been fixed
-rw-r--r-- | ydb/core/fq/libs/compute/common/utils.cpp | 80 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/common/utils.h | 12 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/base_compute_actor.h | 11 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp | 41 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/tests/fq/s3/test_public_metrics.py | 82 | ||||
-rw-r--r-- | ydb/tests/fq/s3/ya.make | 1 |
7 files changed, 227 insertions, 2 deletions
diff --git a/ydb/core/fq/libs/compute/common/utils.cpp b/ydb/core/fq/libs/compute/common/utils.cpp index ac14d4a078..2376d19b22 100644 --- a/ydb/core/fq/libs/compute/common/utils.cpp +++ b/ydb/core/fq/libs/compute/common/utils.cpp @@ -643,4 +643,84 @@ TString GetV1StatFromV2PlanV2(const TString& plan) { return NJson2Yson::ConvertYson2Json(out.Str()); } +std::optional<int> GetValue(const NJson::TJsonValue& node, const TString& name) { + if (auto* keyNode = node.GetValueByPath(name)) { + auto result = keyNode->GetInteger(); + if (result) { + return result; + } + } + return {}; +} + +void AggregateNode(const NJson::TJsonValue& node, const TString& name, ui64& sum) { + if (node.GetType() == NJson::JSON_MAP) { + if (auto* subNode = node.GetValueByPath(name)) { + if (auto* keyNode = subNode->GetValueByPath("count")) { + auto nodeCount = keyNode->GetInteger(); + if (nodeCount) { + if (auto* keyNode = subNode->GetValueByPath("sum")) { + sum += keyNode->GetInteger(); + } + } + } + } + } +} + +std::optional<int> GetNodeValue(const NJson::TJsonValue& node, const TString& name, bool aggregate = false) { + if (aggregate) { + ui64 sum = 0; + if (node.GetType() == NJson::JSON_MAP) { + for (const auto& p : node.GetMap()) { + AggregateNode(p.second, name, sum); + } + } + if (sum) { + return sum; + } + return {}; + } + if (auto* subNode = node.GetValueByPath(name)) { + return GetValue(*subNode, "sum"); + } + return {}; +} + +std::optional<int> Sum(const std::optional<int>& a, const std::optional<int>& b) { + if (!a) { + return b; + } + + if (!b) { + return a; + } + + return *a + *b; +} + +TPublicStat GetPublicStat(const TString& statistics) { + TPublicStat counters; + NJson::TJsonReaderConfig jsonConfig; + NJson::TJsonValue stat; + if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) { + + // EXP + if (stat.GetValueByPath("Columns")) { + return counters; + } + + for (const auto& p : stat.GetMap()) { + counters.MemoryUsageBytes = Sum(counters.MemoryUsageBytes, GetNodeValue(p.second, "MaxMemoryUsage")); + counters.CpuUsageUs = Sum(counters.CpuUsageUs, GetNodeValue(p.second, "CpuTimeUs")); + counters.InputBytes = Sum(counters.InputBytes, GetNodeValue(p.second, "InputBytes")); + counters.OutputBytes = Sum(counters.OutputBytes, GetNodeValue(p.second, "OutputBytes")); + counters.SourceInputRecords = Sum(counters.SourceInputRecords, GetNodeValue(p.second, "InputRows")); + counters.SinkOutputRecords = Sum(counters.SinkOutputRecords, GetNodeValue(p.second, "OutputRows")); + counters.RunningTasks = Sum(counters.RunningTasks, GetNodeValue(p.second, "TotalTasks", true)); + } + } + return counters; +} + } // namespace NFq diff --git a/ydb/core/fq/libs/compute/common/utils.h b/ydb/core/fq/libs/compute/common/utils.h index dae8624eaf..d2fad6d01f 100644 --- a/ydb/core/fq/libs/compute/common/utils.h +++ b/ydb/core/fq/libs/compute/common/utils.h @@ -31,4 +31,16 @@ TString FormatDurationMs(ui64 durationMs); TString FormatDurationUs(ui64 durationUs); TString FormatInstant(TInstant instant); +struct TPublicStat { + std::optional<int> MemoryUsageBytes = 0; + std::optional<int> CpuUsageUs = 0; + std::optional<int> InputBytes = 0; + std::optional<int> OutputBytes = 0; + std::optional<int> SourceInputRecords = 0; + std::optional<int> SinkOutputRecords = 0; + std::optional<int> RunningTasks = 0; +}; + +TPublicStat GetPublicStat(const TString& statistics); + } // namespace NFq diff --git a/ydb/core/fq/libs/compute/ydb/base_compute_actor.h b/ydb/core/fq/libs/compute/ydb/base_compute_actor.h index 6bbebcaef0..420eedbeb3 100644 --- a/ydb/core/fq/libs/compute/ydb/base_compute_actor.h +++ b/ydb/core/fq/libs/compute/ydb/base_compute_actor.h @@ -20,13 +20,15 @@ public: using TBase::PassAway; TBaseComputeActor(const ::NYql::NCommon::TServiceCounters& queryCounters, const TString& stepName) - : BaseCounters(queryCounters.Counters) + : PublicCounters(queryCounters.PublicCounters) + , BaseCounters(queryCounters.Counters) , Counters(MakeIntrusive<TComputeRequestCounters>("Total", queryCounters.Counters->GetSubgroup("step", stepName))) , TotalStartTime(TInstant::Now()) {} TBaseComputeActor(const ::NMonitoring::TDynamicCounterPtr& baseCounters, const TString& stepName) - : BaseCounters(baseCounters) + : PublicCounters(MakeIntrusive<::NMonitoring::TDynamicCounters>()) + , BaseCounters(baseCounters) , Counters(MakeIntrusive<TComputeRequestCounters>("Total", baseCounters->GetSubgroup("step", stepName))) , TotalStartTime(TInstant::Now()) {} @@ -64,7 +66,12 @@ public: return BaseCounters; } + ::NMonitoring::TDynamicCounterPtr GetPublicCounters() const { + return PublicCounters; + } + private: + ::NMonitoring::TDynamicCounterPtr PublicCounters; ::NMonitoring::TDynamicCounterPtr BaseCounters; TComputeRequestCountersPtr Counters; TInstant TotalStartTime; diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp index ef628906ce..f21c7e8982 100644 --- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp @@ -124,6 +124,7 @@ public: return; } + ReportPublicCounters(response.QueryStats); StartTime = TInstant::Now(); LOG_D("Execution status: " << static_cast<int>(response.ExecStatus)); switch (response.ExecStatus) { @@ -153,6 +154,46 @@ public: } } + void ReportPublicCounters(const Ydb::TableStats::QueryStats& stats) { + auto stat = GetPublicStat(GetV1StatFromV2Plan(stats.query_plan())); + auto publicCounters = GetPublicCounters(); + + if (stat.MemoryUsageBytes) { + auto& counter = *publicCounters->GetNamedCounter("name", "query.memory_usage_bytes"); + counter = *stat.MemoryUsageBytes; + } + + if (stat.CpuUsageUs) { + auto& counter = *publicCounters->GetNamedCounter("name", "query.cpu_usage_us", true); + counter = *stat.CpuUsageUs; + } + + if (stat.InputBytes) { + auto& counter = *publicCounters->GetNamedCounter("name", "query.input_bytes", true); + counter = *stat.InputBytes; + } + + if (stat.OutputBytes) { + auto& counter = *publicCounters->GetNamedCounter("name", "query.output_bytes", true); + counter = *stat.OutputBytes; + } + + if (stat.SourceInputRecords) { + auto& counter = *publicCounters->GetNamedCounter("name", "query.source_input_records", true); + counter = *stat.SourceInputRecords; + } + + if (stat.SinkOutputRecords) { + auto& counter = *publicCounters->GetNamedCounter("name", "query.sink_output_records", true); + counter = *stat.SinkOutputRecords; + } + + if (stat.RunningTasks) { + auto& counter = *publicCounters->GetNamedCounter("name", "query.running_tasks"); + counter = *stat.RunningTasks; + } + } + void SendGetOperation(const TDuration& delay = TDuration::Zero()) { Register(new TRetryActor<TEvYdbCompute::TEvGetOperationRequest, TEvYdbCompute::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), delay, SelfId(), Connector, OperationId)); } diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp index 6c7d1a6cb6..2988fbc942 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp @@ -197,6 +197,7 @@ public: } void ResignAndPassAway(const NYql::TIssues& issues) { + Send(FetcherId, new NActors::TEvents::TEvPoisonTaken()); Fq::Private::PingTaskRequest pingTaskRequest; NYql::IssuesToMessage(issues, pingTaskRequest.mutable_transient_issues()); pingTaskRequest.set_resign_query(true); @@ -208,6 +209,7 @@ public: } void FinishAndPassAway() { + Send(FetcherId, new NActors::TEvents::TEvPoisonTaken()); Send(Connector, new NActors::TEvents::TEvPoisonPill()); PassAway(); } diff --git a/ydb/tests/fq/s3/test_public_metrics.py b/ydb/tests/fq/s3/test_public_metrics.py new file mode 100644 index 0000000000..cefe06147e --- /dev/null +++ b/ydb/tests/fq/s3/test_public_metrics.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import boto3 +import pytest + +from ydb.tests.tools.fq_runner.kikimr_utils import yq_all + +import ydb.public.api.protos.draft.fq_pb2 as fq + + +class TestPublicMetrics: + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_public_metrics(self, kikimr, s3, client, yq_version): + resource = boto3.resource( + "s3", + endpoint_url=s3.s3_url, + aws_access_key_id="key", + aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("fbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + s3_client = boto3.client( + "s3", + endpoint_url=s3.s3_url, + aws_access_key_id="key", + aws_secret_access_key="secret_key" + ) + + fruits = R'''Fruit,Price,Weight +Banana,3,100 +Apple,2,22 +Pear,15,33''' + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.csv', ContentType='text/plain') + kikimr.control_plane.wait_bootstrap(1) + client.create_storage_connection("fruitbucket", "fbucket") + + sql = R''' + SELECT * + FROM fruitbucket.`fruits.csv` + WITH (format=csv_with_names, SCHEMA ( + Fruit String NOT NULL, + Price Int NOT NULL, + Weight Int NOT NULL + )); + ''' + + cloud_id = "mock_cloud" + folder_id = "my_folder" + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + metrics = kikimr.compute_plane.get_sensors(1, "yq_public") if yq_version == "v1" else kikimr.control_plane.get_sensors(1, "yq_public") + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.running_tasks"}) >= 0 + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.cpu_usage_us"}) >= 0 + assert metrics.find_sensor({"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, + "name": "query.memory_usage_bytes"}) > 0 + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.input_bytes"}) > 0 + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.uptime_seconds"}) >= 0 + if yq_version == "v1": + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.output_bytes"}) is None + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.source_input_records"}) is None + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.sink_output_records"}) is None + else: + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.output_bytes"}) > 0 + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.source_input_records"}) > 0 + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.sink_output_records"}) > 0 diff --git a/ydb/tests/fq/s3/ya.make b/ydb/tests/fq/s3/ya.make index 4b1eca093e..54bd7ac3dc 100644 --- a/ydb/tests/fq/s3/ya.make +++ b/ydb/tests/fq/s3/ya.make @@ -28,6 +28,7 @@ TEST_SRCS( test_formats.py test_inflight.py test_insert.py + test_public_metrics.py test_push_down.py test_s3.py test_size_limit.py |