diff options
| author | Aleksandr Khoroshilov <[email protected]> | 2022-05-18 12:52:51 +0300 |
|---|---|---|
| committer | Aleksandr Khoroshilov <[email protected]> | 2022-05-18 12:52:51 +0300 |
| commit | 835ba32ee05b1c38ea642a9d86b4bd11cb18d6fa (patch) | |
| tree | cb727c8834bc38aca2489458eaeb86ec034fc6a6 | |
| parent | ee459a2724495e70c281f3da13f29cf2b9165dce (diff) | |
Query count per cloud quota
ref:df00fb9c4bcf183b5f3735f8ca8d8e459f45d599
6 files changed, 128 insertions, 3 deletions
diff --git a/ydb/core/yq/libs/config/protos/control_plane_storage.proto b/ydb/core/yq/libs/config/protos/control_plane_storage.proto index 4e83ef3b727..6cf23202640 100644 --- a/ydb/core/yq/libs/config/protos/control_plane_storage.proto +++ b/ydb/core/yq/libs/config/protos/control_plane_storage.proto @@ -65,4 +65,5 @@ message TControlPlaneStorageConfig { string ResultSetsTtl = 24; TQueryMapping Mapping = 25; repeated TRetryPolicyMapping RetryPolicyMapping = 26; + string QuotaTtl = 28; } diff --git a/ydb/core/yq/libs/control_plane_storage/config.cpp b/ydb/core/yq/libs/control_plane_storage/config.cpp index afebde45a09..d5171cbc1dd 100644 --- a/ydb/core/yq/libs/control_plane_storage/config.cpp +++ b/ydb/core/yq/libs/control_plane_storage/config.cpp @@ -51,6 +51,8 @@ TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPl TaskLeaseRetryPolicy.RetryCount = Proto.GetTaskLeaseRetryPolicy().GetRetryCount(); TaskLeaseRetryPolicy.RetryPeriod = GetDuration(Proto.GetTaskLeaseRetryPolicy().GetRetryPeriod(), TDuration::Days(1)); } + + QuotaTtl = GetDuration(Proto.GetQuotaTtl(), TDuration::Zero()); } } // NYq diff --git a/ydb/core/yq/libs/control_plane_storage/config.h b/ydb/core/yq/libs/control_plane_storage/config.h index bd637932ee4..7097e3c6e18 100644 --- a/ydb/core/yq/libs/control_plane_storage/config.h +++ b/ydb/core/yq/libs/control_plane_storage/config.h @@ -25,6 +25,7 @@ struct TControlPlaneStorageConfig { TSet<YandexQuery::BindingSetting::BindingCase> AvailableBindings; THashMap<ui64, TRetryPolicyItem> RetryPolicies; TRetryPolicyItem TaskLeaseRetryPolicy; + TDuration QuotaTtl; TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common); }; diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h index adfa624ee2f..adaba3b58f1 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h @@ -121,6 +121,7 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont RTC_WRITE_RESULT_DATA, RTC_GET_TASK, RTC_NODES_HEALTH_CHECK, + RTS_QUOTA_USAGE, RTC_MAX, }; @@ -142,6 +143,7 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont { MakeIntrusive<TRequestCounters>("WriteResultData") }, { MakeIntrusive<TRequestCounters>("GetTask") }, { MakeIntrusive<TRequestCounters>("NodesHealthCheck") }, + { MakeIntrusive<TRequestCounters>("GetQuotaUsage") }, }); TMap<TMetricsScope, TScopeCountersPtr> ScopeCounters; @@ -234,6 +236,13 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont NKikimr::TYdbCredentialsProviderFactory CredProviderFactory; TString TenantName; + // Query Quota + THashMap<TString, ui32> QueryQuotas; + THashMap<TString, TEvQuotaService::TQuotaUsageRequest::TPtr> QueryQuotaRequests; + TInstant QuotasUpdatedAt = TInstant::Zero(); + ui32 QuotaGeneration = 0; + bool QuotasUpdating = false; + public: TYdbControlPlaneStorageActor( const NConfig::TControlPlaneStorageConfig& config, @@ -281,6 +290,7 @@ public: hFunc(TEvControlPlaneStorage::TEvNodesHealthCheckRequest, Handle); hFunc(NMon::TEvHttpInfo, Handle); hFunc(TEvQuotaService::TQuotaUsageRequest, Handle); + hFunc(TEvQuotaService::TQuotaUsageResponse, Handle); ) void Handle(TEvControlPlaneStorage::TEvCreateQueryRequest::TPtr& ev); @@ -313,6 +323,7 @@ public: void Handle(TEvControlPlaneStorage::TEvNodesHealthCheckRequest::TPtr& ev); void Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev); + void Handle(TEvQuotaService::TQuotaUsageResponse::TPtr& ev); template<typename T> NYql::TIssues ValidateConnection(T& ev, bool clickHousePasswordRequire = true) diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index c283e85196c..9eec713d251 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -115,7 +115,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery auto& quota = it->second; if (!quota.Usage) { issues.AddIssue(MakeErrorIssue(TIssuesIds::NOT_READY, "Control Plane is not ready yet. Please retry later.")); - } else if (*quota.Usage > quota.Limit) { + } else if (*quota.Usage >= quota.Limit) { issues.AddIssue(MakeErrorIssue(TIssuesIds::QUOTA_EXCEEDED, Sprintf("Too many queries (%lu of %lu). Please delete other queries or increase limits.", *quota.Usage, quota.Limit))); } } diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp index e14f9455572..8cf0490da0f 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp @@ -10,6 +10,7 @@ #include <ydb/core/yq/libs/control_plane_storage/events/events.h> #include <ydb/core/yq/libs/control_plane_storage/schema.h> #include <ydb/core/yq/libs/db_schema/db_schema.h> +#include <ydb/core/yq/libs/quota_manager/quota_manager.h> #include <ydb/public/api/protos/yq.pb.h> #include <ydb/public/sdk/cpp/client/ydb_value/value.h> @@ -18,10 +19,119 @@ namespace NYq { +struct TCloudAggregator { + TAtomic Started = 0; + TAtomic Finished = 0; +}; + void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev) { - auto& request = *ev->Get(); - Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(request.SubjectType, request.SubjectId, request.MetricName, 0)); + if (ev->Get()->SubjectType != SUBJECT_TYPE_CLOUD || ev->Get()->MetricName != QUOTA_COUNT_LIMIT) { + Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(ev->Get()->SubjectType, ev->Get()->SubjectId, ev->Get()->MetricName, 0)); + } + + if (QuotasUpdatedAt + Config.QuotaTtl > Now()) { + Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, ev->Get()->SubjectId, QUOTA_COUNT_LIMIT, QueryQuotas.Value(ev->Get()->SubjectId, 0))); + } + + QueryQuotaRequests[ev->Get()->SubjectId] = ev; + + if (QuotasUpdating) { + return; + } + + QuotasUpdating = true; + QuotaGeneration++; + QueryQuotas.clear(); + + TRequestCountersPtr requestCounters = Counters.GetCommonCounters(RTS_QUOTA_USAGE); + + TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "CountPendingQueries"); + + queryBuilder.AddText( + "SELECT `" SCOPE_COLUMN_NAME "`, COUNT(`" SCOPE_COLUMN_NAME "`) AS PENDING_COUNT\n" + "FROM `" PENDING_SMALL_TABLE_NAME "`\n" + "GROUP BY `" SCOPE_COLUMN_NAME "`\n" + ); + + const auto query = queryBuilder.Build(); + auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; + auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); + + auto aggregator = std::make_shared<TCloudAggregator>(); + + result.Apply([=, resultSets=resultSets, generation=QuotaGeneration](const auto& future) { + try { + TStatus status = future.GetValue(); + if (status.IsSuccess() && resultSets->size() == 1) { + TResultSetParser parser(resultSets->front()); + while (parser.TryNextRow()) { + auto scope = *parser.ColumnParser(SCOPE_COLUMN_NAME).GetOptionalString(); + auto count = parser.ColumnParser("PENDING_COUNT").GetUint64(); + TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "GetQueryCloudId"); + + queryBuilder.AddText( + "SELECT `" INTERNAL_COLUMN_NAME "`\n" + "FROM `" QUERIES_TABLE_NAME "`\n" + "WHERE `" SCOPE_COLUMN_NAME "` = $scope LIMIT 1;\n" + ); + + queryBuilder.AddString("scope", scope); + + const auto query = queryBuilder.Build(); + auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; + + AtomicIncrement(aggregator->Started); + auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); + + result.Apply([=, resultSets=resultSets](const auto& future) { + try { + TStatus status = future.GetValue(); + if (status.IsSuccess() && resultSets->size() == 1) { + TResultSetParser parser(resultSets->front()); + if (parser.TryNextRow()) { + YandexQuery::Internal::QueryInternal internal; + if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support"; + } + Send(SelfId(), new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, internal.cloud_id(), QUOTA_COUNT_LIMIT, count), 0, generation); + } + } + AtomicIncrement(aggregator->Finished); + if (AtomicGet(aggregator->Started) == AtomicGet(aggregator->Finished)) { + Send(SelfId(), new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, "", QUOTA_COUNT_LIMIT, 0), 0, generation); + } + } catch (...) { + // Cerr << "EX2 " << CurrentExceptionMessage() << Endl; + } + }); + } + if (AtomicGet(aggregator->Started) == 0) { + Send(SelfId(), new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, "", QUOTA_COUNT_LIMIT, 0), 0, generation); + } + } + } catch (...) { + // Cerr << "EX1 " << CurrentExceptionMessage() << Endl; + } + }); +} + +void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageResponse::TPtr& ev) +{ + if (ev->Cookie == QuotaGeneration) { + auto subjectId = ev->Get()->SubjectId; + if (subjectId == "") { + QuotasUpdatedAt = Now(); + for (auto& it : QueryQuotaRequests) { + auto ev = it.second; + Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, it.first, QUOTA_COUNT_LIMIT, QueryQuotas.Value(it.first, 0))); + } + QueryQuotaRequests.clear(); + QuotasUpdating = false; + } else { + QueryQuotas[subjectId] += ev->Get()->Usage; + } + } } } // NYq |
