diff options
author | hor911 <hor911@ydb.tech> | 2023-09-30 17:40:51 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-09-30 17:55:03 +0300 |
commit | 7e1c6cfccee26bdf938a0083896f78d92aa0db70 (patch) | |
tree | 0d6aeb2086d50ca6b181f8f47f78d85e27839ddb | |
parent | d0df3655f8a3f6ffb9a6658f3918c40c5d28c6b1 (diff) | |
download | ydb-7e1c6cfccee26bdf938a0083896f78d92aa0db70.tar.gz |
Metering for YQv2
-rw-r--r-- | ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp | 7 | ||||
-rw-r--r-- | ydb/core/fq/libs/control_plane_storage/internal/utils.cpp | 35 |
2 files changed, 36 insertions, 6 deletions
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp index fea4de73a40..dce989fb15b 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp @@ -446,7 +446,12 @@ TPingTaskParams ConstructHardPingTask( << FederatedQuery::QueryMeta::ComputeStatus_Name(request.status()) << ", statusCode: " << NYql::NDqProto::StatusIds_StatusCode_Name(internal.status_code())); } - auto records = GetMeteringRecords(request.statistics(), isBillable, jobId, request.scope(), HostName()); + auto statistics = request.statistics(); + if (!statistics) { + // YQv2 may not provide statistics with terminal status, use saved one + statistics = query.statistics().json(); + } + auto records = GetMeteringRecords(statistics, isBillable, jobId, request.scope(), HostName()); meteringRecords->swap(records); } catch (const std::exception&) { CPS_LOG_AS_E(*actorSystem, "Error on statistics meterification: " << CurrentExceptionMessage()); diff --git a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp index dcde6b65a70..f3c43159024 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp @@ -117,13 +117,38 @@ std::vector<TString> GetMeteringRecords(const TString& statistics, bool billable NJson::TJsonValue stat; ui64 ingress = 0; - if (billable) { if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) { - for (const auto& p : stat.GetMap()) { - if (p.first.StartsWith("Graph=") || p.first.StartsWith("Precompute=")) { - if (auto* ingressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.sum")) { - ingress += ingressNode->GetInteger(); + for (const auto& graph : stat.GetMapSafe()) { + // + // in v1 graphs are always named as name=index + // + // in v2 two cases are possible + // - ResultSet or Sink (i.e. name) when only one subgraph exists + // - name_index when there are 2+ graphs + // + // Precompute always implies other graph, so we can distinguish v1 vs v2 by '=' or '_' symbol + // + if (graph.first.StartsWith("Graph=") || graph.first.StartsWith("Precompute=")) { + // YQv1 raw + if (auto* ingressNode = graph.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.sum")) { + ingress += ingressNode->GetIntegerSafe(); + } + } else if (graph.first.StartsWith("ResultSet") || graph.first.StartsWith("Sink") || graph.first.StartsWith("Precompute_")) { + // YQv2 + if (auto* ingressNode = graph.second.GetValueByPath("IngressObjectStorageBytes.sum")) { + // prettyfied + ingress += ingressNode->GetIntegerSafe(); + } else if (graph.second.GetType() == NJson::JSON_MAP) { + for (const auto& stage : graph.second.GetMapSafe()) { + if (auto* ingressNode = stage.second.GetValueByPath("IngressBytes=S3Source.sum")) { + // raw old + ingress += ingressNode->GetIntegerSafe(); + } else if (auto* ingressNode = stage.second.GetValueByPath("IngressBytes=S3Source.Input.Bytes.sum")) { + // raw new + ingress += ingressNode->GetIntegerSafe(); + } + } } } } |