aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2022-08-03 10:04:26 +0300
committerhor911 <hor911@ydb.tech>2022-08-03 10:04:26 +0300
commit695ebb512048b415f75334b9701bbb1d6b70d3f3 (patch)
treec9d36dbf6c499a8258d88a3f1719faa6caa5a099
parentd34d0090f4ff0c5c7661d485ddd5d28a61ef3841 (diff)
downloadydb-695ebb512048b415f75334b9701bbb1d6b70d3f3.tar.gz
Quota update hook + cloud conformant rename
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp7
-rw-r--r--ydb/core/yq/libs/quota_manager/events/events.h34
-rw-r--r--ydb/core/yq/libs/quota_manager/quota_manager.cpp106
4 files changed, 108 insertions, 41 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
index 3baabb24de..4b91680385 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
@@ -367,6 +367,7 @@ public:
hFunc(TEvControlPlaneStorage::TEvDeleteRateLimiterResourceRequest, Handle);
hFunc(NMon::TEvHttpInfo, Handle);
hFunc(TEvQuotaService::TQuotaUsageRequest, Handle);
+ hFunc(TEvQuotaService::TQuotaLimitChangeRequest, Handle);
hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } );
)
@@ -402,6 +403,7 @@ public:
void Handle(TEvControlPlaneStorage::TEvNodesHealthCheckRequest::TPtr& ev);
void Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev);
+ void Handle(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev);
template <class TEventPtr, class TRequestActor, ERequestTypeCommon requestType>
void HandleRateLimiterImpl(TEventPtr& ev);
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
index 630cc72aa5..9f0bac16a8 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
@@ -96,4 +96,11 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T
Exec(DbPool, executable);
}
+void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev) {
+ auto& record = *ev->Get();
+ // LimitRequested is always accepted
+ Send(ev->Sender, new TEvQuotaService::TQuotaLimitChangeResponse(record.SubjectType, record.SubjectId, record.MetricName,
+ record.LimitRequested, record.LimitRequested));
+}
+
} // NYq
diff --git a/ydb/core/yq/libs/quota_manager/events/events.h b/ydb/core/yq/libs/quota_manager/events/events.h
index 2dded44bcb..c43204dad5 100644
--- a/ydb/core/yq/libs/quota_manager/events/events.h
+++ b/ydb/core/yq/libs/quota_manager/events/events.h
@@ -17,19 +17,21 @@
namespace NYq {
constexpr auto SUBJECT_TYPE_CLOUD = "cloud";
-constexpr auto QUOTA_RESULT_LIMIT = "fq.queryResultLimit.bytes";
-constexpr auto QUOTA_COUNT_LIMIT = "fq.queryLimit.count";
-constexpr auto QUOTA_TIME_LIMIT = "fq.queryLimit.ttl";
-constexpr auto QUOTA_CPU_LIMIT = "fq.cpu.count";
+
+constexpr auto QUOTA_CPU_LIMIT = "yq.cpu.count";
+constexpr auto QUOTA_COUNT_LIMIT = "yq.query.count";
+constexpr auto QUOTA_MEMORY_LIMIT = "yq.memory.size";
+constexpr auto QUOTA_TIME_LIMIT = "yq.ttlInSeconds.count";
+constexpr auto QUOTA_RESULT_LIMIT = "yq.result.size";
struct TQuotaInfo {
ui64 DefaultLimit;
ui64 HardLimit;
- NActors::TActorId UsageUpdater;
- TQuotaInfo(ui64 defaultLimit, ui64 hardLimit = 0, NActors::TActorId usageUpdater = {})
+ NActors::TActorId QuotaController;
+ TQuotaInfo(ui64 defaultLimit, ui64 hardLimit = 0, NActors::TActorId quotaController = {})
: DefaultLimit(defaultLimit)
, HardLimit(hardLimit)
- , UsageUpdater(usageUpdater)
+ , QuotaController(quotaController)
{
}
};
@@ -38,10 +40,10 @@ struct TQuotaDescription {
TString SubjectType;
TString MetricName;
TQuotaInfo Info;
- TQuotaDescription(const TString& subjectType, const TString& metricName, ui64 defaultLimit, ui64 hardLimit = 0, NActors::TActorId usageUpdater = {})
+ TQuotaDescription(const TString& subjectType, const TString& metricName, ui64 defaultLimit, ui64 hardLimit = 0, NActors::TActorId quotaController = {})
: SubjectType(subjectType)
, MetricName(metricName)
- , Info(defaultLimit, hardLimit, usageUpdater)
+ , Info(defaultLimit, hardLimit, quotaController)
{
}
};
@@ -227,20 +229,22 @@ struct TEvQuotaService {
struct TQuotaLimitChangeRequest : public NActors::TEventLocal<TQuotaLimitChangeRequest, EvQuotaLimitChangeRequest> {
TString SubjectType;
TString SubjectId;
- TQuotaUsage Quota;
+ TString MetricName;
+ ui64 Limit;
ui64 LimitRequested;
- TQuotaLimitChangeRequest(const TString& subjectType, const TString& subjectId)
- : SubjectType(subjectType), SubjectId(subjectId)
+ TQuotaLimitChangeRequest(const TString& subjectType, const TString& subjectId, const TString& metricName, ui64 limit, ui64 limitRequested)
+ : SubjectType(subjectType), SubjectId(subjectId), MetricName(metricName), Limit(limit), LimitRequested(limitRequested)
{}
};
struct TQuotaLimitChangeResponse : public NActors::TEventLocal<TQuotaLimitChangeResponse, EvQuotaLimitChangeResponse> {
TString SubjectType;
TString SubjectId;
- TQuotaUsage Quota;
+ TString MetricName;
+ ui64 Limit;
ui64 LimitRequested;
- TQuotaLimitChangeResponse(const TString& subjectType, const TString& subjectId)
- : SubjectType(subjectType), SubjectId(subjectId)
+ TQuotaLimitChangeResponse(const TString& subjectType, const TString& subjectId, const TString& metricName, ui64 limit, ui64 limitRequested)
+ : SubjectType(subjectType), SubjectId(subjectId), MetricName(metricName), Limit(limit), LimitRequested(limitRequested)
{}
};
diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.cpp b/ydb/core/yq/libs/quota_manager/quota_manager.cpp
index 68ec8812e7..bb703cf257 100644
--- a/ydb/core/yq/libs/quota_manager/quota_manager.cpp
+++ b/ydb/core/yq/libs/quota_manager/quota_manager.cpp
@@ -46,9 +46,12 @@ struct TQuotaCachedUsage {
};
struct TQuotaCache {
- THashMap<NActors::TActorId /* Sender */, ui64 /* Cookie */> PendingRequests;
+ THashMap<NActors::TActorId /* Sender */, ui64 /* Cookie */> PendingUsageRequests;
THashMap<TString /* MetricName */, TQuotaCachedUsage> UsageMap;
THashSet<TString> PendingUsage;
+ THashSet<TString> PendingLimit;
+ NActors::TActorId PendingLimitRequest;
+ ui64 PendingLimitCookie;
TInstant LoadedAt = TInstant::Zero();
};
@@ -159,6 +162,7 @@ private:
hFunc(TEvQuotaService::TQuotaGetRequest, Handle)
hFunc(TEvQuotaService::TQuotaChangeNotification, Handle)
hFunc(TEvQuotaService::TQuotaUsageResponse, Handle)
+ hFunc(TEvQuotaService::TQuotaLimitChangeResponse, Handle)
hFunc(TEvQuotaService::TQuotaSetRequest, Handle)
hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } );
hFunc(NActors::TEvInterconnect::TEvNodesInfo, Handle)
@@ -241,15 +245,15 @@ private:
if (!cachedUsage.Usage.Usage || cachedUsage.Usage.Usage->UpdatedAt + UsageRefreshPeriod < Now()) {
auto it = infoMap.find(metricName);
if (it != infoMap.end()) {
- if (it->second.UsageUpdater != NActors::TActorId{}) {
+ if (it->second.QuotaController != NActors::TActorId{}) {
if (!cache.PendingUsage.contains(metricName)) {
LOG_T(subjectType << "." << subjectId << "." << metricName << " IS STALE, Refreshing ...");
- Send(it->second.UsageUpdater, new TEvQuotaService::TQuotaUsageRequest(subjectType, subjectId, metricName));
+ Send(it->second.QuotaController, new TEvQuotaService::TQuotaUsageRequest(subjectType, subjectId, metricName));
cache.PendingUsage.insert(metricName);
cachedUsage.RequestedAt = Now();
}
if (!pended) {
- cache.PendingRequests.emplace(ev->Sender, ev->Cookie);
+ cache.PendingUsageRequests.emplace(ev->Sender, ev->Cookie);
pended = true;
}
}
@@ -260,12 +264,13 @@ private:
if (!pended) {
SendQuota(ev->Sender, ev->Cookie, subjectType, subjectId, cache);
- cache.PendingRequests.erase(ev->Sender);
+ cache.PendingUsageRequests.erase(ev->Sender);
}
}
void ChangeLimitsAndReply(const TString& subjectType, const TString& subjectId, TQuotaCache& cache, const TEvQuotaService::TQuotaSetRequest::TPtr& ev) {
+ auto pended = false;
auto& infoMap = QuotaInfoMap[subjectType];
for (auto metricLimit : ev->Get()->Limits) {
auto& metricName = metricLimit.first;
@@ -274,16 +279,27 @@ private:
if (it != cache.UsageMap.end()) {
auto& cached = it->second;
auto limit = metricLimit.second;
- if (cached.Usage.Limit.Value == 0 || limit == 0 || limit > cached.Usage.Limit.Value) {
- // check hard limit only if quota is increased
- auto itI = infoMap.find(metricName);
- if (itI != infoMap.end()) {
- auto& info = itI->second;
- if (info.HardLimit != 0 && (limit == 0 || limit > info.HardLimit)) {
- limit = info.HardLimit;
+
+ auto itI = infoMap.find(metricName);
+ if (itI != infoMap.end()) {
+ auto& info = itI->second;
+ if (info.QuotaController != NActors::TActorId{}) {
+ pended = true;
+ cache.PendingLimitRequest = ev->Sender;
+ cache.PendingLimitCookie = ev->Cookie;
+ cache.PendingLimit.insert(metricName);
+ Send(info.QuotaController, new TEvQuotaService::TQuotaLimitChangeRequest(subjectType, subjectId, metricName, cached.Usage.Limit.Value, limit));
+ continue;
+ } else {
+ if (cached.Usage.Limit.Value == 0 || limit == 0 || limit > cached.Usage.Limit.Value) {
+ // check hard limit only if quota is increased
+ if (info.HardLimit != 0 && (limit == 0 || limit > info.HardLimit)) {
+ limit = info.HardLimit;
+ }
}
}
}
+
if (cached.Usage.Limit.Value != limit) {
cached.Usage.Limit.Value = limit;
cached.Usage.Limit.UpdatedAt = Now();
@@ -293,13 +309,53 @@ private:
}
}
- auto response = MakeHolder<TEvQuotaService::TQuotaSetResponse>(subjectType, subjectId);
- for (auto it : cache.UsageMap) {
- response->Limits.emplace(it.first, it.second.Usage.Limit.Value);
+ if (!pended) {
+ auto response = MakeHolder<TEvQuotaService::TQuotaSetResponse>(subjectType, subjectId);
+ for (auto it : cache.UsageMap) {
+ response->Limits.emplace(it.first, it.second.Usage.Limit.Value);
+ }
+ Send(ev->Sender, response.Release());
}
- Send(ev->Sender, response.Release());
}
+ void Handle(TEvQuotaService::TQuotaLimitChangeResponse::TPtr& ev) {
+ auto subjectType = ev->Get()->SubjectType;
+ auto subjectId = ev->Get()->SubjectId;
+ auto metricName = ev->Get()->MetricName;
+ auto& subjectMap = QuotaCacheMap[subjectType];
+ auto it = subjectMap.find(subjectId);
+
+ if (it == subjectMap.end()) {
+ // if quotas are not cached - ignore usage update
+ return;
+ }
+
+ auto& cache = it->second;
+ cache.PendingLimit.erase(metricName);
+
+ auto itQ = cache.UsageMap.find(metricName);
+ if (itQ != cache.UsageMap.end()) {
+ // if metric is not defined - ignore usage update
+ auto& cached = itQ->second;
+ if (cached.Usage.Limit.Value != ev->Get()->Limit) {
+ cached.Usage.Limit.Value = ev->Get()->Limit;
+ cached.Usage.Limit.UpdatedAt = Now();
+ LOG_T(cached.Usage.ToString(subjectType, subjectId, metricName) << " LIMIT Changed");
+ SyncQuota(subjectType, subjectId, metricName, cached);
+ }
+ }
+
+ if (cache.PendingLimit.size() == 0) {
+ if (cache.PendingLimitRequest != NActors::TActorId{}) {
+ auto response = MakeHolder<TEvQuotaService::TQuotaSetResponse>(subjectType, subjectId);
+ for (auto it : cache.UsageMap) {
+ response->Limits.emplace(it.first, it.second.Usage.Limit.Value);
+ }
+ Send(cache.PendingLimitRequest, response.Release());
+ cache.PendingLimitRequest = NActors::TActorId{};
+ }
+ }
+ }
void ReadQuota(const TString& subjectType, const TString& subjectId, TReadQuotaExecuter::TCallback callback) {
@@ -426,9 +482,9 @@ private:
auto itm = metricMap.find(request.MetricName);
if (itm != metricMap.end()) {
auto& info = itm->second;
- if (info.UsageUpdater != NActors::TActorId{}) {
+ if (info.QuotaController != NActors::TActorId{}) {
LOG_T(request.SubjectType << "." << request.SubjectId << "." << request.MetricName << " FORCE UPDATE, Updating ...");
- Send(info.UsageUpdater, new TEvQuotaService::TQuotaUsageRequest(request.SubjectType, request.SubjectId, request.MetricName));
+ Send(info.QuotaController, new TEvQuotaService::TQuotaUsageRequest(request.SubjectType, request.SubjectId, request.MetricName));
}
}
}
@@ -566,21 +622,19 @@ private:
cache.PendingUsage.erase(metricName);
auto itQ = cache.UsageMap.find(metricName);
- if (itQ == cache.UsageMap.end()) {
+ if (itQ != cache.UsageMap.end()) {
// if metric is not defined - ignore usage update
- return;
+ itQ->second.Usage.Usage = ev->Get()->Usage;
+ LOG_T(itQ->second.Usage.ToString(subjectType, subjectId, metricName) << " REFRESHED");
+ SyncQuota(subjectType, subjectId, metricName, itQ->second);
}
- itQ->second.Usage.Usage = ev->Get()->Usage;
- LOG_T(itQ->second.Usage.ToString(subjectType, subjectId, metricName) << " REFRESHED");
if (cache.PendingUsage.size() == 0) {
- for (auto& itR : cache.PendingRequests) {
+ for (auto& itR : cache.PendingUsageRequests) {
SendQuota(itR.first, itR.second, subjectType, subjectId, cache);
}
- cache.PendingRequests.clear();
+ cache.PendingUsageRequests.clear();
}
-
- SyncQuota(subjectType, subjectId, metricName, itQ->second);
}
void Handle(TEvQuotaService::TQuotaSetRequest::TPtr& ev) {