diff options
author | hor911 <hor911@ydb.tech> | 2022-08-03 10:04:26 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-08-03 10:04:26 +0300 |
commit | 695ebb512048b415f75334b9701bbb1d6b70d3f3 (patch) | |
tree | c9d36dbf6c499a8258d88a3f1719faa6caa5a099 | |
parent | d34d0090f4ff0c5c7661d485ddd5d28a61ef3841 (diff) | |
download | ydb-695ebb512048b415f75334b9701bbb1d6b70d3f3.tar.gz |
Quota update hook + cloud conformant rename
4 files changed, 108 insertions, 41 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h index 3baabb24de..4b91680385 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h @@ -367,6 +367,7 @@ public: hFunc(TEvControlPlaneStorage::TEvDeleteRateLimiterResourceRequest, Handle); hFunc(NMon::TEvHttpInfo, Handle); hFunc(TEvQuotaService::TQuotaUsageRequest, Handle); + hFunc(TEvQuotaService::TQuotaLimitChangeRequest, Handle); hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } ); ) @@ -402,6 +403,7 @@ public: void Handle(TEvControlPlaneStorage::TEvNodesHealthCheckRequest::TPtr& ev); void Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev); + void Handle(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev); template <class TEventPtr, class TRequestActor, ERequestTypeCommon requestType> void HandleRateLimiterImpl(TEventPtr& ev); diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp index 630cc72aa5..9f0bac16a8 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp @@ -96,4 +96,11 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T Exec(DbPool, executable); } +void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev) { + auto& record = *ev->Get(); + // LimitRequested is always accepted + Send(ev->Sender, new TEvQuotaService::TQuotaLimitChangeResponse(record.SubjectType, record.SubjectId, record.MetricName, + record.LimitRequested, record.LimitRequested)); +} + } // NYq diff --git a/ydb/core/yq/libs/quota_manager/events/events.h b/ydb/core/yq/libs/quota_manager/events/events.h index 2dded44bcb..c43204dad5 100644 --- a/ydb/core/yq/libs/quota_manager/events/events.h +++ b/ydb/core/yq/libs/quota_manager/events/events.h @@ -17,19 +17,21 @@ namespace NYq { constexpr auto SUBJECT_TYPE_CLOUD = "cloud"; -constexpr auto QUOTA_RESULT_LIMIT = "fq.queryResultLimit.bytes"; -constexpr auto QUOTA_COUNT_LIMIT = "fq.queryLimit.count"; -constexpr auto QUOTA_TIME_LIMIT = "fq.queryLimit.ttl"; -constexpr auto QUOTA_CPU_LIMIT = "fq.cpu.count"; + +constexpr auto QUOTA_CPU_LIMIT = "yq.cpu.count"; +constexpr auto QUOTA_COUNT_LIMIT = "yq.query.count"; +constexpr auto QUOTA_MEMORY_LIMIT = "yq.memory.size"; +constexpr auto QUOTA_TIME_LIMIT = "yq.ttlInSeconds.count"; +constexpr auto QUOTA_RESULT_LIMIT = "yq.result.size"; struct TQuotaInfo { ui64 DefaultLimit; ui64 HardLimit; - NActors::TActorId UsageUpdater; - TQuotaInfo(ui64 defaultLimit, ui64 hardLimit = 0, NActors::TActorId usageUpdater = {}) + NActors::TActorId QuotaController; + TQuotaInfo(ui64 defaultLimit, ui64 hardLimit = 0, NActors::TActorId quotaController = {}) : DefaultLimit(defaultLimit) , HardLimit(hardLimit) - , UsageUpdater(usageUpdater) + , QuotaController(quotaController) { } }; @@ -38,10 +40,10 @@ struct TQuotaDescription { TString SubjectType; TString MetricName; TQuotaInfo Info; - TQuotaDescription(const TString& subjectType, const TString& metricName, ui64 defaultLimit, ui64 hardLimit = 0, NActors::TActorId usageUpdater = {}) + TQuotaDescription(const TString& subjectType, const TString& metricName, ui64 defaultLimit, ui64 hardLimit = 0, NActors::TActorId quotaController = {}) : SubjectType(subjectType) , MetricName(metricName) - , Info(defaultLimit, hardLimit, usageUpdater) + , Info(defaultLimit, hardLimit, quotaController) { } }; @@ -227,20 +229,22 @@ struct TEvQuotaService { struct TQuotaLimitChangeRequest : public NActors::TEventLocal<TQuotaLimitChangeRequest, EvQuotaLimitChangeRequest> { TString SubjectType; TString SubjectId; - TQuotaUsage Quota; + TString MetricName; + ui64 Limit; ui64 LimitRequested; - TQuotaLimitChangeRequest(const TString& subjectType, const TString& subjectId) - : SubjectType(subjectType), SubjectId(subjectId) + TQuotaLimitChangeRequest(const TString& subjectType, const TString& subjectId, const TString& metricName, ui64 limit, ui64 limitRequested) + : SubjectType(subjectType), SubjectId(subjectId), MetricName(metricName), Limit(limit), LimitRequested(limitRequested) {} }; struct TQuotaLimitChangeResponse : public NActors::TEventLocal<TQuotaLimitChangeResponse, EvQuotaLimitChangeResponse> { TString SubjectType; TString SubjectId; - TQuotaUsage Quota; + TString MetricName; + ui64 Limit; ui64 LimitRequested; - TQuotaLimitChangeResponse(const TString& subjectType, const TString& subjectId) - : SubjectType(subjectType), SubjectId(subjectId) + TQuotaLimitChangeResponse(const TString& subjectType, const TString& subjectId, const TString& metricName, ui64 limit, ui64 limitRequested) + : SubjectType(subjectType), SubjectId(subjectId), MetricName(metricName), Limit(limit), LimitRequested(limitRequested) {} }; diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.cpp b/ydb/core/yq/libs/quota_manager/quota_manager.cpp index 68ec8812e7..bb703cf257 100644 --- a/ydb/core/yq/libs/quota_manager/quota_manager.cpp +++ b/ydb/core/yq/libs/quota_manager/quota_manager.cpp @@ -46,9 +46,12 @@ struct TQuotaCachedUsage { }; struct TQuotaCache { - THashMap<NActors::TActorId /* Sender */, ui64 /* Cookie */> PendingRequests; + THashMap<NActors::TActorId /* Sender */, ui64 /* Cookie */> PendingUsageRequests; THashMap<TString /* MetricName */, TQuotaCachedUsage> UsageMap; THashSet<TString> PendingUsage; + THashSet<TString> PendingLimit; + NActors::TActorId PendingLimitRequest; + ui64 PendingLimitCookie; TInstant LoadedAt = TInstant::Zero(); }; @@ -159,6 +162,7 @@ private: hFunc(TEvQuotaService::TQuotaGetRequest, Handle) hFunc(TEvQuotaService::TQuotaChangeNotification, Handle) hFunc(TEvQuotaService::TQuotaUsageResponse, Handle) + hFunc(TEvQuotaService::TQuotaLimitChangeResponse, Handle) hFunc(TEvQuotaService::TQuotaSetRequest, Handle) hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } ); hFunc(NActors::TEvInterconnect::TEvNodesInfo, Handle) @@ -241,15 +245,15 @@ private: if (!cachedUsage.Usage.Usage || cachedUsage.Usage.Usage->UpdatedAt + UsageRefreshPeriod < Now()) { auto it = infoMap.find(metricName); if (it != infoMap.end()) { - if (it->second.UsageUpdater != NActors::TActorId{}) { + if (it->second.QuotaController != NActors::TActorId{}) { if (!cache.PendingUsage.contains(metricName)) { LOG_T(subjectType << "." << subjectId << "." << metricName << " IS STALE, Refreshing ..."); - Send(it->second.UsageUpdater, new TEvQuotaService::TQuotaUsageRequest(subjectType, subjectId, metricName)); + Send(it->second.QuotaController, new TEvQuotaService::TQuotaUsageRequest(subjectType, subjectId, metricName)); cache.PendingUsage.insert(metricName); cachedUsage.RequestedAt = Now(); } if (!pended) { - cache.PendingRequests.emplace(ev->Sender, ev->Cookie); + cache.PendingUsageRequests.emplace(ev->Sender, ev->Cookie); pended = true; } } @@ -260,12 +264,13 @@ private: if (!pended) { SendQuota(ev->Sender, ev->Cookie, subjectType, subjectId, cache); - cache.PendingRequests.erase(ev->Sender); + cache.PendingUsageRequests.erase(ev->Sender); } } void ChangeLimitsAndReply(const TString& subjectType, const TString& subjectId, TQuotaCache& cache, const TEvQuotaService::TQuotaSetRequest::TPtr& ev) { + auto pended = false; auto& infoMap = QuotaInfoMap[subjectType]; for (auto metricLimit : ev->Get()->Limits) { auto& metricName = metricLimit.first; @@ -274,16 +279,27 @@ private: if (it != cache.UsageMap.end()) { auto& cached = it->second; auto limit = metricLimit.second; - if (cached.Usage.Limit.Value == 0 || limit == 0 || limit > cached.Usage.Limit.Value) { - // check hard limit only if quota is increased - auto itI = infoMap.find(metricName); - if (itI != infoMap.end()) { - auto& info = itI->second; - if (info.HardLimit != 0 && (limit == 0 || limit > info.HardLimit)) { - limit = info.HardLimit; + + auto itI = infoMap.find(metricName); + if (itI != infoMap.end()) { + auto& info = itI->second; + if (info.QuotaController != NActors::TActorId{}) { + pended = true; + cache.PendingLimitRequest = ev->Sender; + cache.PendingLimitCookie = ev->Cookie; + cache.PendingLimit.insert(metricName); + Send(info.QuotaController, new TEvQuotaService::TQuotaLimitChangeRequest(subjectType, subjectId, metricName, cached.Usage.Limit.Value, limit)); + continue; + } else { + if (cached.Usage.Limit.Value == 0 || limit == 0 || limit > cached.Usage.Limit.Value) { + // check hard limit only if quota is increased + if (info.HardLimit != 0 && (limit == 0 || limit > info.HardLimit)) { + limit = info.HardLimit; + } } } } + if (cached.Usage.Limit.Value != limit) { cached.Usage.Limit.Value = limit; cached.Usage.Limit.UpdatedAt = Now(); @@ -293,13 +309,53 @@ private: } } - auto response = MakeHolder<TEvQuotaService::TQuotaSetResponse>(subjectType, subjectId); - for (auto it : cache.UsageMap) { - response->Limits.emplace(it.first, it.second.Usage.Limit.Value); + if (!pended) { + auto response = MakeHolder<TEvQuotaService::TQuotaSetResponse>(subjectType, subjectId); + for (auto it : cache.UsageMap) { + response->Limits.emplace(it.first, it.second.Usage.Limit.Value); + } + Send(ev->Sender, response.Release()); } - Send(ev->Sender, response.Release()); } + void Handle(TEvQuotaService::TQuotaLimitChangeResponse::TPtr& ev) { + auto subjectType = ev->Get()->SubjectType; + auto subjectId = ev->Get()->SubjectId; + auto metricName = ev->Get()->MetricName; + auto& subjectMap = QuotaCacheMap[subjectType]; + auto it = subjectMap.find(subjectId); + + if (it == subjectMap.end()) { + // if quotas are not cached - ignore usage update + return; + } + + auto& cache = it->second; + cache.PendingLimit.erase(metricName); + + auto itQ = cache.UsageMap.find(metricName); + if (itQ != cache.UsageMap.end()) { + // if metric is not defined - ignore usage update + auto& cached = itQ->second; + if (cached.Usage.Limit.Value != ev->Get()->Limit) { + cached.Usage.Limit.Value = ev->Get()->Limit; + cached.Usage.Limit.UpdatedAt = Now(); + LOG_T(cached.Usage.ToString(subjectType, subjectId, metricName) << " LIMIT Changed"); + SyncQuota(subjectType, subjectId, metricName, cached); + } + } + + if (cache.PendingLimit.size() == 0) { + if (cache.PendingLimitRequest != NActors::TActorId{}) { + auto response = MakeHolder<TEvQuotaService::TQuotaSetResponse>(subjectType, subjectId); + for (auto it : cache.UsageMap) { + response->Limits.emplace(it.first, it.second.Usage.Limit.Value); + } + Send(cache.PendingLimitRequest, response.Release()); + cache.PendingLimitRequest = NActors::TActorId{}; + } + } + } void ReadQuota(const TString& subjectType, const TString& subjectId, TReadQuotaExecuter::TCallback callback) { @@ -426,9 +482,9 @@ private: auto itm = metricMap.find(request.MetricName); if (itm != metricMap.end()) { auto& info = itm->second; - if (info.UsageUpdater != NActors::TActorId{}) { + if (info.QuotaController != NActors::TActorId{}) { LOG_T(request.SubjectType << "." << request.SubjectId << "." << request.MetricName << " FORCE UPDATE, Updating ..."); - Send(info.UsageUpdater, new TEvQuotaService::TQuotaUsageRequest(request.SubjectType, request.SubjectId, request.MetricName)); + Send(info.QuotaController, new TEvQuotaService::TQuotaUsageRequest(request.SubjectType, request.SubjectId, request.MetricName)); } } } @@ -566,21 +622,19 @@ private: cache.PendingUsage.erase(metricName); auto itQ = cache.UsageMap.find(metricName); - if (itQ == cache.UsageMap.end()) { + if (itQ != cache.UsageMap.end()) { // if metric is not defined - ignore usage update - return; + itQ->second.Usage.Usage = ev->Get()->Usage; + LOG_T(itQ->second.Usage.ToString(subjectType, subjectId, metricName) << " REFRESHED"); + SyncQuota(subjectType, subjectId, metricName, itQ->second); } - itQ->second.Usage.Usage = ev->Get()->Usage; - LOG_T(itQ->second.Usage.ToString(subjectType, subjectId, metricName) << " REFRESHED"); if (cache.PendingUsage.size() == 0) { - for (auto& itR : cache.PendingRequests) { + for (auto& itR : cache.PendingUsageRequests) { SendQuota(itR.first, itR.second, subjectType, subjectId, cache); } - cache.PendingRequests.clear(); + cache.PendingUsageRequests.clear(); } - - SyncQuota(subjectType, subjectId, metricName, itQ->second); } void Handle(TEvQuotaService::TQuotaSetRequest::TPtr& ev) { |