aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2022-12-07 01:35:49 +0300
committerhor911 <hor911@ydb.tech>2022-12-07 01:35:49 +0300
commitf00f6934361afa3ec43a0a0355082063a772367f (patch)
tree4400118076e359ee76da48d64f7206b175c6c1cf
parent5b2521a5d471e8eca93b51fa3c56b52caa7ce540 (diff)
downloadydb-f00f6934361afa3ec43a0a0355082063a772367f.tar.gz
Metering records on query finish
-rw-r--r--ydb/core/driver_lib/run/config.h1
-rw-r--r--ydb/core/metering/bill_record.h1
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp52
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/utils.cpp63
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/utils.h2
6 files changed, 99 insertions, 21 deletions
diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h
index 9863b0aefbf..a2fc8bc2f3c 100644
--- a/ydb/core/driver_lib/run/config.h
+++ b/ydb/core/driver_lib/run/config.h
@@ -90,6 +90,7 @@ union TBasicKikimrServicesMask {
EnableSecurityServices = true;
EnableYandexQuery = true;
EnableViewerService = true;
+ EnableMeteringWriter = true;
}
TBasicKikimrServicesMask() {
diff --git a/ydb/core/metering/bill_record.h b/ydb/core/metering/bill_record.h
index 2959f7f29bd..8edd622f51f 100644
--- a/ydb/core/metering/bill_record.h
+++ b/ydb/core/metering/bill_record.h
@@ -32,6 +32,7 @@ struct TBillRecord {
enum class EUnit {
RequestUnit /* "request_unit" */,
+ Byte /* "byte" */,
};
BILL_RECORD_FIELD(EType, Type);
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/CMakeLists.txt b/ydb/core/yq/libs/control_plane_storage/internal/CMakeLists.txt
index 0302cfa6aa7..ee878055f6f 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/CMakeLists.txt
+++ b/ydb/core/yq/libs/control_plane_storage/internal/CMakeLists.txt
@@ -18,6 +18,7 @@ target_link_libraries(libs-control_plane_storage-internal PUBLIC
cpp-lwtrace-mon
monlib-service-pages
ydb-core-base
+ ydb-core-metering
ydb-core-mon
yq-libs-common
yq-libs-config
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
index eb6056b4e8b..2fd62cd16ba 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
@@ -2,25 +2,25 @@
#include <util/datetime/base.h>
+#include <ydb/core/metering/metering.h>
+#include <ydb/core/yq/libs/control_plane_storage/util.h>
#include <ydb/core/yq/libs/db_schema/db_schema.h>
#include <ydb/library/protobuf_printer/size_printer.h>
#include <google/protobuf/util/time_util.h>
-namespace NYq {
-
-namespace {
+#include <util/system/hostname.h>
-bool IsFinishedStatus(YandexQuery::QueryMeta::ComputeStatus status) {
- return status == YandexQuery::QueryMeta::ABORTED_BY_SYSTEM
- || status == YandexQuery::QueryMeta::ABORTED_BY_USER
- || status == YandexQuery::QueryMeta::COMPLETED
- || status == YandexQuery::QueryMeta::FAILED;
-}
+namespace NYq {
-} // namespace
+struct TPingTaskParams {
+ TString Query;
+ TParams Params;
+ const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)> Prepare;
+ std::vector<TString> Metrics;
+};
-std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>> ConstructHardPingTask(
+TPingTaskParams ConstructHardPingTask(
const Fq::Private::PingTaskRequest& request, std::shared_ptr<Fq::Private::PingTaskResult> response,
const TString& tablePathPrefix, const TDuration& automaticQueriesTtl, const TDuration& taskLeaseTtl,
const THashMap<ui64, TRetryPolicyItem>& retryPolicies, ::NMonitoring::TDynamicCounterPtr rootCounters, uint64_t maxRequestSize) {
@@ -247,7 +247,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
*internal.mutable_disposition() = request.disposition();
}
- if (request.status() && IsFinishedStatus(request.status())) {
+ if (request.status() && IsTerminalStatus(request.status())) {
internal.clear_created_topic_consumers();
// internal.clear_dq_graph(); keep for debug
internal.clear_dq_graph_index();
@@ -374,10 +374,21 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
return std::make_pair(writeQuery.Sql, writeQuery.Params);
};
const auto readQuery = readQueryBuilder.Build();
- return std::make_tuple(readQuery.Sql, readQuery.Params, prepareParams);
+
+ std::vector<TString> meteringRecords;
+
+ if (IsTerminalStatus(request.status()) && request.statistics()) {
+ try {
+ meteringRecords = GetMeteringRecords(request.statistics(), request.query_id().value(), request.scope(), HostName());
+ } catch (yexception &e) {
+ CPS_LOG_E(e.what());
+ }
+ }
+
+ return {readQuery.Sql, readQuery.Params, prepareParams, meteringRecords};
}
-std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>> ConstructSoftPingTask(
+TPingTaskParams ConstructSoftPingTask(
const Fq::Private::PingTaskRequest& request, std::shared_ptr<Fq::Private::PingTaskResult> response,
const TString& tablePathPrefix, const TDuration& taskLeaseTtl) {
TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "SoftPingTask(read)");
@@ -440,7 +451,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
return std::make_pair(writeQuery.Sql, writeQuery.Params);
};
const auto readQuery = readQueryBuilder.Build();
- return std::make_tuple(readQuery.Sql, readQuery.Params, prepareParams);
+ return {readQuery.Sql, readQuery.Params, prepareParams, std::vector<TString>{}};
}
void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskRequest::TPtr& ev)
@@ -487,12 +498,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq
auto pingTaskParams = DoesPingTaskUpdateQueriesTable(request) ?
ConstructHardPingTask(request, response, YdbConnection->TablePathPrefix, Config->AutomaticQueriesTtl, Config->TaskLeaseTtl, Config->RetryPolicies, Counters.Counters, Config->Proto.GetMaxRequestSize()) :
ConstructSoftPingTask(request, response, YdbConnection->TablePathPrefix, Config->TaskLeaseTtl);
- auto readQuery = std::get<0>(pingTaskParams); // Use std::get for win compiler
- auto readParams = std::get<1>(pingTaskParams);
- auto prepareParams = std::get<2>(pingTaskParams);
-
auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto result = ReadModifyWrite(readQuery, readParams, prepareParams, requestCounters, debugInfo);
+ auto result = ReadModifyWrite(pingTaskParams.Query, pingTaskParams.Params, pingTaskParams.Prepare, requestCounters, debugInfo);
auto prepare = [response] { return *response; };
auto success = SendResponse<TEvControlPlaneStorage::TEvPingTaskResponse, Fq::Private::PingTaskResult>(
"PingTaskRequest - PingTaskResult",
@@ -505,9 +512,12 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq
prepare,
debugInfo);
- success.Apply([=](const auto& future) {
+ success.Apply([=, actorSystem=NActors::TActivationContext::ActorSystem(), metrics=pingTaskParams.Metrics](const auto& future) {
TDuration delta = TInstant::Now() - startTime;
LWPROBE(PingTaskRequest, queryId, delta, future.GetValue());
+ for (const auto& metric : metrics) {
+ actorSystem->Send(NKikimr::NMetering::MakeMeteringServiceID(), new NKikimr::NMetering::TEvMetering::TEvWriteMeteringJson(metric));
+ }
});
}
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp
index f88fcd3268d..ab11f98f9ee 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp
@@ -1,5 +1,8 @@
#include "utils.h"
+#include <ydb/core/metering/bill_record.h>
+#include <ydb/core/metering/metering.h>
+
namespace NYq {
NYql::TIssues ValidateWriteResultData(const TString& resultId, const Ydb::ResultSet& resultSet, const TInstant& deadline, const TDuration& ttl)
@@ -102,4 +105,64 @@ NYql::TIssues ValidateCreateOrDeleteRateLimiterResource(const TString& queryId,
return issues;
}
+std::vector<TString> GetMeteringRecords(const TString& statistics, const TString& queryId, const TString& scope, const TString& sourceId) {
+
+ std::vector<TString> result;
+ NJson::TJsonReaderConfig jsonConfig;
+ NJson::TJsonValue stat;
+
+ if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) {
+ ui64 ingress = 0;
+ ui64 egress = 0;
+ for (const auto& p : stat.GetMap()) {
+ if (p.first.StartsWith("Graph=") || p.first.StartsWith("Precompute=")) {
+ if (auto* ingressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.count")) {
+ ingress += ingressNode->GetInteger();
+ }
+ if (auto* egressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.EgressS3SinkBytes.count")) {
+ egress += egressNode->GetInteger();
+ }
+ }
+ }
+ if (ingress) {
+ auto now = Now();
+ result.emplace_back(TBillRecord()
+ .Id(queryId + "_osi")
+ .Schema("yq.object_storage.ingress")
+ .FolderId(TScope(scope).ParseFolder())
+ .SourceWt(now)
+ .SourceId(sourceId)
+ .Usage(TBillRecord::TUsage()
+ .Type(TBillRecord::TUsage::EType::Delta)
+ .Unit(TBillRecord::TUsage::EUnit::Byte)
+ .Quantity(ingress)
+ .Start(now)
+ .Finish(now)
+ )
+ .ToString()
+ );
+ }
+ if (egress) {
+ auto now = Now();
+ result.emplace_back(TBillRecord()
+ .Id(queryId + "_ose")
+ .Schema("yq.object_storage.egress")
+ .FolderId(TScope(scope).ParseFolder())
+ .SourceWt(now)
+ .SourceId(sourceId)
+ .Usage(TBillRecord::TUsage()
+ .Type(TBillRecord::TUsage::EType::Delta)
+ .Unit(TBillRecord::TUsage::EUnit::Byte)
+ .Quantity(egress)
+ .Start(now)
+ .Finish(now)
+ )
+ .ToString()
+ );
+ }
+ }
+
+ return result;
+}
+
};
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/utils.h b/ydb/core/yq/libs/control_plane_storage/internal/utils.h
index a82a6bbf734..98c645b5bb4 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/utils.h
+++ b/ydb/core/yq/libs/control_plane_storage/internal/utils.h
@@ -31,4 +31,6 @@ NYql::TIssues ValidateNodesHealthCheck(
NYql::TIssues ValidateCreateOrDeleteRateLimiterResource(const TString& queryId, const TString& scope, const TString& tenant, const TString& owner);
+std::vector<TString> GetMeteringRecords(const TString& statistics, const TString& queryId, const TString& scope, const TString& sourceId);
+
};