summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <[email protected]>2022-05-18 12:52:51 +0300
committerAleksandr Khoroshilov <[email protected]>2022-05-18 12:52:51 +0300
commit835ba32ee05b1c38ea642a9d86b4bd11cb18d6fa (patch)
treecb727c8834bc38aca2489458eaeb86ec034fc6a6
parentee459a2724495e70c281f3da13f29cf2b9165dce (diff)
Query count per cloud quota
ref:df00fb9c4bcf183b5f3735f8ca8d8e459f45d599
-rw-r--r--ydb/core/yq/libs/config/protos/control_plane_storage.proto1
-rw-r--r--ydb/core/yq/libs/control_plane_storage/config.cpp2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/config.h1
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h11
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp114
6 files changed, 128 insertions, 3 deletions
diff --git a/ydb/core/yq/libs/config/protos/control_plane_storage.proto b/ydb/core/yq/libs/config/protos/control_plane_storage.proto
index 4e83ef3b727..6cf23202640 100644
--- a/ydb/core/yq/libs/config/protos/control_plane_storage.proto
+++ b/ydb/core/yq/libs/config/protos/control_plane_storage.proto
@@ -65,4 +65,5 @@ message TControlPlaneStorageConfig {
string ResultSetsTtl = 24;
TQueryMapping Mapping = 25;
repeated TRetryPolicyMapping RetryPolicyMapping = 26;
+ string QuotaTtl = 28;
}
diff --git a/ydb/core/yq/libs/control_plane_storage/config.cpp b/ydb/core/yq/libs/control_plane_storage/config.cpp
index afebde45a09..d5171cbc1dd 100644
--- a/ydb/core/yq/libs/control_plane_storage/config.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/config.cpp
@@ -51,6 +51,8 @@ TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPl
TaskLeaseRetryPolicy.RetryCount = Proto.GetTaskLeaseRetryPolicy().GetRetryCount();
TaskLeaseRetryPolicy.RetryPeriod = GetDuration(Proto.GetTaskLeaseRetryPolicy().GetRetryPeriod(), TDuration::Days(1));
}
+
+ QuotaTtl = GetDuration(Proto.GetQuotaTtl(), TDuration::Zero());
}
} // NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/config.h b/ydb/core/yq/libs/control_plane_storage/config.h
index bd637932ee4..7097e3c6e18 100644
--- a/ydb/core/yq/libs/control_plane_storage/config.h
+++ b/ydb/core/yq/libs/control_plane_storage/config.h
@@ -25,6 +25,7 @@ struct TControlPlaneStorageConfig {
TSet<YandexQuery::BindingSetting::BindingCase> AvailableBindings;
THashMap<ui64, TRetryPolicyItem> RetryPolicies;
TRetryPolicyItem TaskLeaseRetryPolicy;
+ TDuration QuotaTtl;
TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common);
};
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
index adfa624ee2f..adaba3b58f1 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
@@ -121,6 +121,7 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
RTC_WRITE_RESULT_DATA,
RTC_GET_TASK,
RTC_NODES_HEALTH_CHECK,
+ RTS_QUOTA_USAGE,
RTC_MAX,
};
@@ -142,6 +143,7 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
{ MakeIntrusive<TRequestCounters>("WriteResultData") },
{ MakeIntrusive<TRequestCounters>("GetTask") },
{ MakeIntrusive<TRequestCounters>("NodesHealthCheck") },
+ { MakeIntrusive<TRequestCounters>("GetQuotaUsage") },
});
TMap<TMetricsScope, TScopeCountersPtr> ScopeCounters;
@@ -234,6 +236,13 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
NKikimr::TYdbCredentialsProviderFactory CredProviderFactory;
TString TenantName;
+ // Query Quota
+ THashMap<TString, ui32> QueryQuotas;
+ THashMap<TString, TEvQuotaService::TQuotaUsageRequest::TPtr> QueryQuotaRequests;
+ TInstant QuotasUpdatedAt = TInstant::Zero();
+ ui32 QuotaGeneration = 0;
+ bool QuotasUpdating = false;
+
public:
TYdbControlPlaneStorageActor(
const NConfig::TControlPlaneStorageConfig& config,
@@ -281,6 +290,7 @@ public:
hFunc(TEvControlPlaneStorage::TEvNodesHealthCheckRequest, Handle);
hFunc(NMon::TEvHttpInfo, Handle);
hFunc(TEvQuotaService::TQuotaUsageRequest, Handle);
+ hFunc(TEvQuotaService::TQuotaUsageResponse, Handle);
)
void Handle(TEvControlPlaneStorage::TEvCreateQueryRequest::TPtr& ev);
@@ -313,6 +323,7 @@ public:
void Handle(TEvControlPlaneStorage::TEvNodesHealthCheckRequest::TPtr& ev);
void Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev);
+ void Handle(TEvQuotaService::TQuotaUsageResponse::TPtr& ev);
template<typename T>
NYql::TIssues ValidateConnection(T& ev, bool clickHousePasswordRequire = true)
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
index c283e85196c..9eec713d251 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
@@ -115,7 +115,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
auto& quota = it->second;
if (!quota.Usage) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::NOT_READY, "Control Plane is not ready yet. Please retry later."));
- } else if (*quota.Usage > quota.Limit) {
+ } else if (*quota.Usage >= quota.Limit) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::QUOTA_EXCEEDED, Sprintf("Too many queries (%lu of %lu). Please delete other queries or increase limits.", *quota.Usage, quota.Limit)));
}
}
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
index e14f9455572..8cf0490da0f 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
@@ -10,6 +10,7 @@
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
#include <ydb/core/yq/libs/control_plane_storage/schema.h>
#include <ydb/core/yq/libs/db_schema/db_schema.h>
+#include <ydb/core/yq/libs/quota_manager/quota_manager.h>
#include <ydb/public/api/protos/yq.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
@@ -18,10 +19,119 @@
namespace NYq {
+struct TCloudAggregator {
+ TAtomic Started = 0;
+ TAtomic Finished = 0;
+};
+
void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev)
{
- auto& request = *ev->Get();
- Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(request.SubjectType, request.SubjectId, request.MetricName, 0));
+ if (ev->Get()->SubjectType != SUBJECT_TYPE_CLOUD || ev->Get()->MetricName != QUOTA_COUNT_LIMIT) {
+ Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(ev->Get()->SubjectType, ev->Get()->SubjectId, ev->Get()->MetricName, 0));
+ }
+
+ if (QuotasUpdatedAt + Config.QuotaTtl > Now()) {
+ Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, ev->Get()->SubjectId, QUOTA_COUNT_LIMIT, QueryQuotas.Value(ev->Get()->SubjectId, 0)));
+ }
+
+ QueryQuotaRequests[ev->Get()->SubjectId] = ev;
+
+ if (QuotasUpdating) {
+ return;
+ }
+
+ QuotasUpdating = true;
+ QuotaGeneration++;
+ QueryQuotas.clear();
+
+ TRequestCountersPtr requestCounters = Counters.GetCommonCounters(RTS_QUOTA_USAGE);
+
+ TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "CountPendingQueries");
+
+ queryBuilder.AddText(
+ "SELECT `" SCOPE_COLUMN_NAME "`, COUNT(`" SCOPE_COLUMN_NAME "`) AS PENDING_COUNT\n"
+ "FROM `" PENDING_SMALL_TABLE_NAME "`\n"
+ "GROUP BY `" SCOPE_COLUMN_NAME "`\n"
+ );
+
+ const auto query = queryBuilder.Build();
+ auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
+ auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo);
+
+ auto aggregator = std::make_shared<TCloudAggregator>();
+
+ result.Apply([=, resultSets=resultSets, generation=QuotaGeneration](const auto& future) {
+ try {
+ TStatus status = future.GetValue();
+ if (status.IsSuccess() && resultSets->size() == 1) {
+ TResultSetParser parser(resultSets->front());
+ while (parser.TryNextRow()) {
+ auto scope = *parser.ColumnParser(SCOPE_COLUMN_NAME).GetOptionalString();
+ auto count = parser.ColumnParser("PENDING_COUNT").GetUint64();
+ TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "GetQueryCloudId");
+
+ queryBuilder.AddText(
+ "SELECT `" INTERNAL_COLUMN_NAME "`\n"
+ "FROM `" QUERIES_TABLE_NAME "`\n"
+ "WHERE `" SCOPE_COLUMN_NAME "` = $scope LIMIT 1;\n"
+ );
+
+ queryBuilder.AddString("scope", scope);
+
+ const auto query = queryBuilder.Build();
+ auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
+
+ AtomicIncrement(aggregator->Started);
+ auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo);
+
+ result.Apply([=, resultSets=resultSets](const auto& future) {
+ try {
+ TStatus status = future.GetValue();
+ if (status.IsSuccess() && resultSets->size() == 1) {
+ TResultSetParser parser(resultSets->front());
+ if (parser.TryNextRow()) {
+ YandexQuery::Internal::QueryInternal internal;
+ if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support";
+ }
+ Send(SelfId(), new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, internal.cloud_id(), QUOTA_COUNT_LIMIT, count), 0, generation);
+ }
+ }
+ AtomicIncrement(aggregator->Finished);
+ if (AtomicGet(aggregator->Started) == AtomicGet(aggregator->Finished)) {
+ Send(SelfId(), new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, "", QUOTA_COUNT_LIMIT, 0), 0, generation);
+ }
+ } catch (...) {
+ // Cerr << "EX2 " << CurrentExceptionMessage() << Endl;
+ }
+ });
+ }
+ if (AtomicGet(aggregator->Started) == 0) {
+ Send(SelfId(), new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, "", QUOTA_COUNT_LIMIT, 0), 0, generation);
+ }
+ }
+ } catch (...) {
+ // Cerr << "EX1 " << CurrentExceptionMessage() << Endl;
+ }
+ });
+}
+
+void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageResponse::TPtr& ev)
+{
+ if (ev->Cookie == QuotaGeneration) {
+ auto subjectId = ev->Get()->SubjectId;
+ if (subjectId == "") {
+ QuotasUpdatedAt = Now();
+ for (auto& it : QueryQuotaRequests) {
+ auto ev = it.second;
+ Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, it.first, QUOTA_COUNT_LIMIT, QueryQuotas.Value(it.first, 0)));
+ }
+ QueryQuotaRequests.clear();
+ QuotasUpdating = false;
+ } else {
+ QueryQuotas[subjectId] += ev->Get()->Usage;
+ }
+ }
}
} // NYq