diff options
author | hor911 <hor911@ydb.tech> | 2022-12-07 01:35:49 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-12-07 01:35:49 +0300 |
commit | f00f6934361afa3ec43a0a0355082063a772367f (patch) | |
tree | 4400118076e359ee76da48d64f7206b175c6c1cf | |
parent | 5b2521a5d471e8eca93b51fa3c56b52caa7ce540 (diff) | |
download | ydb-f00f6934361afa3ec43a0a0355082063a772367f.tar.gz |
Metering records on query finish
6 files changed, 99 insertions, 21 deletions
diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h index 9863b0aefbf..a2fc8bc2f3c 100644 --- a/ydb/core/driver_lib/run/config.h +++ b/ydb/core/driver_lib/run/config.h @@ -90,6 +90,7 @@ union TBasicKikimrServicesMask { EnableSecurityServices = true; EnableYandexQuery = true; EnableViewerService = true; + EnableMeteringWriter = true; } TBasicKikimrServicesMask() { diff --git a/ydb/core/metering/bill_record.h b/ydb/core/metering/bill_record.h index 2959f7f29bd..8edd622f51f 100644 --- a/ydb/core/metering/bill_record.h +++ b/ydb/core/metering/bill_record.h @@ -32,6 +32,7 @@ struct TBillRecord { enum class EUnit { RequestUnit /* "request_unit" */, + Byte /* "byte" */, }; BILL_RECORD_FIELD(EType, Type); diff --git a/ydb/core/yq/libs/control_plane_storage/internal/CMakeLists.txt b/ydb/core/yq/libs/control_plane_storage/internal/CMakeLists.txt index 0302cfa6aa7..ee878055f6f 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/CMakeLists.txt +++ b/ydb/core/yq/libs/control_plane_storage/internal/CMakeLists.txt @@ -18,6 +18,7 @@ target_link_libraries(libs-control_plane_storage-internal PUBLIC cpp-lwtrace-mon monlib-service-pages ydb-core-base + ydb-core-metering ydb-core-mon yq-libs-common yq-libs-config diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp index eb6056b4e8b..2fd62cd16ba 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp @@ -2,25 +2,25 @@ #include <util/datetime/base.h> +#include <ydb/core/metering/metering.h> +#include <ydb/core/yq/libs/control_plane_storage/util.h> #include <ydb/core/yq/libs/db_schema/db_schema.h> #include <ydb/library/protobuf_printer/size_printer.h> #include <google/protobuf/util/time_util.h> -namespace NYq { - -namespace { +#include <util/system/hostname.h> -bool IsFinishedStatus(YandexQuery::QueryMeta::ComputeStatus status) { - return status == YandexQuery::QueryMeta::ABORTED_BY_SYSTEM - || status == YandexQuery::QueryMeta::ABORTED_BY_USER - || status == YandexQuery::QueryMeta::COMPLETED - || status == YandexQuery::QueryMeta::FAILED; -} +namespace NYq { -} // namespace +struct TPingTaskParams { + TString Query; + TParams Params; + const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)> Prepare; + std::vector<TString> Metrics; +}; -std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>> ConstructHardPingTask( +TPingTaskParams ConstructHardPingTask( const Fq::Private::PingTaskRequest& request, std::shared_ptr<Fq::Private::PingTaskResult> response, const TString& tablePathPrefix, const TDuration& automaticQueriesTtl, const TDuration& taskLeaseTtl, const THashMap<ui64, TRetryPolicyItem>& retryPolicies, ::NMonitoring::TDynamicCounterPtr rootCounters, uint64_t maxRequestSize) { @@ -247,7 +247,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam *internal.mutable_disposition() = request.disposition(); } - if (request.status() && IsFinishedStatus(request.status())) { + if (request.status() && IsTerminalStatus(request.status())) { internal.clear_created_topic_consumers(); // internal.clear_dq_graph(); keep for debug internal.clear_dq_graph_index(); @@ -374,10 +374,21 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam return std::make_pair(writeQuery.Sql, writeQuery.Params); }; const auto readQuery = readQueryBuilder.Build(); - return std::make_tuple(readQuery.Sql, readQuery.Params, prepareParams); + + std::vector<TString> meteringRecords; + + if (IsTerminalStatus(request.status()) && request.statistics()) { + try { + meteringRecords = GetMeteringRecords(request.statistics(), request.query_id().value(), request.scope(), HostName()); + } catch (yexception &e) { + CPS_LOG_E(e.what()); + } + } + + return {readQuery.Sql, readQuery.Params, prepareParams, meteringRecords}; } -std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>> ConstructSoftPingTask( +TPingTaskParams ConstructSoftPingTask( const Fq::Private::PingTaskRequest& request, std::shared_ptr<Fq::Private::PingTaskResult> response, const TString& tablePathPrefix, const TDuration& taskLeaseTtl) { TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "SoftPingTask(read)"); @@ -440,7 +451,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam return std::make_pair(writeQuery.Sql, writeQuery.Params); }; const auto readQuery = readQueryBuilder.Build(); - return std::make_tuple(readQuery.Sql, readQuery.Params, prepareParams); + return {readQuery.Sql, readQuery.Params, prepareParams, std::vector<TString>{}}; } void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskRequest::TPtr& ev) @@ -487,12 +498,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq auto pingTaskParams = DoesPingTaskUpdateQueriesTable(request) ? ConstructHardPingTask(request, response, YdbConnection->TablePathPrefix, Config->AutomaticQueriesTtl, Config->TaskLeaseTtl, Config->RetryPolicies, Counters.Counters, Config->Proto.GetMaxRequestSize()) : ConstructSoftPingTask(request, response, YdbConnection->TablePathPrefix, Config->TaskLeaseTtl); - auto readQuery = std::get<0>(pingTaskParams); // Use std::get for win compiler - auto readParams = std::get<1>(pingTaskParams); - auto prepareParams = std::get<2>(pingTaskParams); - auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto result = ReadModifyWrite(readQuery, readParams, prepareParams, requestCounters, debugInfo); + auto result = ReadModifyWrite(pingTaskParams.Query, pingTaskParams.Params, pingTaskParams.Prepare, requestCounters, debugInfo); auto prepare = [response] { return *response; }; auto success = SendResponse<TEvControlPlaneStorage::TEvPingTaskResponse, Fq::Private::PingTaskResult>( "PingTaskRequest - PingTaskResult", @@ -505,9 +512,12 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq prepare, debugInfo); - success.Apply([=](const auto& future) { + success.Apply([=, actorSystem=NActors::TActivationContext::ActorSystem(), metrics=pingTaskParams.Metrics](const auto& future) { TDuration delta = TInstant::Now() - startTime; LWPROBE(PingTaskRequest, queryId, delta, future.GetValue()); + for (const auto& metric : metrics) { + actorSystem->Send(NKikimr::NMetering::MakeMeteringServiceID(), new NKikimr::NMetering::TEvMetering::TEvWriteMeteringJson(metric)); + } }); } diff --git a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp index f88fcd3268d..ab11f98f9ee 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp @@ -1,5 +1,8 @@ #include "utils.h" +#include <ydb/core/metering/bill_record.h> +#include <ydb/core/metering/metering.h> + namespace NYq { NYql::TIssues ValidateWriteResultData(const TString& resultId, const Ydb::ResultSet& resultSet, const TInstant& deadline, const TDuration& ttl) @@ -102,4 +105,64 @@ NYql::TIssues ValidateCreateOrDeleteRateLimiterResource(const TString& queryId, return issues; } +std::vector<TString> GetMeteringRecords(const TString& statistics, const TString& queryId, const TString& scope, const TString& sourceId) { + + std::vector<TString> result; + NJson::TJsonReaderConfig jsonConfig; + NJson::TJsonValue stat; + + if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) { + ui64 ingress = 0; + ui64 egress = 0; + for (const auto& p : stat.GetMap()) { + if (p.first.StartsWith("Graph=") || p.first.StartsWith("Precompute=")) { + if (auto* ingressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.count")) { + ingress += ingressNode->GetInteger(); + } + if (auto* egressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.EgressS3SinkBytes.count")) { + egress += egressNode->GetInteger(); + } + } + } + if (ingress) { + auto now = Now(); + result.emplace_back(TBillRecord() + .Id(queryId + "_osi") + .Schema("yq.object_storage.ingress") + .FolderId(TScope(scope).ParseFolder()) + .SourceWt(now) + .SourceId(sourceId) + .Usage(TBillRecord::TUsage() + .Type(TBillRecord::TUsage::EType::Delta) + .Unit(TBillRecord::TUsage::EUnit::Byte) + .Quantity(ingress) + .Start(now) + .Finish(now) + ) + .ToString() + ); + } + if (egress) { + auto now = Now(); + result.emplace_back(TBillRecord() + .Id(queryId + "_ose") + .Schema("yq.object_storage.egress") + .FolderId(TScope(scope).ParseFolder()) + .SourceWt(now) + .SourceId(sourceId) + .Usage(TBillRecord::TUsage() + .Type(TBillRecord::TUsage::EType::Delta) + .Unit(TBillRecord::TUsage::EUnit::Byte) + .Quantity(egress) + .Start(now) + .Finish(now) + ) + .ToString() + ); + } + } + + return result; +} + }; diff --git a/ydb/core/yq/libs/control_plane_storage/internal/utils.h b/ydb/core/yq/libs/control_plane_storage/internal/utils.h index a82a6bbf734..98c645b5bb4 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/utils.h +++ b/ydb/core/yq/libs/control_plane_storage/internal/utils.h @@ -31,4 +31,6 @@ NYql::TIssues ValidateNodesHealthCheck( NYql::TIssues ValidateCreateOrDeleteRateLimiterResource(const TString& queryId, const TString& scope, const TString& tenant, const TString& owner); +std::vector<TString> GetMeteringRecords(const TString& statistics, const TString& queryId, const TString& scope, const TString& sourceId); + }; |