summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <[email protected]>2023-05-11 17:43:27 +0300
committerhor911 <[email protected]>2023-05-11 17:43:27 +0300
commitf92d86b7c6a95421b7ba1796565b3e4c2780cf34 (patch)
tree339f5a52bab5eced447dbec2593622ad10842174
parent35bb1cc919fd91eb1bd4981766811e7b09ecac47 (diff)
Tune Billing for Ingress
-rw-r--r--ydb/core/fq/libs/actors/run_actor.cpp4
-rw-r--r--ydb/core/fq/libs/control_plane_storage/control_plane_storage.h4
-rw-r--r--ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp44
-rw-r--r--ydb/core/fq/libs/control_plane_storage/internal/utils.cpp79
-rw-r--r--ydb/core/fq/libs/control_plane_storage/internal/utils.h2
-rw-r--r--ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto2
-rw-r--r--ydb/core/fq/libs/control_plane_storage/util.cpp15
-rw-r--r--ydb/core/fq/libs/control_plane_storage/util.h4
-rw-r--r--ydb/core/metering/bill_record.h2
9 files changed, 92 insertions, 64 deletions
diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp
index 6b9ebafff7b..669e09ec837 100644
--- a/ydb/core/fq/libs/actors/run_actor.cpp
+++ b/ydb/core/fq/libs/actors/run_actor.cpp
@@ -1649,7 +1649,9 @@ private:
FinalQueryStatus = status;
QueryStateUpdateRequest.set_status(FinalQueryStatus); // Can be changed later.
- QueryStateUpdateRequest.set_status_code(NYql::NDqProto::StatusIds::SUCCESS);
+ if (FinalQueryStatus == FederatedQuery::QueryMeta::COMPLETED && QueryStateUpdateRequest.status_code() == NYql::NDqProto::StatusIds::UNSPECIFIED) {
+ QueryStateUpdateRequest.set_status_code(NYql::NDqProto::StatusIds::SUCCESS);
+ }
*QueryStateUpdateRequest.mutable_finished_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds());
Become(&TRunActor::StateFuncWrapper<&TRunActor::FinishStateFunc>);
diff --git a/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h b/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h
index 790886c1aed..ef01e0deae0 100644
--- a/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h
+++ b/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h
@@ -8,6 +8,8 @@
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
#include <ydb/core/fq/libs/actors/logging/log.h>
+#define CPS_LOG_N(s) \
+ LOG_YQ_CONTROL_PLANE_STORAGE_NOTICE(s)
#define CPS_LOG_D(s) \
LOG_YQ_CONTROL_PLANE_STORAGE_DEBUG(s)
#define CPS_LOG_I(s) \
@@ -20,6 +22,8 @@
LOG_YQ_CONTROL_PLANE_STORAGE_TRACE(s)
+#define CPS_LOG_AS_N(a, s) \
+ LOG_YQ_CONTROL_PLANE_STORAGE_AS_NOTICE(a, s)
#define CPS_LOG_AS_D(a, s) \
LOG_YQ_CONTROL_PLANE_STORAGE_AS_DEBUG(a, s)
#define CPS_LOG_AS_I(a, s) \
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
index 8510b37576c..b4b499d6d9b 100644
--- a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
@@ -17,7 +17,7 @@ struct TPingTaskParams {
TString Query;
TParams Params;
const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)> Prepare;
- std::vector<TString> Metrics;
+ std::shared_ptr<std::vector<TString>> MeteringRecords;
};
TPingTaskParams ConstructHardPingTask(
@@ -44,6 +44,8 @@ TPingTaskParams ConstructHardPingTask(
"FROM `" PENDING_SMALL_TABLE_NAME "` WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
);
+ auto meteringRecords = std::make_shared<std::vector<TString>>();
+
auto prepareParams = [=, counters=counters, actorSystem = NActors::TActivationContext::ActorSystem()](const TVector<TResultSet>& resultSets) {
TString jobId;
FederatedQuery::Query query;
@@ -166,6 +168,10 @@ TPingTaskParams ConstructHardPingTask(
job.mutable_query_meta()->set_status(*queryStatus);
}
+ if (request.status_code() != NYql::NDqProto::StatusIds::UNSPECIFIED) {
+ internal.set_status_code(request.status_code());
+ }
+
if (issues) {
NYql::IssuesToMessage(*issues, query.mutable_issue());
NYql::IssuesToMessage(*issues, job.mutable_issue());
@@ -389,20 +395,26 @@ TPingTaskParams ConstructHardPingTask(
*response->mutable_expired_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(ttl.MilliSeconds());
}
const auto writeQuery = writeQueryBuilder.Build();
+
+ if (IsTerminalStatus(request.status())) {
+ try {
+ auto isBillable = IsBillablelStatus(request.status(), internal.status_code());
+ if (!isBillable) {
+ CPS_LOG_AS_N(*actorSystem, "Query " << request.query_id().value() << " is NOT billable, status: "
+ << FederatedQuery::QueryMeta::ComputeStatus_Name(request.status())
+ << ", statusCode: " << NYql::NDqProto::StatusIds_StatusCode_Name(internal.status_code()));
+ }
+ auto records = GetMeteringRecords(request.statistics(), isBillable, jobId, request.scope(), HostName());
+ meteringRecords->swap(records);
+ } catch (const std::exception&) {
+ CPS_LOG_AS_E(*actorSystem, "Error on statistics meterification: " << CurrentExceptionMessage());
+ }
+ }
+
return std::make_pair(writeQuery.Sql, writeQuery.Params);
};
const auto readQuery = readQueryBuilder.Build();
- std::vector<TString> meteringRecords;
-
- if (IsTerminalStatus(request.status()) && request.statistics()) {
- try {
- meteringRecords = GetMeteringRecords(request.statistics(), request.query_id().value(), request.scope(), HostName());
- } catch (const std::exception&) {
- CPS_LOG_E("Error on statistics meterification: " << CurrentExceptionMessage());
- }
- }
-
return {readQuery.Sql, readQuery.Params, prepareParams, meteringRecords};
}
@@ -469,7 +481,7 @@ TPingTaskParams ConstructSoftPingTask(
return std::make_pair(writeQuery.Sql, writeQuery.Params);
};
const auto readQuery = readQueryBuilder.Build();
- return {readQuery.Sql, readQuery.Params, prepareParams, std::vector<TString>{}};
+ return {readQuery.Sql, readQuery.Params, prepareParams, std::shared_ptr<std::vector<TString>>{}};
}
void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskRequest::TPtr& ev)
@@ -530,11 +542,13 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq
prepare,
debugInfo);
- success.Apply([=, actorSystem=NActors::TActivationContext::ActorSystem(), metrics=pingTaskParams.Metrics](const auto& future) {
+ success.Apply([=, actorSystem=NActors::TActivationContext::ActorSystem(), meteringRecords=pingTaskParams.MeteringRecords](const auto& future) {
TDuration delta = TInstant::Now() - startTime;
LWPROBE(PingTaskRequest, queryId, delta, future.GetValue());
- for (const auto& metric : metrics) {
- actorSystem->Send(NKikimr::NMetering::MakeMeteringServiceID(), new NKikimr::NMetering::TEvMetering::TEvWriteMeteringJson(metric));
+ if (meteringRecords) {
+ for (const auto& metric : *meteringRecords) {
+ actorSystem->Send(NKikimr::NMetering::MakeMeteringServiceID(), new NKikimr::NMetering::TEvMetering::TEvWriteMeteringJson(metric));
+ }
}
});
}
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
index ad85a781179..025e3941e8a 100644
--- a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
@@ -110,61 +110,48 @@ NYql::TIssues ValidateCreateOrDeleteRateLimiterResource(const TString& queryId,
return issues;
}
-std::vector<TString> GetMeteringRecords(const TString& statistics, const TString& queryId, const TString& scope, const TString& sourceId) {
+std::vector<TString> GetMeteringRecords(const TString& statistics, bool billable, const TString& jobId, const TString& scope, const TString& sourceId) {
std::vector<TString> result;
NJson::TJsonReaderConfig jsonConfig;
NJson::TJsonValue stat;
- if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) {
- ui64 ingress = 0;
- ui64 egress = 0;
- for (const auto& p : stat.GetMap()) {
- if (p.first.StartsWith("Graph=") || p.first.StartsWith("Precompute=")) {
- if (auto* ingressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.count")) {
- ingress += ingressNode->GetInteger();
- }
- if (auto* egressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.EgressS3SinkBytes.count")) {
- egress += egressNode->GetInteger();
+ ui64 ingress = 0;
+
+ if (billable) {
+ if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) {
+ for (const auto& p : stat.GetMap()) {
+ if (p.first.StartsWith("Graph=") || p.first.StartsWith("Precompute=")) {
+ if (auto* ingressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.count")) {
+ ingress += ingressNode->GetInteger();
+ }
}
}
}
- if (ingress) {
- auto now = Now();
- result.emplace_back(TBillRecord()
- .Id(queryId + "_osi")
- .Schema("yq.object_storage.ingress")
- .FolderId(TScope(scope).ParseFolder())
- .SourceWt(now)
- .SourceId(sourceId)
- .Usage(TBillRecord::TUsage()
- .Type(TBillRecord::TUsage::EType::Delta)
- .Unit(TBillRecord::TUsage::EUnit::Byte)
- .Quantity(ingress)
- .Start(now)
- .Finish(now)
- )
- .ToString()
- );
- }
- if (egress) {
- auto now = Now();
- result.emplace_back(TBillRecord()
- .Id(queryId + "_ose")
- .Schema("yq.object_storage.egress")
- .FolderId(TScope(scope).ParseFolder())
- .SourceWt(now)
- .SourceId(sourceId)
- .Usage(TBillRecord::TUsage()
- .Type(TBillRecord::TUsage::EType::Delta)
- .Unit(TBillRecord::TUsage::EUnit::Byte)
- .Quantity(egress)
- .Start(now)
- .Finish(now)
- )
- .ToString()
- );
+ }
+
+ if (ingress) {
+ auto ingressMBytes = (ingress + 1_MB - 1) >> 20; // round up to 1 MB boundary
+ if (ingressMBytes < 10) {
+ ingressMBytes = 10;
}
+
+ auto now = Now();
+ result.emplace_back(TBillRecord()
+ .Id(jobId + "_i")
+ .Schema("yq.ingress.mbytes")
+ .FolderId(TScope(scope).ParseFolder())
+ .SourceWt(now)
+ .SourceId(sourceId)
+ .Usage(TBillRecord::TUsage()
+ .Type(TBillRecord::TUsage::EType::Delta)
+ .Unit(TBillRecord::TUsage::EUnit::MByte)
+ .Quantity(ingressMBytes)
+ .Start(now)
+ .Finish(now)
+ )
+ .ToString()
+ );
}
return result;
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/utils.h b/ydb/core/fq/libs/control_plane_storage/internal/utils.h
index 77439bfaab8..6e8a226d92b 100644
--- a/ydb/core/fq/libs/control_plane_storage/internal/utils.h
+++ b/ydb/core/fq/libs/control_plane_storage/internal/utils.h
@@ -32,7 +32,7 @@ NYql::TIssues ValidateNodesHealthCheck(
NYql::TIssues ValidateCreateOrDeleteRateLimiterResource(const TString& queryId, const TString& scope, const TString& tenant, const TString& owner);
-std::vector<TString> GetMeteringRecords(const TString& statistics, const TString& queryId, const TString& scope, const TString& sourceId);
+std::vector<TString> GetMeteringRecords(const TString& statistics, bool billable, const TString& jobId, const TString& scope, const TString& sourceId);
TString GetPrettyStatistics(const TString& statistics);
};
diff --git a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
index 24a22049850..dcb545065e1 100644
--- a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
+++ b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
@@ -6,6 +6,7 @@ option java_package = "com.yandex.query.internal";
option java_outer_classname = "YandexQueryInternalProtos";
import "ydb/library/yql/providers/dq/api/protos/service.proto";
+import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto";
import "ydb/library/yql/dq/proto/dq_tasks.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";
import "ydb/core/fq/libs/protos/fq_private.proto";
@@ -40,6 +41,7 @@ message QueryInternal {
repeated Fq.Private.CompressedData dq_graph_compressed = 20;
Fq.Private.TaskResources resources = 21;
repeated Ydb.Issue.IssueMessage internal_issue = 22;
+ NYql.NDqProto.StatusIds.StatusCode status_code = 23;
}
message JobInternal {
diff --git a/ydb/core/fq/libs/control_plane_storage/util.cpp b/ydb/core/fq/libs/control_plane_storage/util.cpp
index 20e8cb21bf1..61eaa149bfc 100644
--- a/ydb/core/fq/libs/control_plane_storage/util.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/util.cpp
@@ -47,6 +47,21 @@ bool IsAbortedStatus(FederatedQuery::QueryMeta::ComputeStatus status)
return IsIn({ FederatedQuery::QueryMeta::ABORTED_BY_USER, FederatedQuery::QueryMeta::ABORTED_BY_SYSTEM }, status);
}
+bool IsBillablelStatus(FederatedQuery::QueryMeta::ComputeStatus status, NYql::NDqProto::StatusIds::StatusCode statusCode) {
+ switch(status) {
+ case FederatedQuery::QueryMeta::ABORTED_BY_USER:
+ return statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED;
+ case FederatedQuery::QueryMeta::ABORTED_BY_SYSTEM:
+ return false;
+ case FederatedQuery::QueryMeta::COMPLETED:
+ return true;
+ case FederatedQuery::QueryMeta::FAILED:
+ return IsIn({NYql::NDqProto::StatusIds::BAD_REQUEST, NYql::NDqProto::StatusIds::LIMIT_EXCEEDED}, statusCode);
+ default:
+ return false;
+ }
+}
+
TDuration GetDuration(const TString& value, const TDuration& defaultValue)
{
TDuration result = defaultValue;
diff --git a/ydb/core/fq/libs/control_plane_storage/util.h b/ydb/core/fq/libs/control_plane_storage/util.h
index 136472b0ff5..fd9c30cf11b 100644
--- a/ydb/core/fq/libs/control_plane_storage/util.h
+++ b/ydb/core/fq/libs/control_plane_storage/util.h
@@ -4,6 +4,8 @@
#include <ydb/core/fq/libs/config/protos/control_plane_storage.pb.h>
#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
+#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
+
#include <google/protobuf/timestamp.pb.h>
#include <util/datetime/base.h>
@@ -36,6 +38,8 @@ bool IsTerminalStatus(FederatedQuery::QueryMeta::ComputeStatus status);
bool IsAbortedStatus(FederatedQuery::QueryMeta::ComputeStatus status);
+bool IsBillablelStatus(FederatedQuery::QueryMeta::ComputeStatus status, NYql::NDqProto::StatusIds::StatusCode statusCode);
+
TDuration GetDuration(const TString& value, const TDuration& defaultValue);
NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlaneStorageConfig config);
diff --git a/ydb/core/metering/bill_record.h b/ydb/core/metering/bill_record.h
index 8edd622f51f..2fbb93d157b 100644
--- a/ydb/core/metering/bill_record.h
+++ b/ydb/core/metering/bill_record.h
@@ -32,7 +32,7 @@ struct TBillRecord {
enum class EUnit {
RequestUnit /* "request_unit" */,
- Byte /* "byte" */,
+ MByte /* "mbyte" */,
};
BILL_RECORD_FIELD(EType, Type);