diff options
author | hor911 <[email protected]> | 2023-05-11 17:43:27 +0300 |
---|---|---|
committer | hor911 <[email protected]> | 2023-05-11 17:43:27 +0300 |
commit | f92d86b7c6a95421b7ba1796565b3e4c2780cf34 (patch) | |
tree | 339f5a52bab5eced447dbec2593622ad10842174 | |
parent | 35bb1cc919fd91eb1bd4981766811e7b09ecac47 (diff) |
Tune Billing for Ingress
9 files changed, 92 insertions, 64 deletions
diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index 6b9ebafff7b..669e09ec837 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -1649,7 +1649,9 @@ private: FinalQueryStatus = status; QueryStateUpdateRequest.set_status(FinalQueryStatus); // Can be changed later. - QueryStateUpdateRequest.set_status_code(NYql::NDqProto::StatusIds::SUCCESS); + if (FinalQueryStatus == FederatedQuery::QueryMeta::COMPLETED && QueryStateUpdateRequest.status_code() == NYql::NDqProto::StatusIds::UNSPECIFIED) { + QueryStateUpdateRequest.set_status_code(NYql::NDqProto::StatusIds::SUCCESS); + } *QueryStateUpdateRequest.mutable_finished_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds()); Become(&TRunActor::StateFuncWrapper<&TRunActor::FinishStateFunc>); diff --git a/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h b/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h index 790886c1aed..ef01e0deae0 100644 --- a/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h +++ b/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h @@ -8,6 +8,8 @@ #include <ydb/core/fq/libs/shared_resources/shared_resources.h> #include <ydb/core/fq/libs/actors/logging/log.h> +#define CPS_LOG_N(s) \ + LOG_YQ_CONTROL_PLANE_STORAGE_NOTICE(s) #define CPS_LOG_D(s) \ LOG_YQ_CONTROL_PLANE_STORAGE_DEBUG(s) #define CPS_LOG_I(s) \ @@ -20,6 +22,8 @@ LOG_YQ_CONTROL_PLANE_STORAGE_TRACE(s) +#define CPS_LOG_AS_N(a, s) \ + LOG_YQ_CONTROL_PLANE_STORAGE_AS_NOTICE(a, s) #define CPS_LOG_AS_D(a, s) \ LOG_YQ_CONTROL_PLANE_STORAGE_AS_DEBUG(a, s) #define CPS_LOG_AS_I(a, s) \ diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp index 8510b37576c..b4b499d6d9b 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp @@ -17,7 +17,7 @@ struct TPingTaskParams { TString Query; TParams Params; const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)> Prepare; - std::vector<TString> Metrics; + std::shared_ptr<std::vector<TString>> MeteringRecords; }; TPingTaskParams ConstructHardPingTask( @@ -44,6 +44,8 @@ TPingTaskParams ConstructHardPingTask( "FROM `" PENDING_SMALL_TABLE_NAME "` WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" ); + auto meteringRecords = std::make_shared<std::vector<TString>>(); + auto prepareParams = [=, counters=counters, actorSystem = NActors::TActivationContext::ActorSystem()](const TVector<TResultSet>& resultSets) { TString jobId; FederatedQuery::Query query; @@ -166,6 +168,10 @@ TPingTaskParams ConstructHardPingTask( job.mutable_query_meta()->set_status(*queryStatus); } + if (request.status_code() != NYql::NDqProto::StatusIds::UNSPECIFIED) { + internal.set_status_code(request.status_code()); + } + if (issues) { NYql::IssuesToMessage(*issues, query.mutable_issue()); NYql::IssuesToMessage(*issues, job.mutable_issue()); @@ -389,20 +395,26 @@ TPingTaskParams ConstructHardPingTask( *response->mutable_expired_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(ttl.MilliSeconds()); } const auto writeQuery = writeQueryBuilder.Build(); + + if (IsTerminalStatus(request.status())) { + try { + auto isBillable = IsBillablelStatus(request.status(), internal.status_code()); + if (!isBillable) { + CPS_LOG_AS_N(*actorSystem, "Query " << request.query_id().value() << " is NOT billable, status: " + << FederatedQuery::QueryMeta::ComputeStatus_Name(request.status()) + << ", statusCode: " << NYql::NDqProto::StatusIds_StatusCode_Name(internal.status_code())); + } + auto records = GetMeteringRecords(request.statistics(), isBillable, jobId, request.scope(), HostName()); + meteringRecords->swap(records); + } catch (const std::exception&) { + CPS_LOG_AS_E(*actorSystem, "Error on statistics meterification: " << CurrentExceptionMessage()); + } + } + return std::make_pair(writeQuery.Sql, writeQuery.Params); }; const auto readQuery = readQueryBuilder.Build(); - std::vector<TString> meteringRecords; - - if (IsTerminalStatus(request.status()) && request.statistics()) { - try { - meteringRecords = GetMeteringRecords(request.statistics(), request.query_id().value(), request.scope(), HostName()); - } catch (const std::exception&) { - CPS_LOG_E("Error on statistics meterification: " << CurrentExceptionMessage()); - } - } - return {readQuery.Sql, readQuery.Params, prepareParams, meteringRecords}; } @@ -469,7 +481,7 @@ TPingTaskParams ConstructSoftPingTask( return std::make_pair(writeQuery.Sql, writeQuery.Params); }; const auto readQuery = readQueryBuilder.Build(); - return {readQuery.Sql, readQuery.Params, prepareParams, std::vector<TString>{}}; + return {readQuery.Sql, readQuery.Params, prepareParams, std::shared_ptr<std::vector<TString>>{}}; } void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskRequest::TPtr& ev) @@ -530,11 +542,13 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq prepare, debugInfo); - success.Apply([=, actorSystem=NActors::TActivationContext::ActorSystem(), metrics=pingTaskParams.Metrics](const auto& future) { + success.Apply([=, actorSystem=NActors::TActivationContext::ActorSystem(), meteringRecords=pingTaskParams.MeteringRecords](const auto& future) { TDuration delta = TInstant::Now() - startTime; LWPROBE(PingTaskRequest, queryId, delta, future.GetValue()); - for (const auto& metric : metrics) { - actorSystem->Send(NKikimr::NMetering::MakeMeteringServiceID(), new NKikimr::NMetering::TEvMetering::TEvWriteMeteringJson(metric)); + if (meteringRecords) { + for (const auto& metric : *meteringRecords) { + actorSystem->Send(NKikimr::NMetering::MakeMeteringServiceID(), new NKikimr::NMetering::TEvMetering::TEvWriteMeteringJson(metric)); + } } }); } diff --git a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp index ad85a781179..025e3941e8a 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp @@ -110,61 +110,48 @@ NYql::TIssues ValidateCreateOrDeleteRateLimiterResource(const TString& queryId, return issues; } -std::vector<TString> GetMeteringRecords(const TString& statistics, const TString& queryId, const TString& scope, const TString& sourceId) { +std::vector<TString> GetMeteringRecords(const TString& statistics, bool billable, const TString& jobId, const TString& scope, const TString& sourceId) { std::vector<TString> result; NJson::TJsonReaderConfig jsonConfig; NJson::TJsonValue stat; - if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) { - ui64 ingress = 0; - ui64 egress = 0; - for (const auto& p : stat.GetMap()) { - if (p.first.StartsWith("Graph=") || p.first.StartsWith("Precompute=")) { - if (auto* ingressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.count")) { - ingress += ingressNode->GetInteger(); - } - if (auto* egressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.EgressS3SinkBytes.count")) { - egress += egressNode->GetInteger(); + ui64 ingress = 0; + + if (billable) { + if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) { + for (const auto& p : stat.GetMap()) { + if (p.first.StartsWith("Graph=") || p.first.StartsWith("Precompute=")) { + if (auto* ingressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.count")) { + ingress += ingressNode->GetInteger(); + } } } } - if (ingress) { - auto now = Now(); - result.emplace_back(TBillRecord() - .Id(queryId + "_osi") - .Schema("yq.object_storage.ingress") - .FolderId(TScope(scope).ParseFolder()) - .SourceWt(now) - .SourceId(sourceId) - .Usage(TBillRecord::TUsage() - .Type(TBillRecord::TUsage::EType::Delta) - .Unit(TBillRecord::TUsage::EUnit::Byte) - .Quantity(ingress) - .Start(now) - .Finish(now) - ) - .ToString() - ); - } - if (egress) { - auto now = Now(); - result.emplace_back(TBillRecord() - .Id(queryId + "_ose") - .Schema("yq.object_storage.egress") - .FolderId(TScope(scope).ParseFolder()) - .SourceWt(now) - .SourceId(sourceId) - .Usage(TBillRecord::TUsage() - .Type(TBillRecord::TUsage::EType::Delta) - .Unit(TBillRecord::TUsage::EUnit::Byte) - .Quantity(egress) - .Start(now) - .Finish(now) - ) - .ToString() - ); + } + + if (ingress) { + auto ingressMBytes = (ingress + 1_MB - 1) >> 20; // round up to 1 MB boundary + if (ingressMBytes < 10) { + ingressMBytes = 10; } + + auto now = Now(); + result.emplace_back(TBillRecord() + .Id(jobId + "_i") + .Schema("yq.ingress.mbytes") + .FolderId(TScope(scope).ParseFolder()) + .SourceWt(now) + .SourceId(sourceId) + .Usage(TBillRecord::TUsage() + .Type(TBillRecord::TUsage::EType::Delta) + .Unit(TBillRecord::TUsage::EUnit::MByte) + .Quantity(ingressMBytes) + .Start(now) + .Finish(now) + ) + .ToString() + ); } return result; diff --git a/ydb/core/fq/libs/control_plane_storage/internal/utils.h b/ydb/core/fq/libs/control_plane_storage/internal/utils.h index 77439bfaab8..6e8a226d92b 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/utils.h +++ b/ydb/core/fq/libs/control_plane_storage/internal/utils.h @@ -32,7 +32,7 @@ NYql::TIssues ValidateNodesHealthCheck( NYql::TIssues ValidateCreateOrDeleteRateLimiterResource(const TString& queryId, const TString& scope, const TString& tenant, const TString& owner); -std::vector<TString> GetMeteringRecords(const TString& statistics, const TString& queryId, const TString& scope, const TString& sourceId); +std::vector<TString> GetMeteringRecords(const TString& statistics, bool billable, const TString& jobId, const TString& scope, const TString& sourceId); TString GetPrettyStatistics(const TString& statistics); }; diff --git a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto index 24a22049850..dcb545065e1 100644 --- a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto +++ b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto @@ -6,6 +6,7 @@ option java_package = "com.yandex.query.internal"; option java_outer_classname = "YandexQueryInternalProtos"; import "ydb/library/yql/providers/dq/api/protos/service.proto"; +import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto"; import "ydb/library/yql/dq/proto/dq_tasks.proto"; import "ydb/public/api/protos/ydb_issue_message.proto"; import "ydb/core/fq/libs/protos/fq_private.proto"; @@ -40,6 +41,7 @@ message QueryInternal { repeated Fq.Private.CompressedData dq_graph_compressed = 20; Fq.Private.TaskResources resources = 21; repeated Ydb.Issue.IssueMessage internal_issue = 22; + NYql.NDqProto.StatusIds.StatusCode status_code = 23; } message JobInternal { diff --git a/ydb/core/fq/libs/control_plane_storage/util.cpp b/ydb/core/fq/libs/control_plane_storage/util.cpp index 20e8cb21bf1..61eaa149bfc 100644 --- a/ydb/core/fq/libs/control_plane_storage/util.cpp +++ b/ydb/core/fq/libs/control_plane_storage/util.cpp @@ -47,6 +47,21 @@ bool IsAbortedStatus(FederatedQuery::QueryMeta::ComputeStatus status) return IsIn({ FederatedQuery::QueryMeta::ABORTED_BY_USER, FederatedQuery::QueryMeta::ABORTED_BY_SYSTEM }, status); } +bool IsBillablelStatus(FederatedQuery::QueryMeta::ComputeStatus status, NYql::NDqProto::StatusIds::StatusCode statusCode) { + switch(status) { + case FederatedQuery::QueryMeta::ABORTED_BY_USER: + return statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED; + case FederatedQuery::QueryMeta::ABORTED_BY_SYSTEM: + return false; + case FederatedQuery::QueryMeta::COMPLETED: + return true; + case FederatedQuery::QueryMeta::FAILED: + return IsIn({NYql::NDqProto::StatusIds::BAD_REQUEST, NYql::NDqProto::StatusIds::LIMIT_EXCEEDED}, statusCode); + default: + return false; + } +} + TDuration GetDuration(const TString& value, const TDuration& defaultValue) { TDuration result = defaultValue; diff --git a/ydb/core/fq/libs/control_plane_storage/util.h b/ydb/core/fq/libs/control_plane_storage/util.h index 136472b0ff5..fd9c30cf11b 100644 --- a/ydb/core/fq/libs/control_plane_storage/util.h +++ b/ydb/core/fq/libs/control_plane_storage/util.h @@ -4,6 +4,8 @@ #include <ydb/core/fq/libs/config/protos/control_plane_storage.pb.h> #include <ydb/core/fq/libs/control_plane_storage/events/events.h> +#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h> + #include <google/protobuf/timestamp.pb.h> #include <util/datetime/base.h> @@ -36,6 +38,8 @@ bool IsTerminalStatus(FederatedQuery::QueryMeta::ComputeStatus status); bool IsAbortedStatus(FederatedQuery::QueryMeta::ComputeStatus status); +bool IsBillablelStatus(FederatedQuery::QueryMeta::ComputeStatus status, NYql::NDqProto::StatusIds::StatusCode statusCode); + TDuration GetDuration(const TString& value, const TDuration& defaultValue); NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlaneStorageConfig config); diff --git a/ydb/core/metering/bill_record.h b/ydb/core/metering/bill_record.h index 8edd622f51f..2fbb93d157b 100644 --- a/ydb/core/metering/bill_record.h +++ b/ydb/core/metering/bill_record.h @@ -32,7 +32,7 @@ struct TBillRecord { enum class EUnit { RequestUnit /* "request_unit" */, - Byte /* "byte" */, + MByte /* "mbyte" */, }; BILL_RECORD_FIELD(EType, Type); |