author     hor911 <hor911@ydb.tech>    2022-08-01 13:33:20 +0300
committer  hor911 <hor911@ydb.tech>    2022-08-01 13:33:20 +0300
commit     5cbf4c3a664cee3d79b1b449b252fd28c373ee98 (patch)
tree       d584ee10f5854c3d23a5476157e1e37dd6133835
parent     08c896760519c0f6e107dc6afcc227a21e812be8 (diff)
download   ydb-5cbf4c3a664cee3d79b1b449b252fd28c373ee98.tar.gz
Quota DB persistence
-rw-r--r--  CMakeLists.darwin.txt  1
-rw-r--r--  CMakeLists.linux.txt  1
-rw-r--r--  ydb/core/protos/services.proto  3
-rw-r--r--  ydb/core/yq/libs/config/protos/quotas_manager.proto  5
-rw-r--r--  ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp  4
-rw-r--r--  ydb/core/yq/libs/control_plane_storage/CMakeLists.txt  1
-rw-r--r--  ydb/core/yq/libs/control_plane_storage/schema.h  2
-rw-r--r--  ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp  8
-rw-r--r--  ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp  8
-rw-r--r--  ydb/core/yq/libs/init/init.cpp  6
-rw-r--r--  ydb/core/yq/libs/quota_manager/events/CMakeLists.txt  1
-rw-r--r--  ydb/core/yq/libs/quota_manager/events/events.cpp  15
-rw-r--r--  ydb/core/yq/libs/quota_manager/events/events.h  48
-rw-r--r--  ydb/core/yq/libs/quota_manager/proto/CMakeLists.txt  31
-rw-r--r--  ydb/core/yq/libs/quota_manager/proto/quota_internal.proto  18
-rw-r--r--  ydb/core/yq/libs/quota_manager/quota_manager.cpp  540
-rw-r--r--  ydb/core/yq/libs/quota_manager/quota_manager.h  6
-rw-r--r--  ydb/core/yq/libs/quota_manager/quota_proxy.cpp  14
-rw-r--r--  ydb/core/yq/libs/shared_resources/CMakeLists.txt  1
-rw-r--r--  ydb/core/yq/libs/shared_resources/db_exec.h  55
20 files changed, 628 insertions, 140 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 139c1595f4..f93658dc33 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -771,6 +771,7 @@ add_subdirectory(ydb/core/yq/libs/common)
add_subdirectory(ydb/core/yq/libs/control_plane_storage/events)
add_subdirectory(ydb/core/yq/libs/control_plane_storage/proto)
add_subdirectory(ydb/core/yq/libs/quota_manager/events)
+add_subdirectory(ydb/core/yq/libs/quota_manager/proto)
add_subdirectory(ydb/core/yq/libs/control_plane_storage)
add_subdirectory(library/cpp/protobuf/interop)
add_subdirectory(ydb/core/yq/libs/config)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 51a9ab4284..49fb1cec4e 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -775,6 +775,7 @@ add_subdirectory(ydb/core/yq/libs/common)
add_subdirectory(ydb/core/yq/libs/control_plane_storage/events)
add_subdirectory(ydb/core/yq/libs/control_plane_storage/proto)
add_subdirectory(ydb/core/yq/libs/quota_manager/events)
+add_subdirectory(ydb/core/yq/libs/quota_manager/proto)
add_subdirectory(ydb/core/yq/libs/control_plane_storage)
add_subdirectory(library/cpp/protobuf/interop)
add_subdirectory(ydb/core/yq/libs/config)
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 69f74043b1..8f27f228f1 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -301,9 +301,10 @@ enum EServiceKikimr {
YQ_AUDIT = 1150;
YQ_AUDIT_EVENT_SENDER = 1151;
YQ_HEALTH = 1152;
- YQ_RATE_LIMITER = 1155;
FQ_INTERNAL_SERVICE = 1153;
FQ_QUOTA_SERVICE = 1154;
+ YQ_RATE_LIMITER = 1155;
+ FQ_QUOTA_PROXY = 1156;
// 1024 - 1099 is reserved for nbs
diff --git a/ydb/core/yq/libs/config/protos/quotas_manager.proto b/ydb/core/yq/libs/config/protos/quotas_manager.proto
index e3119141d8..8defdaee6d 100644
--- a/ydb/core/yq/libs/config/protos/quotas_manager.proto
+++ b/ydb/core/yq/libs/config/protos/quotas_manager.proto
@@ -20,6 +20,7 @@ message TQuotaList {
message TQuotasManagerConfig {
bool Enabled = 1;
repeated TQuotaList Quotas = 2;
- string UsageRefreshPeriod = 3;
- bool EnablePermissions = 4;
+ string LimitRefreshPeriod = 3;
+ string UsageRefreshPeriod = 4;
+ bool EnablePermissions = 5;
}
diff --git a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
index d0c8190729..d85c0352d5 100644
--- a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
+++ b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
@@ -103,7 +103,7 @@ public:
void Bootstrap() {
CPP_LOG_T("Get quotas bootstrap. Cloud id: " << Event->Get()->CloudId << " Actor id: " << SelfId());
Become(&TGetQuotaActor::StateFunc, TDuration::Seconds(10), new NActors::TEvents::TEvWakeup());
- Send(MakeQuotaServiceActorId(), new TEvQuotaService::TQuotaGetRequest(SUBJECT_TYPE_CLOUD, Event->Get()->CloudId));
+ Send(MakeQuotaServiceActorId(SelfId().NodeId()), new TEvQuotaService::TQuotaGetRequest(SUBJECT_TYPE_CLOUD, Event->Get()->CloudId));
}
STRICT_STFUNC(StateFunc,
@@ -120,7 +120,7 @@ public:
void HandleTimeout() {
CPP_LOG_D("Quota request timeout. Cloud id: " << Event->Get()->CloudId << " Actor id: " << SelfId());
- Send(MakeQuotaServiceActorId(), new TEvQuotaService::TQuotaGetRequest(SUBJECT_TYPE_CLOUD, Event->Get()->CloudId, true));
+ Send(MakeQuotaServiceActorId(SelfId().NodeId()), new TEvQuotaService::TQuotaGetRequest(SUBJECT_TYPE_CLOUD, Event->Get()->CloudId, true));
}
};
diff --git a/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt b/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt
index ba0038e4a5..a1fa82c7c3 100644
--- a/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt
+++ b/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt
@@ -27,6 +27,7 @@ target_link_libraries(yq-libs-control_plane_storage PUBLIC
libs-control_plane_storage-proto
yq-libs-db_schema
libs-graph_params-proto
+ libs-quota_manager-events
yq-libs-shared_resources
yq-libs-ydb
ydb-library-security
diff --git a/ydb/core/yq/libs/control_plane_storage/schema.h b/ydb/core/yq/libs/control_plane_storage/schema.h
index 64dfa936f7..33b9ede7a4 100644
--- a/ydb/core/yq/libs/control_plane_storage/schema.h
+++ b/ydb/core/yq/libs/control_plane_storage/schema.h
@@ -75,6 +75,8 @@ namespace NYq {
#define SUBJECT_ID_COLUMN_NAME "subject_id"
#define METRIC_NAME_COLUMN_NAME "metric_name"
#define METRIC_LIMIT_COLUMN_NAME "metric_limit"
+#define LIMIT_UPDATED_AT_COLUMN_NAME "limit_updated_at"
#define METRIC_USAGE_COLUMN_NAME "metric_usage"
+#define USAGE_UPDATED_AT_COLUMN_NAME "usage_updated_at"
} // namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index 94c6280621..a20039f000 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -46,7 +46,7 @@ void TYdbControlPlaneStorageActor::Bootstrap() {
CreateResultSetsTable();
CreateJobsTable();
CreateNodesTable();
- // CreateQuotasTable(); // not yet
+ CreateQuotasTable();
Become(&TThis::StateFunc);
}
@@ -240,8 +240,10 @@ void TYdbControlPlaneStorageActor::CreateQuotasTable()
.AddNullableColumn(SUBJECT_TYPE_COLUMN_NAME, EPrimitiveType::String)
.AddNullableColumn(SUBJECT_ID_COLUMN_NAME, EPrimitiveType::String)
.AddNullableColumn(METRIC_NAME_COLUMN_NAME, EPrimitiveType::String)
- .AddNullableColumn(METRIC_LIMIT_COLUMN_NAME, EPrimitiveType::Int64)
- .AddNullableColumn(METRIC_USAGE_COLUMN_NAME, EPrimitiveType::Int64)
+ .AddNullableColumn(METRIC_LIMIT_COLUMN_NAME, EPrimitiveType::Uint64)
+ .AddNullableColumn(LIMIT_UPDATED_AT_COLUMN_NAME, EPrimitiveType::Timestamp)
+ .AddNullableColumn(METRIC_USAGE_COLUMN_NAME, EPrimitiveType::Uint64)
+ .AddNullableColumn(USAGE_UPDATED_AT_COLUMN_NAME, EPrimitiveType::Timestamp)
.SetPrimaryKeyColumns({SUBJECT_TYPE_COLUMN_NAME, SUBJECT_ID_COLUMN_NAME, METRIC_NAME_COLUMN_NAME})
.Build();
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
index dec8d678b7..7b2354e021 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
@@ -80,9 +80,9 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
const TEvControlPlaneStorage::TEvCreateQueryRequest& event = *ev->Get();
const TString cloudId = event.CloudId;
auto it = event.Quotas.find(QUOTA_RESULT_LIMIT);
- ui64 resultLimit = (it != event.Quotas.end()) ? it->second.Limit : 0;
+ ui64 resultLimit = (it != event.Quotas.end()) ? it->second.Limit.Value : 0;
auto exec_ttl_it = event.Quotas.find(QUOTA_TIME_LIMIT);
- ui64 executionLimitMills = (exec_ttl_it != event.Quotas.end()) ? exec_ttl_it->second.Limit : 0;
+ ui64 executionLimitMills = (exec_ttl_it != event.Quotas.end()) ? exec_ttl_it->second.Limit.Value : 0;
const TString scope = event.Scope;
TRequestCountersPtr requestCounters = Counters.GetScopeCounters(cloudId, scope, RTS_CREATE_QUERY);
requestCounters->InFly->Inc();
@@ -118,8 +118,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
auto& quota = it->second;
if (!quota.Usage) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::NOT_READY, "Control Plane is not ready yet. Please retry later."));
- } else if (*quota.Usage >= quota.Limit) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::QUOTA_EXCEEDED, Sprintf("Too many queries (%lu of %lu). Please delete other queries or increase limits.", *quota.Usage, quota.Limit)));
+ } else if (quota.Usage->Value >= quota.Limit.Value) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::QUOTA_EXCEEDED, Sprintf("Too many queries (%lu of %lu). Please delete other queries or increase limits.", quota.Usage->Value, quota.Limit.Value)));
}
}
}
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 5141c31a36..c8d11788c9 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -284,14 +284,16 @@ void Init(
if (protoConfig.GetQuotasManager().GetEnabled()) {
auto quotaService = NYq::CreateQuotaServiceActor(
protoConfig.GetQuotasManager(),
- /* yqSharedResources, */
+ protoConfig.GetControlPlaneStorage().GetStorage(),
+ yqSharedResources,
+ credentialsProviderFactory,
serviceCounters.Counters,
{
TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_RESULT_LIMIT, 20_MB, 2_GB),
TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_COUNT_LIMIT, 100, 200, NYq::ControlPlaneStorageServiceActorId()),
TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_TIME_LIMIT, 0)
});
- actorRegistrator(NYq::MakeQuotaServiceActorId(), quotaService);
+ actorRegistrator(NYq::MakeQuotaServiceActorId(nodeId), quotaService);
auto quotaProxy = NYq::CreateQuotaProxyActor(
protoConfig.GetQuotasManager(),
diff --git a/ydb/core/yq/libs/quota_manager/events/CMakeLists.txt b/ydb/core/yq/libs/quota_manager/events/CMakeLists.txt
index 0b39cb10db..2ef3752564 100644
--- a/ydb/core/yq/libs/quota_manager/events/CMakeLists.txt
+++ b/ydb/core/yq/libs/quota_manager/events/CMakeLists.txt
@@ -12,6 +12,7 @@ target_link_libraries(libs-quota_manager-events PUBLIC
contrib-libs-cxxsupp
yutil
yq-libs-events
+ libs-quota_manager-proto
)
target_sources(libs-quota_manager-events PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/quota_manager/events/events.cpp
diff --git a/ydb/core/yq/libs/quota_manager/events/events.cpp b/ydb/core/yq/libs/quota_manager/events/events.cpp
index b4d595d7a6..f065785565 100644
--- a/ydb/core/yq/libs/quota_manager/events/events.cpp
+++ b/ydb/core/yq/libs/quota_manager/events/events.cpp
@@ -1 +1,14 @@
-#include "events.h"
\ No newline at end of file
+#include "events.h"
+
+namespace NYq {
+
+void TQuotaUsage::Merge(const TQuotaUsage& other) {
+ if (other.Limit.UpdatedAt > Limit.UpdatedAt) {
+ Limit = other.Limit;
+ }
+ if (!Usage || (other.Usage && other.Usage->UpdatedAt > Usage->UpdatedAt)) {
+ Usage = other.Usage;
+ }
+}
+
+} /* NYq */
\ No newline at end of file
diff --git a/ydb/core/yq/libs/quota_manager/events/events.h b/ydb/core/yq/libs/quota_manager/events/events.h
index 0b87393bfe..21c1e06175 100644
--- a/ydb/core/yq/libs/quota_manager/events/events.h
+++ b/ydb/core/yq/libs/quota_manager/events/events.h
@@ -12,6 +12,8 @@
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/event_local.h>
+#include <ydb/core/yq/libs/quota_manager/proto/quota_internal.pb.h>
+
namespace NYq {
constexpr auto SUBJECT_TYPE_CLOUD = "cloud";
@@ -43,15 +45,39 @@ struct TQuotaDescription {
}
};
-struct TQuotaUsage {
- ui64 Limit;
- TMaybe<ui64> Usage;
+template <typename T>
+struct TTimedValue {
+ T Value;
TInstant UpdatedAt;
+ TTimedValue() = default;
+ TTimedValue(const TTimedValue&) = default;
+ TTimedValue(T value, const TInstant& updatedAt = TInstant::Zero()) : Value(value), UpdatedAt(updatedAt) {}
+};
+
+using TTimedUint64 = TTimedValue<ui64>;
+
+struct TQuotaUsage {
+ TTimedUint64 Limit;
+ TMaybe<TTimedUint64> Usage;
TQuotaUsage() = default;
TQuotaUsage(const TQuotaUsage&) = default;
- TQuotaUsage(ui64 limit) : Limit(limit), UpdatedAt(TInstant::Zero()) {}
- TQuotaUsage(ui64 limit, ui64 usage, const TInstant& updatedAt = Now())
- : Limit(limit), Usage(usage), UpdatedAt(updatedAt) {}
+ TQuotaUsage(ui64 limit, const TInstant& limitUpdatedAt = Now()) : Limit(limit, limitUpdatedAt) {}
+ TQuotaUsage(ui64 limit, const TInstant& limitUpdatedAt, ui64 usage, const TInstant& usageUpdatedAt = Now())
+ : Limit(limit, limitUpdatedAt), Usage(NMaybe::TInPlace{}, usage, usageUpdatedAt) {}
+ void Merge(const TQuotaUsage& other);
+ TString ToString() {
+ return (Usage ? std::to_string(Usage->Value) : "*") + "/" + std::to_string(Limit.Value);
+ }
+ TString ToString(const TString& subjectType, const TString& subjectId, const TString& metricName) {
+ TStringBuilder builder;
+ builder << subjectType << "." << subjectId << "." << metricName << "=" << ToString();
+ return builder;
+ }
+ TString ToString(const TString& metricName) {
+ TStringBuilder builder;
+ builder << metricName << "=" << ToString();
+ return builder;
+ }
};
using TQuotaMap = THashMap<TString, TQuotaUsage>;
@@ -73,6 +99,7 @@ struct TEvQuotaService {
EvQuotaSetResponse,
EvQuotaLimitChangeRequest,
EvQuotaLimitChangeResponse,
+ EvQuotaUpdateNotification,
EvEnd,
};
@@ -216,6 +243,15 @@ struct TEvQuotaService {
{}
};
+ struct TEvQuotaUpdateNotification : public NActors::TEventPB<TEvQuotaUpdateNotification,
+ Fq::Quota::EvQuotaUpdateNotification, EvQuotaUpdateNotification> {
+
+ TEvQuotaUpdateNotification() = default;
+ TEvQuotaUpdateNotification(const Fq::Quota::EvQuotaUpdateNotification& protoMessage)
+ : NActors::TEventPB<TEvQuotaUpdateNotification, Fq::Quota::EvQuotaUpdateNotification, EvQuotaUpdateNotification>(protoMessage) {
+
+ }
+ };
};
} /* NYq */
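
Editor's note, not part of the patch: the events.h/events.cpp changes above replace the flat limit/usage pair with per-field timestamps so that copies updated on different nodes can be reconciled. Below is a minimal standalone sketch of the merge rule that TQuotaUsage::Merge implements, with TMaybe/TInstant swapped for std::optional and a plain integer clock so it compiles on its own; the point is that Limit and Usage are merged independently, each by its own UpdatedAt, which is what lets peers exchange partial updates.

// Standalone sketch of the merge rule from events.cpp above; TInstant/TMaybe are
// replaced by a plain integer clock and std::optional so it compiles on its own.
#include <cstdint>
#include <iostream>
#include <optional>

struct TTimedValue {
    uint64_t Value = 0;
    uint64_t UpdatedAt = 0; // stand-in for TInstant
};

struct TQuotaUsage {
    TTimedValue Limit;
    std::optional<TTimedValue> Usage;

    // Limit and Usage are reconciled independently: the newer timestamp wins.
    void Merge(const TQuotaUsage& other) {
        if (other.Limit.UpdatedAt > Limit.UpdatedAt) {
            Limit = other.Limit;
        }
        if (!Usage || (other.Usage && other.Usage->UpdatedAt > Usage->UpdatedAt)) {
            Usage = other.Usage;
        }
    }
};

int main() {
    TQuotaUsage local{{100, /*UpdatedAt=*/10}, TTimedValue{42, /*UpdatedAt=*/5}};
    TQuotaUsage fromPeer{{200, /*UpdatedAt=*/20}, TTimedValue{40, /*UpdatedAt=*/3}};

    local.Merge(fromPeer);

    // The peer's limit is newer (20 > 10) and replaces ours; our usage is newer
    // (5 > 3) and is kept.
    std::cout << local.Limit.Value << " " << local.Usage->Value << "\n"; // prints: 200 42
    return 0;
}
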
diff --git a/ydb/core/yq/libs/quota_manager/proto/CMakeLists.txt b/ydb/core/yq/libs/quota_manager/proto/CMakeLists.txt
new file mode 100644
index 0000000000..6e92863aed
--- /dev/null
+++ b/ydb/core/yq/libs/quota_manager/proto/CMakeLists.txt
@@ -0,0 +1,31 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-quota_manager-proto)
+target_link_libraries(libs-quota_manager-proto PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+)
+target_proto_messages(libs-quota_manager-proto PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/quota_manager/proto/quota_internal.proto
+)
+target_proto_addincls(libs-quota_manager-proto
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(libs-quota_manager-proto
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
diff --git a/ydb/core/yq/libs/quota_manager/proto/quota_internal.proto b/ydb/core/yq/libs/quota_manager/proto/quota_internal.proto
new file mode 100644
index 0000000000..9d32ac9690
--- /dev/null
+++ b/ydb/core/yq/libs/quota_manager/proto/quota_internal.proto
@@ -0,0 +1,18 @@
+syntax = "proto3";
+option cc_enable_arenas = true;
+
+package Fq.Quota;
+
+import "google/protobuf/timestamp.proto";
+
+////////////////////////////////////////////////////////////
+
+message EvQuotaUpdateNotification {
+ string subject_type = 1;
+ string subject_id = 2;
+ string metric_name = 3;
+ uint64 metric_limit = 4;
+ google.protobuf.Timestamp limit_updated_at = 5;
+ uint64 metric_usage = 6;
+ google.protobuf.Timestamp usage_updated_at = 7;
+}
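
Editor's note, not part of the patch: a small round-trip sketch of the new node-to-node notification message, assuming the C++ header generated from quota_internal.proto above. It fills the same fields that NotifyClusterNodes sets in quota_manager.cpp further down and reads them back the way the TEvQuotaUpdateNotification handler does; the subject id and metric name used here are made-up placeholders.

// Sketch only: fills an EvQuotaUpdateNotification, round-trips it through the
// wire format, and reads it back. Requires the C++ code generated from
// quota_internal.proto; subject id and metric name are placeholders.
#include <google/protobuf/util/time_util.h>
#include <iostream>
#include <string>

#include <ydb/core/yq/libs/quota_manager/proto/quota_internal.pb.h>

int main() {
    using google::protobuf::util::TimeUtil;

    Fq::Quota::EvQuotaUpdateNotification out;
    out.set_subject_type("cloud");
    out.set_subject_id("example-cloud-id");   // placeholder
    out.set_metric_name("queryLimit.count");  // placeholder
    out.set_metric_limit(200);
    *out.mutable_limit_updated_at() = TimeUtil::GetCurrentTime();
    // metric_usage / usage_updated_at stay unset when the sender has no cached
    // usage yet; the receiver checks has_usage_updated_at() before reading them.

    std::string wire;
    out.SerializeToString(&wire);

    Fq::Quota::EvQuotaUpdateNotification in;
    in.ParseFromString(wire);
    std::cout << in.subject_type() << "." << in.subject_id() << "." << in.metric_name()
              << " limit=" << in.metric_limit()
              << " has_usage=" << in.has_usage_updated_at() << "\n";
    return 0;
}
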
diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.cpp b/ydb/core/yq/libs/quota_manager/quota_manager.cpp
index 94eb9529a8..68ec8812e7 100644
--- a/ydb/core/yq/libs/quota_manager/quota_manager.cpp
+++ b/ydb/core/yq/libs/quota_manager/quota_manager.cpp
@@ -1,11 +1,13 @@
#include "quota_manager.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/interconnect/interconnect_impl.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/core/yq/libs/control_plane_storage/util.h>
+#include <ydb/core/yq/libs/shared_resources/db_exec.h>
#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
#include <ydb/core/protos/services.pb.h>
@@ -18,51 +20,136 @@
LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream)
#define LOG_D(stream) \
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+#define LOG_T(stream) \
+ LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream)
namespace NYq {
-NActors::TActorId MakeQuotaServiceActorId() {
+NActors::TActorId MakeQuotaServiceActorId(ui32 nodeId) {
constexpr TStringBuf name = "FQ_QUOTA";
- return NActors::TActorId(0, name);
+ return NActors::TActorId(nodeId, name);
}
+constexpr TDuration LIMIT_REFRESH_PERIOD = TDuration::Minutes(1);
constexpr TDuration USAGE_REFRESH_PERIOD = TDuration::Seconds(10);
struct TQuotaCachedUsage {
TQuotaUsage Usage;
TInstant RequestedAt = TInstant::Zero();
+ bool SyncInProgress = false;
+ bool ChangedAfterSync = false;
+ TQuotaCachedUsage() = default;
TQuotaCachedUsage(ui64 limit)
: Usage(limit) {}
- TQuotaCachedUsage(ui64 limit, ui64 usage, const TInstant& updatedAt)
- : Usage(limit, usage, updatedAt) {}
+ TQuotaCachedUsage(ui64 limit, const TInstant& limitUpdatedAt, ui64 usage, const TInstant& usageUpdatedAt)
+ : Usage(limit, limitUpdatedAt, usage, usageUpdatedAt) {}
};
struct TQuotaCache {
THashMap<NActors::TActorId /* Sender */, ui64 /* Cookie */> PendingRequests;
THashMap<TString /* MetricName */, TQuotaCachedUsage> UsageMap;
THashSet<TString> PendingUsage;
+ TInstant LoadedAt = TInstant::Zero();
};
+struct TReadQuotaState {
+ TString SubjectType;
+ TString SubjectId;
+ THashMap<TString /* MetricName */, TQuotaUsage> UsageMap;
+};
+
+using TReadQuotaExecuter = TDbExecuter<TReadQuotaState>;
+
+struct TSyncQuotaState {
+ TString SubjectType;
+ TString SubjectId;
+ TString MetricName;
+ TQuotaUsage Usage;
+ bool Refreshed = false;
+};
+
+using TSyncQuotaExecuter = TDbExecuter<TSyncQuotaState>;
+
+TString ToString(const std::vector<ui32>& v) {
+ if (v.empty()) {
+ return "[]";
+ }
+ TStringBuilder builder;
+ for (auto i : v) {
+ if (builder.empty()) {
+ builder << "[" << i;
+ } else {
+ builder << ", " << i;
+ }
+ if (builder.size() > 1024) {
+ builder << "...";
+ break;
+ }
+ }
+ builder << "]";
+ return builder;
+}
+
+TString ToString(const TQuotaMap& quota){
+ if (quota.empty()) {
+ return "{}";
+ }
+ TStringBuilder builder;
+ for (auto p : quota) {
+ builder << (builder.empty() ? "{" : ", ") << p.second.ToString(p.first);
+ if (builder.size() > 1024) {
+ builder << "...";
+ break;
+ }
+ }
+ builder << "}";
+ return builder;
+}
+
+TString ToString(const THashMap<TString, TQuotaCachedUsage>& usageMap){
+ if (usageMap.empty()) {
+ return "{}";
+ }
+ TStringBuilder builder;
+ for (auto p : usageMap) {
+ builder << (builder.empty() ? "{" : ", ") << p.second.Usage.ToString(p.first);
+ if (builder.size() > 1024) {
+ builder << "...";
+ break;
+ }
+ }
+ builder << "}";
+ return builder;
+}
+
class TQuotaManagementService : public NActors::TActorBootstrapped<TQuotaManagementService> {
public:
TQuotaManagementService(
const NConfig::TQuotasManagerConfig& config,
- /* const NYq::TYqSharedResources::TPtr& yqSharedResources, */
+ const NConfig::TYdbStorageConfig& storageConfig,
+ const TYqSharedResources::TPtr& yqSharedResources,
+ NKikimr::TYdbCredentialsProviderFactory credProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
std::vector<TQuotaDescription> quotaDescriptions)
: Config(config)
- , ServiceCounters(counters->GetSubgroup("subsystem", "QuotaService"))
+ , StorageConfig(storageConfig)
+ , YqSharedResources(yqSharedResources)
+ , CredProviderFactory(credProviderFactory)
+ , ServiceCounters(counters->GetSubgroup("subsystem", "quota_manager"))
{
- /* Y_UNUSED(yqSharedResources); */
for (auto& description : quotaDescriptions) {
QuotaInfoMap[description.SubjectType].emplace(description.MetricName, description.Info);
}
+ LimitRefreshPeriod = GetDuration(Config.GetLimitRefreshPeriod(), LIMIT_REFRESH_PERIOD);
UsageRefreshPeriod = GetDuration(Config.GetUsageRefreshPeriod(), USAGE_REFRESH_PERIOD);
}
static constexpr char ActorName[] = "FQ_QUOTA_SERVICE";
void Bootstrap() {
+ YdbConnection = NewYdbConnection(StorageConfig, CredProviderFactory, YqSharedResources->CoreYdbDriver);
+ DbPool = YqSharedResources->DbPoolHolder->GetOrCreate(EDbPoolId::MAIN, 10, YdbConnection->TablePathPrefix);
+ Send(GetNameserviceActorId(), new NActors::TEvInterconnect::TEvListNodes());
Become(&TQuotaManagementService::StateFunc);
LOG_I("STARTED");
}
@@ -73,59 +160,90 @@ private:
hFunc(TEvQuotaService::TQuotaChangeNotification, Handle)
hFunc(TEvQuotaService::TQuotaUsageResponse, Handle)
hFunc(TEvQuotaService::TQuotaSetRequest, Handle)
+ hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } );
+ hFunc(NActors::TEvInterconnect::TEvNodesInfo, Handle)
+ hFunc(TEvQuotaService::TEvQuotaUpdateNotification, Handle)
+ hFunc(NActors::TEvents::TEvUndelivered, Handle)
);
+ void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
+ LOG_I("UNDELIVERED to Peer " << ev->Sender.NodeId() << ", " << ev->Get()->Reason);
+ }
+
+ void Handle(NActors::TEvInterconnect::TEvNodesInfo::TPtr& ev) {
+ auto oldPeerCount = NodeIds.size();
+ NodeIds.clear();
+ auto selfNodeId = SelfId().NodeId();
+ for (auto node : ev->Get()->Nodes) {
+ if (node.NodeId != selfNodeId) {
+ NodeIds.push_back(node.NodeId);
+ }
+ }
+ *ServiceCounters->GetCounter("PeerCount") = NodeIds.size();
+ if (oldPeerCount != NodeIds.size()) {
+ LOG_D("IC Peers[" << NodeIds.size() << "]: " << ToString(NodeIds));
+ }
+ TActivationContext::Schedule(TDuration::Seconds(NodeIds.empty() ? 1 : 5), new IEventHandle(GetNameserviceActorId(), SelfId(), new NActors::TEvInterconnect::TEvListNodes()));
+ }
+
void Handle(TEvQuotaService::TQuotaGetRequest::TPtr& ev) {
auto subjectType = ev->Get()->SubjectType;
auto subjectId = ev->Get()->SubjectId;
auto& subjectMap = QuotaCacheMap[subjectType];
auto& infoMap = QuotaInfoMap[subjectType];
- if (subjectId.empty()) { // Just get defaults
+ if (subjectId.empty()) { // Just reply with defaults
auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>();
response->SubjectType = subjectType;
for (auto& it : infoMap) {
response->Quotas.emplace(it.first, TQuotaUsage(it.second.DefaultLimit));
}
+ LOG_T(subjectType << ".<defaults>: " << ToString(response->Quotas));
Send(ev->Sender, response.Release());
return;
}
auto it = subjectMap.find(subjectId);
- bool pended = false;
- // Load into cache, if needed
if (it == subjectMap.end()) {
- TQuotaCache cache;
- // 1. Load from Config
- for (const auto& quota : Config.GetQuotas()) {
- if (quota.GetSubjectType() == subjectType && quota.GetSubjectId() == subjectId) {
- for (const auto& limit : quota.GetLimit()) {
- cache.UsageMap.emplace(limit.GetName(), TQuotaCachedUsage(limit.GetLimit()));
- }
- }
+            LOG_D(subjectType << "." << subjectId << " NOT CACHED, Loading ...");
+ } else {
+ if (it->second.LoadedAt + LimitRefreshPeriod < Now()) {
+                LOG_D(subjectType << "." << subjectId << " FORCE CACHE RELOAD, Loading ...");
+ it = subjectMap.end();
}
- // 2. Append defaults
- for (auto& it : infoMap) {
- if (cache.UsageMap.find(it.first) == cache.UsageMap.end()) {
- cache.UsageMap.emplace(it.first, TQuotaCachedUsage(it.second.DefaultLimit));
+ }
+
+ if (it == subjectMap.end()) {
+ ReadQuota(subjectType, subjectId,
+ [this, ev=ev](TReadQuotaExecuter& executer) {
+ // This block is executed in correct self-context, no locks/syncs required
+ auto& subjectMap = this->QuotaCacheMap[executer.State.SubjectType];
+ auto& cache = subjectMap[executer.State.SubjectId];
+ LOG_D(executer.State.SubjectType << "." << executer.State.SubjectId << ToString(cache.UsageMap) << " LOADED");
+ CheckUsageMaybeReply(executer.State.SubjectType, executer.State.SubjectId, cache, ev);
}
- }
- // 3. Load from DB
- subjectMap.emplace(subjectId, cache);
+ );
+ } else {
+ CheckUsageMaybeReply(subjectType, subjectId, it->second, ev);
}
+ }
- auto& cache = subjectMap[subjectId];
+ void CheckUsageMaybeReply(const TString& subjectType, const TString& subjectId, TQuotaCache& cache, const TEvQuotaService::TQuotaGetRequest::TPtr& ev) {
+
+ bool pended = false;
+ auto& infoMap = QuotaInfoMap[subjectType];
if (!ev->Get()->AllowStaleUsage) {
// Refresh usage
for (auto& itUsage : cache.UsageMap) {
auto metricName = itUsage.first;
auto& cachedUsage = itUsage.second;
- if (cachedUsage.Usage.UpdatedAt + UsageRefreshPeriod < Now()) {
+ if (!cachedUsage.Usage.Usage || cachedUsage.Usage.Usage->UpdatedAt + UsageRefreshPeriod < Now()) {
auto it = infoMap.find(metricName);
if (it != infoMap.end()) {
if (it->second.UsageUpdater != NActors::TActorId{}) {
if (!cache.PendingUsage.contains(metricName)) {
+ LOG_T(subjectType << "." << subjectId << "." << metricName << " IS STALE, Refreshing ...");
Send(it->second.UsageUpdater, new TEvQuotaService::TQuotaUsageRequest(subjectType, subjectId, metricName));
cache.PendingUsage.insert(metricName);
cachedUsage.RequestedAt = Now();
@@ -141,17 +259,165 @@ private:
}
if (!pended) {
- auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>();
- response->SubjectType = subjectType;
- response->SubjectId = subjectId;
- for (auto it : cache.UsageMap) {
- response->Quotas.emplace(it.first, it.second.Usage);
- }
- Send(ev->Sender, response.Release());
+ SendQuota(ev->Sender, ev->Cookie, subjectType, subjectId, cache);
cache.PendingRequests.erase(ev->Sender);
}
}
+ void ChangeLimitsAndReply(const TString& subjectType, const TString& subjectId, TQuotaCache& cache, const TEvQuotaService::TQuotaSetRequest::TPtr& ev) {
+
+ auto& infoMap = QuotaInfoMap[subjectType];
+ for (auto metricLimit : ev->Get()->Limits) {
+ auto& metricName = metricLimit.first;
+
+ auto it = cache.UsageMap.find(metricName);
+ if (it != cache.UsageMap.end()) {
+ auto& cached = it->second;
+ auto limit = metricLimit.second;
+ if (cached.Usage.Limit.Value == 0 || limit == 0 || limit > cached.Usage.Limit.Value) {
+ // check hard limit only if quota is increased
+ auto itI = infoMap.find(metricName);
+ if (itI != infoMap.end()) {
+ auto& info = itI->second;
+ if (info.HardLimit != 0 && (limit == 0 || limit > info.HardLimit)) {
+ limit = info.HardLimit;
+ }
+ }
+ }
+ if (cached.Usage.Limit.Value != limit) {
+ cached.Usage.Limit.Value = limit;
+ cached.Usage.Limit.UpdatedAt = Now();
+ LOG_T(cached.Usage.ToString(subjectType, subjectId, metricName) << " LIMIT Changed");
+ SyncQuota(subjectType, subjectId, metricName, cached);
+ }
+ }
+ }
+
+ auto response = MakeHolder<TEvQuotaService::TQuotaSetResponse>(subjectType, subjectId);
+ for (auto it : cache.UsageMap) {
+ response->Limits.emplace(it.first, it.second.Usage.Limit.Value);
+ }
+ Send(ev->Sender, response.Release());
+ }
+
+
+ void ReadQuota(const TString& subjectType, const TString& subjectId, TReadQuotaExecuter::TCallback callback) {
+
+ TDbExecutable::TPtr executable;
+ auto& executer = TReadQuotaExecuter::Create(executable, false, nullptr);
+
+ executer.State.SubjectType = subjectType;
+ executer.State.SubjectId = subjectId;
+
+ executer.Read(
+ [](TReadQuotaExecuter& executer, TSqlQueryBuilder& builder) {
+ builder.AddText(
+ "SELECT `" METRIC_NAME_COLUMN_NAME "`, `" METRIC_LIMIT_COLUMN_NAME "`, `" LIMIT_UPDATED_AT_COLUMN_NAME "`, `" METRIC_USAGE_COLUMN_NAME "`, `" USAGE_UPDATED_AT_COLUMN_NAME "`\n"
+ "FROM `" QUOTAS_TABLE_NAME "`\n"
+ "WHERE `" SUBJECT_TYPE_COLUMN_NAME "` = $subject\n"
+ " AND `" SUBJECT_ID_COLUMN_NAME "` = $id\n"
+ );
+ builder.AddString("subject", executer.State.SubjectType);
+ builder.AddString("id", executer.State.SubjectId);
+ },
+ [](TReadQuotaExecuter& executer, const TVector<NYdb::TResultSet>& resultSets) {
+ TResultSetParser parser(resultSets.front());
+ while (parser.TryNextRow()) {
+ auto name = *parser.ColumnParser(METRIC_NAME_COLUMN_NAME).GetOptionalString();
+ auto& quotaUsage = executer.State.UsageMap[name];
+ quotaUsage.Limit.Value = *parser.ColumnParser(METRIC_LIMIT_COLUMN_NAME).GetOptionalUint64();
+ quotaUsage.Limit.UpdatedAt = *parser.ColumnParser(LIMIT_UPDATED_AT_COLUMN_NAME).GetOptionalTimestamp();
+ auto usage = parser.ColumnParser(METRIC_USAGE_COLUMN_NAME).GetOptionalUint64();
+ if (usage) {
+ quotaUsage.Usage.ConstructInPlace();
+ quotaUsage.Usage->Value = *usage;
+ quotaUsage.Usage->UpdatedAt = *parser.ColumnParser(USAGE_UPDATED_AT_COLUMN_NAME).GetOptionalTimestamp();
+ }
+ }
+ },
+ "ReadQuotas"
+ ).Process(SelfId(),
+ [this, callback=callback](TReadQuotaExecuter& executer) {
+ auto& subjectMap = this->QuotaCacheMap[executer.State.SubjectType];
+ auto& cache = subjectMap[executer.State.SubjectId];
+
+ LOG_T(executer.State.SubjectType << "." << executer.State.SubjectId << " " << ToString(executer.State.UsageMap) << " FROM DB");
+
+ // 1. Fill from DB
+ for (auto& itUsage : executer.State.UsageMap) {
+ cache.UsageMap[itUsage.first].Usage = itUsage.second;
+ }
+
+ // 2. Append from Config
+ for (const auto& quota : this->Config.GetQuotas()) {
+ if (quota.GetSubjectType() == executer.State.SubjectType && quota.GetSubjectId() == executer.State.SubjectId) {
+ for (const auto& limit : quota.GetLimit()) {
+ if (cache.UsageMap.find(limit.GetName()) == cache.UsageMap.end()) {
+ cache.UsageMap.emplace(limit.GetName(), TQuotaCachedUsage(limit.GetLimit()));
+ }
+ }
+ }
+ }
+
+ // 3. Append defaults
+ auto& infoMap = this->QuotaInfoMap[executer.State.SubjectType];
+ for (auto& it : infoMap) {
+ if (cache.UsageMap.find(it.first) == cache.UsageMap.end()) {
+ cache.UsageMap.emplace(it.first, TQuotaCachedUsage(it.second.DefaultLimit));
+ }
+ }
+
+ cache.LoadedAt = Now();
+
+ if (callback) {
+ callback(executer);
+ }
+ }
+ );
+
+ Exec(DbPool, executable);
+ }
+
+ void SendQuota(NActors::TActorId receivedId, ui64 cookie, const TString& subjectType, const TString& subjectId, TQuotaCache& cache) {
+ auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>();
+ response->SubjectType = subjectType;
+ response->SubjectId = subjectId;
+ for (auto it : cache.UsageMap) {
+ response->Quotas.emplace(it.first, it.second.Usage);
+ }
+ LOG_T(subjectType << "." << subjectId << ToString(response->Quotas) << " SEND QUOTAS");
+ Send(receivedId, response.Release(), 0, cookie);
+ }
+
+ void Handle(TEvQuotaService::TEvQuotaUpdateNotification::TPtr& ev) {
+ auto& record = ev->Get()->Record;
+ TQuotaUsage usage(record.metric_limit(), NProtoInterop::CastFromProto(record.limit_updated_at()));
+ if (record.has_usage_updated_at()) {
+ usage.Usage.ConstructInPlace();
+ usage.Usage->Value = record.metric_usage();
+ usage.Usage->UpdatedAt = NProtoInterop::CastFromProto(record.usage_updated_at());
+ }
+ LOG_T(usage.ToString(record.subject_type(), record.subject_id(), record.metric_name()) << " UPDATE from Peer " << ev->Sender.NodeId());
+ UpdateQuota(record.subject_type(), record.subject_id(), record.metric_name(), usage);
+ }
+
+ void NotifyClusterNodes(const TString& subjectType, const TString& subjectId, const TString& metricName, TQuotaUsage& usage) {
+ LOG_T(usage.ToString(subjectType, subjectId, metricName) << " NOTIFY CHANGE");
+ for (auto nodeId : NodeIds) {
+ Fq::Quota::EvQuotaUpdateNotification notification;
+ notification.set_subject_type(subjectType);
+ notification.set_subject_id(subjectId);
+ notification.set_metric_name(metricName);
+ notification.set_metric_limit(usage.Limit.Value);
+ *notification.mutable_limit_updated_at() = NProtoInterop::CastToProto(usage.Limit.UpdatedAt);
+ if (usage.Usage) {
+ notification.set_metric_usage(usage.Usage->Value);
+ *notification.mutable_usage_updated_at() = NProtoInterop::CastToProto(usage.Usage->UpdatedAt);
+ }
+ Send(MakeQuotaServiceActorId(nodeId), new TEvQuotaService::TEvQuotaUpdateNotification(notification), IEventHandle::FlagTrackDelivery);
+ }
+ }
+
void Handle(TEvQuotaService::TQuotaChangeNotification::TPtr& ev) {
auto& request = *ev->Get();
auto itt = QuotaInfoMap.find(request.SubjectType);
@@ -161,12 +427,129 @@ private:
if (itm != metricMap.end()) {
auto& info = itm->second;
if (info.UsageUpdater != NActors::TActorId{}) {
+ LOG_T(request.SubjectType << "." << request.SubjectId << "." << request.MetricName << " FORCE UPDATE, Updating ...");
Send(info.UsageUpdater, new TEvQuotaService::TQuotaUsageRequest(request.SubjectType, request.SubjectId, request.MetricName));
}
}
}
}
+ void SyncQuota(const TString& subjectType, const TString& subjectId, const TString& metricName, TQuotaCachedUsage& cached) {
+
+ if (cached.SyncInProgress) {
+ cached.ChangedAfterSync = true;
+ return;
+ }
+
+ TDbExecutable::TPtr executable;
+ auto& executer = TSyncQuotaExecuter::Create(executable, false, nullptr);
+
+ executer.State.SubjectType = subjectType;
+ executer.State.SubjectId = subjectId;
+ executer.State.MetricName = metricName;
+ executer.State.Usage = cached.Usage;
+
+ cached.ChangedAfterSync = false;
+ cached.SyncInProgress = true;
+
+ executer.Read(
+ [](TSyncQuotaExecuter& executer, TSqlQueryBuilder& builder) {
+ builder.AddText(
+ "SELECT `" METRIC_LIMIT_COLUMN_NAME "`, `" LIMIT_UPDATED_AT_COLUMN_NAME"`, `" METRIC_USAGE_COLUMN_NAME "`, `" USAGE_UPDATED_AT_COLUMN_NAME "`\n"
+ "FROM `" QUOTAS_TABLE_NAME "`\n"
+ "WHERE `" SUBJECT_TYPE_COLUMN_NAME "` = $subject\n"
+ " AND `" SUBJECT_ID_COLUMN_NAME "` = $id\n"
+ " AND `" METRIC_NAME_COLUMN_NAME "` = $metric\n"
+ );
+ builder.AddString("subject", executer.State.SubjectType);
+ builder.AddString("id", executer.State.SubjectId);
+ builder.AddString("metric", executer.State.MetricName);
+ },
+ [](TSyncQuotaExecuter& executer, const TVector<NYdb::TResultSet>& resultSets) {
+ TResultSetParser parser(resultSets.front());
+ if (parser.TryNextRow()) {
+ auto limitUpdatedAt = parser.ColumnParser(LIMIT_UPDATED_AT_COLUMN_NAME).GetOptionalTimestamp();
+ if (limitUpdatedAt && *limitUpdatedAt > executer.State.Usage.Limit.UpdatedAt) {
+ // DB changed since last read, use it and ignore local changes
+ executer.State.Usage.Limit.Value = *parser.ColumnParser(METRIC_LIMIT_COLUMN_NAME).GetOptionalUint64();
+ executer.State.Usage.Limit.UpdatedAt = *limitUpdatedAt;
+ executer.State.Refreshed = true;
+ }
+ auto usageUpdatedAt = parser.ColumnParser(USAGE_UPDATED_AT_COLUMN_NAME).GetOptionalTimestamp();
+ if (usageUpdatedAt && (!executer.State.Usage.Usage || *usageUpdatedAt > executer.State.Usage.Usage->UpdatedAt)) {
+ if (!executer.State.Usage.Usage) {
+ executer.State.Usage.Usage.ConstructInPlace();
+ }
+ executer.State.Usage.Usage->Value = *parser.ColumnParser(METRIC_USAGE_COLUMN_NAME).GetOptionalUint64();
+ executer.State.Usage.Usage->UpdatedAt = *usageUpdatedAt;
+ }
+ }
+ if (!executer.State.Refreshed) {
+ executer.Write(
+ [](TSyncQuotaExecuter& executer, TSqlQueryBuilder& builder) {
+ builder.AddText(
+ "UPSERT INTO `" QUOTAS_TABLE_NAME "` (`" SUBJECT_TYPE_COLUMN_NAME "`, `" SUBJECT_ID_COLUMN_NAME "`, `" METRIC_NAME_COLUMN_NAME "`, `" METRIC_LIMIT_COLUMN_NAME "`, `" LIMIT_UPDATED_AT_COLUMN_NAME "`, `" METRIC_USAGE_COLUMN_NAME "`, `" USAGE_UPDATED_AT_COLUMN_NAME "`)\n"
+ );
+ builder.AddText(
+ executer.State.Usage.Usage ?
+ "VALUES ($subject, $id, $metric, $limit, $limit_updated_at, $usage, $usage_updated_at);\n"
+ : "VALUES ($subject, $id, $metric, $limit, $limit_updated_at, NULL, NULL);\n"
+ );
+ builder.AddString("subject", executer.State.SubjectType);
+ builder.AddString("id", executer.State.SubjectId);
+ builder.AddString("metric", executer.State.MetricName);
+ builder.AddUint64("limit", executer.State.Usage.Limit.Value);
+ builder.AddTimestamp("limit_updated_at", executer.State.Usage.Limit.UpdatedAt);
+ if (executer.State.Usage.Usage) {
+ builder.AddUint64("usage", executer.State.Usage.Usage->Value);
+ builder.AddTimestamp("usage_updated_at", executer.State.Usage.Usage->UpdatedAt);
+ }
+ },
+ "WriteQuota"
+ );
+ }
+ },
+ "CheckQuota"
+ ).Process(SelfId(),
+ [this](TSyncQuotaExecuter& executer) {
+ if (executer.State.Refreshed) {
+ UpdateQuota(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, executer.State.Usage);
+ } else {
+ this->NotifyClusterNodes(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, executer.State.Usage);
+ }
+
+ auto& subjectMap = this->QuotaCacheMap[executer.State.SubjectType];
+ auto it = subjectMap.find(executer.State.SubjectId);
+ if (it != subjectMap.end()) {
+ auto& cache = it->second;
+ auto itQ = cache.UsageMap.find(executer.State.MetricName);
+ if (itQ != cache.UsageMap.end()) {
+ itQ->second.SyncInProgress = false;
+ if (itQ->second.ChangedAfterSync) {
+ LOG_T(itQ->second.Usage.ToString(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName) << " RESYNC");
+ SyncQuota(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, itQ->second);
+ }
+ }
+ }
+ }
+ );
+
+ Exec(DbPool, executable);
+ }
+
+ void UpdateQuota(const TString& subjectType, const TString& subjectId, const TString& metricName, TQuotaUsage& usage) {
+ auto& subjectMap = QuotaCacheMap[subjectType];
+ auto it = subjectMap.find(subjectId);
+ if (it != subjectMap.end()) {
+ auto& cache = it->second;
+ auto itQ = cache.UsageMap.find(metricName);
+ if (itQ != cache.UsageMap.end()) {
+ itQ->second.Usage.Merge(usage);
+ LOG_T(itQ->second.Usage.ToString(subjectType, subjectId, metricName) << " MERGED");
+ }
+ }
+ }
+
void Handle(TEvQuotaService::TQuotaUsageResponse::TPtr& ev) {
auto subjectType = ev->Get()->SubjectType;
auto subjectId = ev->Get()->SubjectId;
@@ -175,6 +558,7 @@ private:
auto it = subjectMap.find(subjectId);
if (it == subjectMap.end()) {
+ // if quotas are not cached - ignore usage update
return;
}
@@ -182,96 +566,66 @@ private:
cache.PendingUsage.erase(metricName);
auto itQ = cache.UsageMap.find(metricName);
- if (itQ != cache.UsageMap.end()) {
- itQ->second.Usage.Usage = ev->Get()->Usage;
+ if (itQ == cache.UsageMap.end()) {
+ // if metric is not defined - ignore usage update
+ return;
}
+ itQ->second.Usage.Usage = ev->Get()->Usage;
+ LOG_T(itQ->second.Usage.ToString(subjectType, subjectId, metricName) << " REFRESHED");
if (cache.PendingUsage.size() == 0) {
for (auto& itR : cache.PendingRequests) {
- auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>();
- response->SubjectType = subjectType;
- response->SubjectId = subjectId;
- for (auto it : cache.UsageMap) {
- response->Quotas.emplace(it.first, it.second.Usage);
- }
- Send(itR.first, response.Release());
+ SendQuota(itR.first, itR.second, subjectType, subjectId, cache);
}
cache.PendingRequests.clear();
}
+
+ SyncQuota(subjectType, subjectId, metricName, itQ->second);
}
void Handle(TEvQuotaService::TQuotaSetRequest::TPtr& ev) {
auto subjectType = ev->Get()->SubjectType;
auto subjectId = ev->Get()->SubjectId;
auto& subjectMap = QuotaCacheMap[subjectType];
- auto& infoMap = QuotaInfoMap[subjectType];
auto it = subjectMap.find(subjectId);
-
- // Load into cache, if needed
if (it == subjectMap.end()) {
- TQuotaCache cache;
- // 1. Load from Config
- for (const auto& quota : Config.GetQuotas()) {
- if (quota.GetSubjectType() == subjectType && quota.GetSubjectId() == subjectId) {
- for (const auto& limit : quota.GetLimit()) {
- cache.UsageMap.emplace(limit.GetName(), TQuotaCachedUsage(limit.GetLimit()));
- }
- }
- }
- // 2. Load from DB (TBD)
- // 3. Append defaults
- for (auto& it : infoMap) {
- if (cache.UsageMap.find(it.first) == cache.UsageMap.end()) {
- cache.UsageMap.emplace(it.first, TQuotaCachedUsage(it.second.DefaultLimit));
- }
- }
- bool _;
- std::tie(it, _) = subjectMap.emplace(subjectId, cache);
- }
-
- auto& cache = it->second;
-
- for (auto metricLimit : ev->Get()->Limits) {
- auto& name = metricLimit.first;
- auto it = cache.UsageMap.find(name);
- if (it != cache.UsageMap.end()) {
- auto& cached = it->second;
- auto limit = metricLimit.second;
- if (cached.Usage.Limit == 0 || limit == 0 || limit > cached.Usage.Limit) {
- // check hard limit only if quota is increased
- auto itI = infoMap.find(name);
- if (itI != infoMap.end()) {
- auto& info = itI->second;
- if (info.HardLimit != 0 && (limit == 0 || limit > info.HardLimit)) {
- limit = info.HardLimit;
- }
- }
+ ReadQuota(subjectType, subjectId,
+ [this, ev=ev](TReadQuotaExecuter& executer) {
+ // This block is executed in correct self-context, no locks/syncs required
+ auto& subjectMap = this->QuotaCacheMap[executer.State.SubjectType];
+ auto& cache = subjectMap[executer.State.SubjectId];
+ LOG_D(executer.State.SubjectType << "." << executer.State.SubjectId << ToString(cache.UsageMap) << " LOADED");
+ ChangeLimitsAndReply(executer.State.SubjectType, executer.State.SubjectId, cache, ev);
}
- cached.Usage.Limit = limit;
- }
+ );
+ } else {
+ ChangeLimitsAndReply(subjectType, subjectId, it->second, ev);
}
-
- auto response = MakeHolder<TEvQuotaService::TQuotaSetResponse>(subjectType, subjectId);
- for (auto it : cache.UsageMap) {
- response->Limits.emplace(it.first, it.second.Usage.Limit);
- }
- Send(ev->Sender, response.Release());
}
NConfig::TQuotasManagerConfig Config;
+ NConfig::TYdbStorageConfig StorageConfig;
+ ::NYq::TYqSharedResources::TPtr YqSharedResources;
+ NKikimr::TYdbCredentialsProviderFactory CredProviderFactory;
+ TYdbConnectionPtr YdbConnection;
+ TDbPool::TPtr DbPool;
const ::NMonitoring::TDynamicCounterPtr ServiceCounters;
THashMap<TString /* SubjectType */, THashMap<TString /* MetricName */, TQuotaInfo>> QuotaInfoMap;
THashMap<TString /* SubjectType */, THashMap<TString /* SubjectId */, TQuotaCache>> QuotaCacheMap;
+ TDuration LimitRefreshPeriod;
TDuration UsageRefreshPeriod;
+ std::vector<ui32> NodeIds;
};
NActors::IActor* CreateQuotaServiceActor(
const NConfig::TQuotasManagerConfig& config,
- /* const NYq::TYqSharedResources::TPtr& yqSharedResources, */
+ const NConfig::TYdbStorageConfig& storageConfig,
+ const TYqSharedResources::TPtr& yqSharedResources,
+ NKikimr::TYdbCredentialsProviderFactory credProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
std::vector<TQuotaDescription> quotaDesc) {
- return new TQuotaManagementService(config, /* yqSharedResources, */ counters, quotaDesc);
+ return new TQuotaManagementService(config, storageConfig, yqSharedResources, credProviderFactory, counters, quotaDesc);
}
} /* NYq */
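
Editor's note, not part of the patch: ReadQuota and SyncQuota above drive the TDbExecuter from shared_resources/db_exec.h as a pipeline of named steps, each with a query-building callback and an optional result-parsing callback, plus one final Process callback delivered back to the owning actor. Below is a stripped-down toy model of that control flow, with the YDB session, query builder and actor plumbing replaced by plain strings and direct calls. In the real executer the Process step is posted back to the actor via TEvCallback, which is why the cache mutations inside ReadQuota's callback need no locking.

// Toy model of the TDbExecuter step pipeline (no YDB, no actors). Illustrative only.
#include <functional>
#include <iostream>
#include <string>
#include <vector>

template <typename TState>
class TToyExecuter {
public:
    using TCallback = std::function<void(TToyExecuter&)>;
    using TBuildCallback = std::function<void(TToyExecuter&, std::string&)>;
    using TResultCallback = std::function<void(TToyExecuter&, const std::string&)>;

    TState State;

    TToyExecuter& Read(TBuildCallback build, TResultCallback result, const std::string& name) {
        Steps.push_back(TStep{std::move(build), std::move(result), name});
        return *this;
    }
    TToyExecuter& Write(TBuildCallback build, const std::string& name) {
        return Read(std::move(build), nullptr, name); // a Write is a Read with no result parsing
    }
    void Process(TCallback process) { ProcessCallback = std::move(process); }

    void Run() {
        for (auto& step : Steps) {
            std::string query;
            step.Build(*this, query);                        // build the query text
            std::string resultSet = "rows for " + step.Name; // stand-in for DB rows
            if (step.Result) {
                step.Result(*this, resultSet);               // parse the result set
            }
        }
        if (ProcessCallback) {
            ProcessCallback(*this);                          // final, "back in the actor" step
        }
    }

private:
    struct TStep { TBuildCallback Build; TResultCallback Result; std::string Name; };
    std::vector<TStep> Steps;
    TCallback ProcessCallback;
};

struct TReadQuotaToyState { std::string SubjectId; std::string Rows; };

int main() {
    TToyExecuter<TReadQuotaToyState> executer;
    executer.State.SubjectId = "example-cloud-id"; // placeholder subject id
    executer.Read(
        [](TToyExecuter<TReadQuotaToyState>& e, std::string& q) {
            q = "SELECT ... WHERE subject_id = " + e.State.SubjectId;
        },
        [](TToyExecuter<TReadQuotaToyState>& e, const std::string& rows) {
            e.State.Rows = rows;
        },
        "ReadQuotas");
    executer.Process([](TToyExecuter<TReadQuotaToyState>& e) {
        std::cout << "loaded: " << e.State.Rows << "\n";
    });
    executer.Run();
    return 0;
}
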
diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.h b/ydb/core/yq/libs/quota_manager/quota_manager.h
index b596123ece..9338067411 100644
--- a/ydb/core/yq/libs/quota_manager/quota_manager.h
+++ b/ydb/core/yq/libs/quota_manager/quota_manager.h
@@ -32,11 +32,13 @@
namespace NYq {
-NActors::TActorId MakeQuotaServiceActorId();
+NActors::TActorId MakeQuotaServiceActorId(ui32 nodeId);
NActors::IActor* CreateQuotaServiceActor(
const NConfig::TQuotasManagerConfig& config,
- /* const NYq::TYqSharedResources::TPtr& yqSharedResources, */
+ const NConfig::TYdbStorageConfig& storageConfig,
+ const TYqSharedResources::TPtr& yqSharedResources,
+ NKikimr::TYdbCredentialsProviderFactory credProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
std::vector<TQuotaDescription> quotaDesc);
diff --git a/ydb/core/yq/libs/quota_manager/quota_proxy.cpp b/ydb/core/yq/libs/quota_manager/quota_proxy.cpp
index 7e7394b80e..e6cb9f205b 100644
--- a/ydb/core/yq/libs/quota_manager/quota_proxy.cpp
+++ b/ydb/core/yq/libs/quota_manager/quota_proxy.cpp
@@ -12,13 +12,15 @@
#include <ydb/core/protos/services.pb.h>
#define LOG_E(stream) \
- LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_PROXY, stream)
#define LOG_W(stream) \
- LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+ LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_PROXY, stream)
#define LOG_I(stream) \
- LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+ LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_PROXY, stream)
#define LOG_D(stream) \
- LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_PROXY, stream)
+#define LOG_T(stream) \
+ LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_PROXY, stream)
namespace NYq {
@@ -36,7 +38,7 @@ public:
void Bootstrap() {
Become(&TQuotaProxyGetRequestActor::StateFunc);
- Send(NYq::MakeQuotaServiceActorId(), new TEvQuotaService::TQuotaGetRequest(Ev->Get()->SubjectType, Ev->Get()->SubjectId));
+ Send(NYq::MakeQuotaServiceActorId(SelfId().NodeId()), new TEvQuotaService::TQuotaGetRequest(Ev->Get()->SubjectType, Ev->Get()->SubjectId));
}
STRICT_STFUNC(StateFunc,
@@ -58,7 +60,7 @@ public:
void Bootstrap() {
Become(&TQuotaProxySetRequestActor::StateFunc);
- Send(NYq::MakeQuotaServiceActorId(), new TEvQuotaService::TQuotaSetRequest(Ev->Get()->SubjectType, Ev->Get()->SubjectId, Ev->Get()->Limits));
+ Send(NYq::MakeQuotaServiceActorId(SelfId().NodeId()), new TEvQuotaService::TQuotaSetRequest(Ev->Get()->SubjectType, Ev->Get()->SubjectId, Ev->Get()->Limits));
}
STRICT_STFUNC(StateFunc,
diff --git a/ydb/core/yq/libs/shared_resources/CMakeLists.txt b/ydb/core/yq/libs/shared_resources/CMakeLists.txt
index 2aade5a890..ff3008d5ab 100644
--- a/ydb/core/yq/libs/shared_resources/CMakeLists.txt
+++ b/ydb/core/yq/libs/shared_resources/CMakeLists.txt
@@ -19,6 +19,7 @@ target_link_libraries(yq-libs-shared_resources PUBLIC
ydb-core-protos
libs-control_plane_storage-proto
yq-libs-events
+ libs-quota_manager-events
libs-shared_resources-interface
ydb-library-logger
ydb-library-security
diff --git a/ydb/core/yq/libs/shared_resources/db_exec.h b/ydb/core/yq/libs/shared_resources/db_exec.h
index a2b4343544..bf3a2287e7 100644
--- a/ydb/core/yq/libs/shared_resources/db_exec.h
+++ b/ydb/core/yq/libs/shared_resources/db_exec.h
@@ -71,10 +71,16 @@ inline TAsyncStatus Exec(TDbPool::TPtr dbPool, TDbExecutable::TPtr executable) {
template <typename TState>
class TDbExecuter : public TDbExecutable {
+public:
+ using TCallback = std::function<void(TDbExecuter<TState>&)>;
+ using TBuildCallback = std::function<void(TDbExecuter<TState>&, TSqlQueryBuilder&)>;
+ using TResultCallback = std::function<void(TDbExecuter<TState>&, const TVector<NYdb::TResultSet>&)>;
+
+private:
struct TExecStep {
- std::function<void(TDbExecuter<TState>&, TSqlQueryBuilder&)> BuilderCallback;
- std::function<void(TDbExecuter<TState>&, const TVector<NYdb::TResultSet>&)> HandlerCallback;
- std::function<void(TDbExecuter<TState>&)> ProcessCallback;
+ TBuildCallback BuildCallback;
+ TResultCallback ResultCallback;
+ TCallback ProcessCallback;
TString Name;
bool Commit = false;
};
@@ -84,15 +90,20 @@ class TDbExecuter : public TDbExecutable {
NActors::TActorId HandlerActorId;
TMaybe<TTransaction> Transaction;
NActors::TActorSystem* ActorSystem = nullptr;
- std::function<void(TDbExecuter<TState>&)> HandlerCallback;
- std::function<void(TDbExecuter<TState>&)> StateInitCallback;
+ TCallback HandlerCallback;
+ TCallback StateInitCallback;
bool skipStep = false;
protected:
- TDbExecuter(bool collectDebugInfo, std::function<void(TDbExecuter<TState>&)> stateInitCallback = nullptr)
+ TDbExecuter(bool collectDebugInfo, std::function<void(TDbExecuter<TState>&)> stateInitCallback)
: TDbExecutable(collectDebugInfo), StateInitCallback(stateInitCallback) {
}
+ TDbExecuter(bool collectDebugInfo)
+ : TDbExecutable(collectDebugInfo) {
+ StateInitCallback = [](TDbExecuter<TState>& executer) { executer.State = TState{}; };
+ }
+
TDbExecuter(const TDbExecuter& other) = delete;
public:
@@ -105,6 +116,12 @@ public:
return *executer;
};
+ static TDbExecuter& Create(TDbExecutable::TPtr& holder, bool collectDebugInfo, std::function<void(TDbExecuter<TState>&)> stateInitCallback) {
+ auto executer = new TDbExecuter(collectDebugInfo, stateInitCallback);
+ holder.reset(executer);
+ return *executer;
+ };
+
void SkipStep() {
skipStep = true;
}
@@ -121,7 +138,11 @@ public:
TCommitTransactionResult result = future.GetValue();
auto status = static_cast<TStatus>(result);
- return this->NextStep(session);
+ if (!status.IsSuccess()) {
+ return MakeFuture(status);
+ } else {
+ return this->NextStep(session);
+ }
});
}
if (HandlerActorId != NActors::TActorId{}) {
@@ -136,7 +157,7 @@ public:
} else {
TSqlQueryBuilder builder(DbPool->TablePathPrefix, Steps[CurrentStepIndex].Name);
skipStep = false;
- Steps[CurrentStepIndex].BuilderCallback(*this, builder);
+ Steps[CurrentStepIndex].BuildCallback(*this, builder);
if (skipStep) { // TODO Refactor this
this->CurrentStepIndex++;
@@ -170,9 +191,9 @@ public:
this->Transaction = result.GetTransaction();
}
- if (this->Steps[CurrentStepIndex].HandlerCallback) {
+ if (this->Steps[CurrentStepIndex].ResultCallback) {
try {
- this->Steps[CurrentStepIndex].HandlerCallback(*this, result.GetResultSets());
+ this->Steps[CurrentStepIndex].ResultCallback(*this, result.GetResultSets());
} catch (const TControlPlaneStorageException& exception) {
NYql::TIssue issue = MakeErrorIssue(exception.Code, exception.GetRawMessage());
Issues.AddIssue(issue);
@@ -200,34 +221,32 @@ public:
TAsyncStatus Execute(NYdb::NTable::TSession& session) override {
if (StateInitCallback) {
StateInitCallback(*this);
- } else {
- State = TState{};
}
return NextStep(session);
}
TDbExecuter& Read(
- std::function<void(TDbExecuter<TState>&, TSqlQueryBuilder&)> builderCallback
- , std::function<void(TDbExecuter<TState>&, const TVector<NYdb::TResultSet>&)> handlerCallback
+ TBuildCallback buildCallback
+ , TResultCallback resultCallback
, const TString& Name = "DefaultReadName"
, bool commit = false
) {
- Steps.emplace(Steps.begin() + InsertStepIndex, TExecStep{builderCallback, handlerCallback, nullptr, Name, commit});
+ Steps.emplace(Steps.begin() + InsertStepIndex, TExecStep{buildCallback, resultCallback, nullptr, Name, commit});
InsertStepIndex++;
return *this;
}
TDbExecuter& Write(
- std::function<void(TDbExecuter<TState>&, TSqlQueryBuilder&)> builderCallback
+ TBuildCallback buildCallback
, const TString& Name = "DefaultWriteName"
, bool commit = false
) {
- return Read(builderCallback, nullptr, Name, commit);
+ return Read(buildCallback, nullptr, Name, commit);
}
void Process(
NActors::TActorId actorId
- , std::function<void(TDbExecuter<TState>&)> handlerCallback
+ , TCallback handlerCallback
) {
Y_VERIFY(HandlerActorId == NActors::TActorId{}, "Handler must be empty");
ActorSystem = TActivationContext::ActorSystem();