summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <[email protected]>2022-05-11 16:35:02 +0300
committerAleksandr Khoroshilov <[email protected]>2022-05-11 16:35:02 +0300
commit7f4ba675d925613a0103d77a14d688872543cbbc (patch)
tree3f76ee2d263e9dee47a7763c8c39b55a43dac859
parent8081dabe0c602237a85fac90c08c050810c240c3 (diff)
Quota Usage Update
ref:aa1759625d81930efc4aa45bbf5d6a69435edff4
-rw-r--r--ydb/core/yq/libs/config/protos/issue_id.proto2
-rw-r--r--ydb/core/yq/libs/config/protos/quotas_manager.proto1
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp13
-rw-r--r--ydb/core/yq/libs/control_plane_storage/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h4
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp13
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp27
-rw-r--r--ydb/core/yq/libs/init/init.cpp7
-rw-r--r--ydb/core/yq/libs/quota_manager/events/events.h59
-rw-r--r--ydb/core/yq/libs/quota_manager/quota_manager.cpp151
-rw-r--r--ydb/core/yq/libs/quota_manager/quota_manager.h26
11 files changed, 275 insertions, 29 deletions
diff --git a/ydb/core/yq/libs/config/protos/issue_id.proto b/ydb/core/yq/libs/config/protos/issue_id.proto
index 4481b529654..9fbd4662010 100644
--- a/ydb/core/yq/libs/config/protos/issue_id.proto
+++ b/ydb/core/yq/libs/config/protos/issue_id.proto
@@ -17,5 +17,7 @@ message TIssuesIds {
EXPIRED = 1004;
UNSUPPORTED = 1005;
UNAUTHORIZED = 1006;
+ NOT_READY = 1007;
+ QUOTA_EXCEEDED = 1008;
}
}
diff --git a/ydb/core/yq/libs/config/protos/quotas_manager.proto b/ydb/core/yq/libs/config/protos/quotas_manager.proto
index b1e36cf5fcc..e0a47539205 100644
--- a/ydb/core/yq/libs/config/protos/quotas_manager.proto
+++ b/ydb/core/yq/libs/config/protos/quotas_manager.proto
@@ -20,4 +20,5 @@ message TQuotaList {
message TQuotasManagerConfig {
bool Enabled = 1;
repeated TQuotaList Quotas = 2;
+ string UsageRefreshPeriod = 3;
}
diff --git a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
index 03dfec68fc5..73ed47736b5 100644
--- a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
+++ b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
@@ -102,15 +102,13 @@ public:
void Bootstrap() {
CPP_LOG_T("Get quotas bootstrap. Cloud id: " << Event->Get()->CloudId << " Actor id: " << SelfId());
- Become(&TGetQuotaActor::StateFunc);
- auto request = std::make_unique<TEvQuotaService::TQuotaGetRequest>();
- request->SubjectType = SUBJECT_TYPE_CLOUD;
- request->SubjectId = Event->Get()->CloudId;
- Send(MakeQuotaServiceActorId(), request.release(), 0, 0);
+ Become(&TGetQuotaActor::StateFunc, TDuration::Seconds(10), new NActors::TEvents::TEvWakeup());
+ Send(MakeQuotaServiceActorId(), new TEvQuotaService::TQuotaGetRequest(SUBJECT_TYPE_CLOUD, Event->Get()->CloudId));
}
STRICT_STFUNC(StateFunc,
hFunc(TEvQuotaService::TQuotaGetResponse, Handle);
+ cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout);
)
void Handle(TEvQuotaService::TQuotaGetResponse::TPtr& ev) {
@@ -119,6 +117,11 @@ public:
TActivationContext::Send(Event->Forward(ControlPlaneProxyActorId()));
PassAway();
}
+
+ void HandleTimeout() {
+ CPP_LOG_D("Quota request timeout. Cloud id: " << Event->Get()->CloudId << " Actor id: " << SelfId());
+ Send(MakeQuotaServiceActorId(), new TEvQuotaService::TQuotaGetRequest(SUBJECT_TYPE_CLOUD, Event->Get()->CloudId, true));
+ }
};
template<class TEventRequest, class TResponseProxy>
diff --git a/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt b/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt
index 6db7c58dabf..ba0038e4a50 100644
--- a/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt
+++ b/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt
@@ -48,4 +48,5 @@ target_sources(yq-libs-control_plane_storage PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
)
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
index 8a2d404fc9e..e3e01ba4998 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
@@ -33,6 +33,7 @@
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
#include <ydb/core/yq/libs/control_plane_storage/proto/yq_internal.pb.h>
#include <ydb/core/yq/libs/db_schema/db_schema.h>
+#include <ydb/core/yq/libs/quota_manager/events/events.h>
#include <ydb/core/yq/libs/ydb/util.h>
#include <ydb/core/yq/libs/ydb/ydb.h>
@@ -279,6 +280,7 @@ public:
hFunc(TEvControlPlaneStorage::TEvPingTaskRequest, Handle);
hFunc(TEvControlPlaneStorage::TEvNodesHealthCheckRequest, Handle);
hFunc(NMon::TEvHttpInfo, Handle);
+ hFunc(TEvQuotaService::TQuotaUsageRequest, Handle);
)
void Handle(TEvControlPlaneStorage::TEvCreateQueryRequest::TPtr& ev);
@@ -310,6 +312,8 @@ public:
void Handle(TEvControlPlaneStorage::TEvNodesHealthCheckRequest::TPtr& ev);
+ void Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev);
+
template<typename T>
NYql::TIssues ValidateConnection(T& ev, bool clickHousePasswordRequire = true)
{
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
index f37dd98eb9b..cf0a6657da1 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
@@ -108,6 +108,19 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
if (request.disposition().has_from_last_checkpoint()) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Streaming disposition \"from_last_checkpoint\" is not allowed in CreateQuery request"));
}
+
+ {
+ auto it = event.Quotas.find(QUOTA_COUNT_LIMIT);
+ if (it != event.Quotas.end()) {
+ auto& quota = it->second;
+ if (!quota.Usage) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::NOT_READY, "Control Plane is not ready yet. Please retry later."));
+ } else if (*quota.Usage > quota.Limit) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::QUOTA_EXCEEDED, Sprintf("Too many queries (%lu of %lu). Please delete other queries or increase limits.", *quota.Usage, quota.Limit)));
+ }
+ }
+ }
+
if (issues) {
CPS_LOG_D(MakeLogPrefix(scope, user, queryId)
<< "CreateQueryRequest, validation failed: "
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
new file mode 100644
index 00000000000..e14f9455572
--- /dev/null
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
@@ -0,0 +1,27 @@
+#include "ydb_control_plane_storage_impl.h"
+
+#include <cstdint>
+
+#include <util/datetime/base.h>
+#include <util/generic/yexception.h>
+#include <util/string/join.h>
+
+#include <ydb/core/yq/libs/common/entity_id.h>
+#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
+#include <ydb/core/yq/libs/control_plane_storage/schema.h>
+#include <ydb/core/yq/libs/db_schema/db_schema.h>
+
+#include <ydb/public/api/protos/yq.pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
+
+#include <util/digest/multi.h>
+
+namespace NYq {
+
+void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev)
+{
+ auto& request = *ev->Get();
+ Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(request.SubjectType, request.SubjectId, request.MetricName, 0));
+}
+
+} // NYq
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index ab3de5915f3..bf27ed81006 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -263,9 +263,12 @@ void Init(
if (protoConfig.GetQuotasManager().GetEnabled()) {
auto quotaService = NYq::CreateQuotaServiceActor(
protoConfig.GetQuotasManager(),
- yqSharedResources,
+ /* yqSharedResources, */
serviceCounters.Counters,
- { TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_RESULT_LIMIT, 20_MB) });
+ {
+ TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_RESULT_LIMIT, 20_MB),
+ TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_COUNT_LIMIT, 100, NYq::ControlPlaneStorageServiceActorId())
+ });
actorRegistrator(NYq::MakeQuotaServiceActorId(), quotaService);
}
}
diff --git a/ydb/core/yq/libs/quota_manager/events/events.h b/ydb/core/yq/libs/quota_manager/events/events.h
index 05507def802..9187b1ba150 100644
--- a/ydb/core/yq/libs/quota_manager/events/events.h
+++ b/ydb/core/yq/libs/quota_manager/events/events.h
@@ -14,20 +14,26 @@ namespace NYq {
constexpr auto SUBJECT_TYPE_CLOUD = "cloud";
constexpr auto QUOTA_RESULT_LIMIT = "fq.queryResultLimit.bytes";
+constexpr auto QUOTA_COUNT_LIMIT = "fq.queryLimit.count";
-struct IQuotaCalculator {
- // ui64 GetQuotaUsage(const TString& quota, const TString& subjectType, const TString& subjectId) = 0;
+struct TQuotaInfo {
+ ui64 DefaultLimit;
+ NActors::TActorId UsageUpdater;
+ TQuotaInfo(ui64 defaultLimit, NActors::TActorId usageUpdater = {})
+ : DefaultLimit(defaultLimit)
+ , UsageUpdater(usageUpdater)
+ {
+ }
};
struct TQuotaDescription {
TString SubjectType;
TString MetricName;
- ui64 DefaultLimit;
- std::shared_ptr<IQuotaCalculator> QuotaCalculator;
- TQuotaDescription(const TString& subjectType, const TString& metricName, ui64 defaultLimit)
+ TQuotaInfo Info;
+ TQuotaDescription(const TString& subjectType, const TString& metricName, ui64 defaultLimit, NActors::TActorId usageUpdater = {})
: SubjectType(subjectType)
, MetricName(metricName)
- , DefaultLimit(defaultLimit)
+ , Info(defaultLimit, usageUpdater)
{
}
};
@@ -35,8 +41,10 @@ struct TQuotaDescription {
struct TQuotaUsage {
ui64 Limit;
TMaybe<ui64> Usage;
- TQuotaUsage(ui64 limit) : Limit(limit) {}
- TQuotaUsage(ui64 limit, ui64 usage) : Limit(limit), Usage(usage) {}
+ TInstant UpdatedAt;
+ TQuotaUsage(ui64 limit) : Limit(limit), UpdatedAt(TInstant::Zero()) {}
+ TQuotaUsage(ui64 limit, ui64 usage, const TInstant& updatedAt = Now())
+ : Limit(limit), Usage(usage), UpdatedAt(updatedAt) {}
};
using TQuotaMap = THashMap<TString, TQuotaUsage>;
@@ -46,6 +54,9 @@ struct TEvQuotaService {
enum EEv : ui32 {
EvQuotaGetRequest = YqEventSubspaceBegin(NYq::TYqEventSubspace::QuotaService),
EvQuotaGetResponse,
+ EvQuotaChangeNotification,
+ EvQuotaUsageRequest,
+ EvQuotaUsageResponse,
EvEnd,
};
@@ -54,12 +65,44 @@ struct TEvQuotaService {
struct TQuotaGetRequest : public NActors::TEventLocal<TQuotaGetRequest, EvQuotaGetRequest> {
TString SubjectType;
TString SubjectId;
+ bool AllowStaleUsage;
+ TQuotaGetRequest(const TString& subjectType, const TString& subjectId, bool allowStaleUsage = false)
+ : SubjectType(subjectType), SubjectId(subjectId), AllowStaleUsage(allowStaleUsage)
+ {}
};
// Quota request never fails, if no quota exist (i.e. SubjectType is incorrect) empty list will be returned
struct TQuotaGetResponse : public NActors::TEventLocal<TQuotaGetResponse, EvQuotaGetResponse> {
TQuotaMap Quotas;
};
+
+ struct TQuotaChangeNotification : public NActors::TEventLocal<TQuotaChangeNotification, EvQuotaChangeNotification> {
+ TString SubjectType;
+ TString SubjectId;
+ TString MetricName;
+ TQuotaChangeNotification(const TString& subjectType, const TString& subjectId, const TString& metricName)
+ : SubjectType(subjectType), SubjectId(subjectId), MetricName(metricName)
+ {}
+ };
+
+ struct TQuotaUsageRequest : public NActors::TEventLocal<TQuotaUsageRequest, EvQuotaUsageRequest> {
+ TString SubjectType;
+ TString SubjectId;
+ TString MetricName;
+ TQuotaUsageRequest(const TString& subjectType, const TString& subjectId, const TString& metricName)
+ : SubjectType(subjectType), SubjectId(subjectId), MetricName(metricName)
+ {}
+ };
+
+ struct TQuotaUsageResponse : public NActors::TEventLocal<TQuotaUsageResponse, EvQuotaUsageResponse> {
+ TString SubjectType;
+ TString SubjectId;
+ TString MetricName;
+ ui64 Usage;
+ TQuotaUsageResponse(const TString& subjectType, const TString& subjectId, const TString& metricName, ui64 usage)
+ : SubjectType(subjectType), SubjectId(subjectId), MetricName(metricName), Usage(usage)
+ {}
+ };
};
} /* NYq */
diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.cpp b/ydb/core/yq/libs/quota_manager/quota_manager.cpp
index 1d532063454..2e3617f8ea5 100644
--- a/ydb/core/yq/libs/quota_manager/quota_manager.cpp
+++ b/ydb/core/yq/libs/quota_manager/quota_manager.cpp
@@ -5,6 +5,7 @@
#include <library/cpp/actors/core/log.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/core/yq/libs/control_plane_storage/util.h>
#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
#include <ydb/core/protos/services.pb.h>
@@ -21,22 +22,42 @@
namespace NYq {
NActors::TActorId MakeQuotaServiceActorId() {
- constexpr TStringBuf name = "FQQTASRV";
+ constexpr TStringBuf name = "FQ_QUOTA";
return NActors::TActorId(0, name);
}
+constexpr TDuration USAGE_REFRESH_PERIOD = TDuration::Seconds(10);
+
+struct TQuotaCachedUsage {
+ TQuotaUsage Usage;
+ TInstant RequestedAt = TInstant::Zero();
+ TQuotaCachedUsage(ui64 limit)
+ : Usage(limit) {}
+ TQuotaCachedUsage(ui64 limit, ui64 usage, const TInstant& updatedAt)
+ : Usage(limit, usage, updatedAt) {}
+};
+
+struct TQuotaCache {
+ THashMap<NActors::TActorId /* Sender */, ui64 /* Cookie */> PendingRequests;
+ THashMap<TString /* MetricName */, TQuotaCachedUsage> UsageMap;
+ THashSet<TString> PendingUsage;
+};
+
class TQuotaManagementService : public NActors::TActorBootstrapped<TQuotaManagementService> {
public:
TQuotaManagementService(
const NConfig::TQuotasManagerConfig& config,
- const NYq::TYqSharedResources::TPtr& yqSharedResources,
+ /* const NYq::TYqSharedResources::TPtr& yqSharedResources, */
const NMonitoring::TDynamicCounterPtr& counters,
- std::vector<TQuotaDescription> quotaDesc)
+ std::vector<TQuotaDescription> quotaDescriptions)
: Config(config)
, ServiceCounters(counters->GetSubgroup("subsystem", "QuotaService"))
{
- Y_UNUSED(yqSharedResources);
- Y_UNUSED(quotaDesc);
+ /* Y_UNUSED(yqSharedResources); */
+ for (auto& description : quotaDescriptions) {
+ QuotaInfoMap[description.SubjectType].emplace(description.MetricName, description.Info);
+ }
+ UsageRefreshPeriod = GetDuration(Config.GetUsageRefreshPeriod(), USAGE_REFRESH_PERIOD);
}
static constexpr char ActorName[] = "FQ_QUOTA_SERVICE";
@@ -49,30 +70,134 @@ public:
private:
STRICT_STFUNC(StateFunc,
hFunc(TEvQuotaService::TQuotaGetRequest, Handle)
+ hFunc(TEvQuotaService::TQuotaChangeNotification, Handle)
+ hFunc(TEvQuotaService::TQuotaUsageResponse, Handle)
);
void Handle(TEvQuotaService::TQuotaGetRequest::TPtr& ev) {
- auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>();
- for (const auto& quota : Config.GetQuotas()) {
- if (quota.GetSubjectType() == ev->Get()->SubjectType && quota.GetSubjectId() == ev->Get()->SubjectId) {
- for (const auto& limit : quota.GetLimit()) {
- response->Quotas.emplace(limit.GetName(), TQuotaUsage(limit.GetLimit()));
+ auto subjectType = ev->Get()->SubjectType;
+ auto subjectId = ev->Get()->SubjectId;
+ auto& subjectMap = QuotaCacheMap[subjectType];
+ auto it = subjectMap.find(subjectId);
+
+ bool pended = false;
+ auto& infoMap = QuotaInfoMap[subjectType];
+
+ // Load into cache, if needed
+ if (it == subjectMap.end()) {
+ TQuotaCache cache;
+ // 1. Load from Config
+ for (const auto& quota : Config.GetQuotas()) {
+ if (quota.GetSubjectType() == subjectType && quota.GetSubjectId() == subjectId) {
+ for (const auto& limit : quota.GetLimit()) {
+ cache.UsageMap.emplace(limit.GetName(), TQuotaCachedUsage(limit.GetLimit()));
+ }
+ }
+ }
+ // 2. Load from DB (TBD)
+ // 3. Append defaults
+ for (auto& it : infoMap) {
+ if (cache.UsageMap.find(it.first) == cache.UsageMap.end()) {
+ cache.UsageMap.emplace(it.first, TQuotaCachedUsage(it.second.DefaultLimit));
+ }
+ }
+ subjectMap.emplace(subjectId, cache);
+ }
+
+ auto& cache = subjectMap[subjectId];
+ if (!ev->Get()->AllowStaleUsage) {
+ // Refresh usage
+ for (auto& itUsage : cache.UsageMap) {
+ auto metricName = itUsage.first;
+ auto& cachedUsage = itUsage.second;
+ if (cachedUsage.Usage.UpdatedAt + UsageRefreshPeriod < Now()) {
+ auto it = infoMap.find(metricName);
+ if (it != infoMap.end()) {
+ if (it->second.UsageUpdater != NActors::TActorId{}) {
+ if (!cache.PendingUsage.contains(metricName)) {
+ Send(it->second.UsageUpdater, new TEvQuotaService::TQuotaUsageRequest(subjectType, subjectId, metricName));
+ cache.PendingUsage.insert(metricName);
+ cachedUsage.RequestedAt = Now();
+ }
+ if (!pended) {
+ cache.PendingRequests.emplace(ev->Sender, ev->Cookie);
+ pended = true;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ if (!pended) {
+ auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>();
+ for (auto it : cache.UsageMap) {
+ response->Quotas.emplace(it.first, it.second.Usage);
+ }
+ Send(ev->Sender, response.Release());
+ cache.PendingRequests.erase(ev->Sender);
+ }
+ }
+
+ void Handle(TEvQuotaService::TQuotaChangeNotification::TPtr& ev) {
+ auto& request = *ev->Get();
+ auto itt = QuotaInfoMap.find(request.SubjectType);
+ if (itt != QuotaInfoMap.end()) {
+ auto& metricMap = itt->second;
+ auto itm = metricMap.find(request.MetricName);
+ if (itm != metricMap.end()) {
+ auto& info = itm->second;
+ if (info.UsageUpdater != NActors::TActorId{}) {
+ Send(info.UsageUpdater, new TEvQuotaService::TQuotaUsageRequest(request.SubjectType, request.SubjectId, request.MetricName));
+ }
+ }
+ }
+ }
+
+ void Handle(TEvQuotaService::TQuotaUsageResponse::TPtr& ev) {
+ auto subjectType = ev->Get()->SubjectType;
+ auto subjectId = ev->Get()->SubjectId;
+ auto metricName = ev->Get()->MetricName;
+ auto& subjectMap = QuotaCacheMap[subjectType];
+ auto it = subjectMap.find(subjectId);
+
+ if (it == subjectMap.end()) {
+ return;
+ }
+
+ auto& cache = it->second;
+ cache.PendingUsage.erase(metricName);
+
+ auto itQ = cache.UsageMap.find(metricName);
+ if (itQ != cache.UsageMap.end()) {
+ itQ->second.Usage.Usage = ev->Get()->Usage;
+ }
+
+ if (cache.PendingUsage.size() == 0) {
+ for (auto& itR : cache.PendingRequests) {
+ auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>();
+ for (auto it : cache.UsageMap) {
+ response->Quotas.emplace(it.first, it.second.Usage);
}
+ Send(itR.first, response.Release());
}
+ cache.PendingRequests.clear();
}
- Send(ev->Sender, response.Release());
}
NConfig::TQuotasManagerConfig Config;
const NMonitoring::TDynamicCounterPtr ServiceCounters;
+ THashMap<TString /* SubjectType */, THashMap<TString /* MetricName */, TQuotaInfo>> QuotaInfoMap;
+ THashMap<TString /* SubjectType */, THashMap<TString /* SubjectId */, TQuotaCache>> QuotaCacheMap;
+ TDuration UsageRefreshPeriod;
};
NActors::IActor* CreateQuotaServiceActor(
const NConfig::TQuotasManagerConfig& config,
- const NYq::TYqSharedResources::TPtr& yqSharedResources,
+ /* const NYq::TYqSharedResources::TPtr& yqSharedResources, */
const NMonitoring::TDynamicCounterPtr& counters,
std::vector<TQuotaDescription> quotaDesc) {
- return new TQuotaManagementService(config, yqSharedResources, counters, quotaDesc);
+ return new TQuotaManagementService(config, /* yqSharedResources, */ counters, quotaDesc);
}
} /* NYq */
diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.h b/ydb/core/yq/libs/quota_manager/quota_manager.h
index 6db4d1deaf3..b28594fe698 100644
--- a/ydb/core/yq/libs/quota_manager/quota_manager.h
+++ b/ydb/core/yq/libs/quota_manager/quota_manager.h
@@ -7,13 +7,37 @@
#include <library/cpp/monlib/dynamic_counters/counters.h>
+/*
+
+ 1. TQuotaGetRequest is sent to get actual quota limits and usage from QM. QM guarantees actual limits and only eventually
+ consistent usage. Also QM provides no upper time for reply. Client must control timeout and may resent the message with
+ AllowStaleUsage = true to avoid waiting for usage requests (in any case it will wait for limits to be loaded into cache).
+ If message is resent N times client must be ready to receive from 1 up to (1 + N) replies and my use Cookies to dedup them
+
+ 2. TQuotaGetResponse contains actual limits and may provide last known usages. Client may check UsageUpdatedAt to decide if
+ value provided is fresh enough. If usage is missed or stale, client must reply upstream with retriable error "quota is
+ not accessible yet"
+
+ 3. TQuotaUsageRequest is sent by QM to handlers configured to calculate quota usage. For sake of simplicity QM requires no
+ time or event reply quarantees. Handler must do the best to reply as soon as possible but may delay or event discard the
+ request. But it should reply with the same Cookie (QM uses it to match reply with request). Also handler may receive another
+ request before previous is replied. To mitigate overload handler may implement throttling on its side
+
+ 4. TQuotaUsageResponse is sent by usage calculation handler back to QM. Cookie must be kept. Also handler my sent unpaired
+ message any time to refresh usage in pull mode with Cookie == 0. QM will refresh usage or discard the message if Subject
+ limits are not in the cache
+
+ 5. TQuotaChangeNotification may be sent from any point to toggle usage refresh if there is reason for usage to change
+
+*/
+
namespace NYq {
NActors::TActorId MakeQuotaServiceActorId();
NActors::IActor* CreateQuotaServiceActor(
const NConfig::TQuotasManagerConfig& config,
- const NYq::TYqSharedResources::TPtr& yqSharedResources,
+ /* const NYq::TYqSharedResources::TPtr& yqSharedResources, */
const NMonitoring::TDynamicCounterPtr& counters,
std::vector<TQuotaDescription> quotaDesc);