aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-09-30 17:40:51 +0300
committerhor911 <hor911@ydb.tech>2023-09-30 17:55:03 +0300
commit7e1c6cfccee26bdf938a0083896f78d92aa0db70 (patch)
tree0d6aeb2086d50ca6b181f8f47f78d85e27839ddb
parentd0df3655f8a3f6ffb9a6658f3918c40c5d28c6b1 (diff)
downloadydb-7e1c6cfccee26bdf938a0083896f78d92aa0db70.tar.gz
Metering for YQv2
-rw-r--r--ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp7
-rw-r--r--ydb/core/fq/libs/control_plane_storage/internal/utils.cpp35
2 files changed, 36 insertions, 6 deletions
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
index fea4de73a40..dce989fb15b 100644
--- a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
@@ -446,7 +446,12 @@ TPingTaskParams ConstructHardPingTask(
<< FederatedQuery::QueryMeta::ComputeStatus_Name(request.status())
<< ", statusCode: " << NYql::NDqProto::StatusIds_StatusCode_Name(internal.status_code()));
}
- auto records = GetMeteringRecords(request.statistics(), isBillable, jobId, request.scope(), HostName());
+ auto statistics = request.statistics();
+ if (!statistics) {
+ // YQv2 may not provide statistics with terminal status, use saved one
+ statistics = query.statistics().json();
+ }
+ auto records = GetMeteringRecords(statistics, isBillable, jobId, request.scope(), HostName());
meteringRecords->swap(records);
} catch (const std::exception&) {
CPS_LOG_AS_E(*actorSystem, "Error on statistics meterification: " << CurrentExceptionMessage());
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
index dcde6b65a70..f3c43159024 100644
--- a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
@@ -117,13 +117,38 @@ std::vector<TString> GetMeteringRecords(const TString& statistics, bool billable
NJson::TJsonValue stat;
ui64 ingress = 0;
-
if (billable) {
if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) {
- for (const auto& p : stat.GetMap()) {
- if (p.first.StartsWith("Graph=") || p.first.StartsWith("Precompute=")) {
- if (auto* ingressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.sum")) {
- ingress += ingressNode->GetInteger();
+ for (const auto& graph : stat.GetMapSafe()) {
+ //
+ // in v1 graphs are always named as name=index
+ //
+ // in v2 two cases are possible
+ // - ResultSet or Sink (i.e. name) when only one subgraph exists
+ // - name_index when there are 2+ graphs
+ //
+ // Precompute always implies other graph, so we can distinguish v1 vs v2 by '=' or '_' symbol
+ //
+ if (graph.first.StartsWith("Graph=") || graph.first.StartsWith("Precompute=")) {
+ // YQv1 raw
+ if (auto* ingressNode = graph.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.sum")) {
+ ingress += ingressNode->GetIntegerSafe();
+ }
+ } else if (graph.first.StartsWith("ResultSet") || graph.first.StartsWith("Sink") || graph.first.StartsWith("Precompute_")) {
+ // YQv2
+ if (auto* ingressNode = graph.second.GetValueByPath("IngressObjectStorageBytes.sum")) {
+ // prettyfied
+ ingress += ingressNode->GetIntegerSafe();
+ } else if (graph.second.GetType() == NJson::JSON_MAP) {
+ for (const auto& stage : graph.second.GetMapSafe()) {
+ if (auto* ingressNode = stage.second.GetValueByPath("IngressBytes=S3Source.sum")) {
+ // raw old
+ ingress += ingressNode->GetIntegerSafe();
+ } else if (auto* ingressNode = stage.second.GetValueByPath("IngressBytes=S3Source.Input.Bytes.sum")) {
+ // raw new
+ ingress += ingressNode->GetIntegerSafe();
+ }
+ }
}
}
}